1
use std::fmt::Debug;
2
use std::sync::Arc;
3

            
4
use derive_where::derive_where;
5
use parking_lot::RwLock;
6

            
7
use crate::tasks::handle::{Handle, Id};
8
use crate::tasks::traits::Executable;
9
use crate::tasks::{Job, Keyed};
10

            
11
pub(crate) mod jobs;
12
mod managed_job;
13
pub(crate) use managed_job::ManagedJob;
14

            
15
#[cfg(test)]
16
mod tests;
17

            
18
/// A background jobs manager.
19
#[derive(Debug)]
20
278780
#[derive_where(Clone, Default)]
21
pub struct Manager<Key = ()> {
22
    // #[derive_where(default)]
23
    pub(crate) jobs: Arc<RwLock<jobs::Jobs<Key>>>,
24
}
25

            
26
impl<Key> Manager<Key>
27
where
28
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
29
{
30
    /// Pushes a `job` into the queue. Pushing the same job definition twice
31
    /// will yield two tasks in the queue.
32
    #[cfg(test)]
33
1
    pub fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error> {
34
1
        let mut jobs = self.jobs.write();
35
1
        jobs.enqueue(job, None, self.clone())
36
1
    }
37

            
38
    /// Uses [`Keyed::key`] to ensure no other job with the same `key` is
39
    /// currently running. If another job is already running that matches, a
40
    /// clone of that [`Handle`] will be returned. When the job finishes, all
41
    /// [`Handle`] clones will be notified with a copy of the result.
42
273580
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
43
273580
        &self,
44
273580
        job: J,
45
273580
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
46
273580
        let mut jobs = self.jobs.write();
47
273580
        jobs.lookup_or_enqueue(job, self.clone())
48
273580
    }
49

            
50
255640
    fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
51
255640
        &self,
52
255640
        id: Id,
53
255640
        key: Option<&Key>,
54
255640
        result: Result<T, E>,
55
255640
    ) {
56
255640
        let mut jobs = self.jobs.write();
57
255640
        jobs.job_completed(id, key, result);
58
255640
    }
59

            
60
    /// Spawns a worker. In general, you shouldn't need to call this function
61
    /// directly.
62
41578
    pub fn spawn_worker(&self) {
63
41578
        let receiver = {
64
41578
            let jobs = self.jobs.read();
65
41578
            jobs.queue()
66
41578
        };
67
41578
        std::thread::Builder::new()
68
41578
            .name(String::from("bonsaidb-tasks"))
69
41578
            .spawn(move || worker_thread(&receiver))
70
41578
            .unwrap();
71
41578
    }
72
}
73

            
74
41578
fn worker_thread(receiver: &flume::Receiver<Box<dyn Executable>>) {
75
297218
    while let Ok(mut job) = receiver.recv() {
76
255640
        job.execute();
77
255640
    }
78
41578
}