1
use std::fmt::Debug;
2
use std::sync::Arc;
3

            
4
use derive_where::derive_where;
5
use parking_lot::RwLock;
6

            
7
use crate::tasks::handle::{Handle, Id};
8
use crate::tasks::traits::Executable;
9
use crate::tasks::{Job, Keyed};
10

            
11
pub(crate) mod jobs;
12
mod managed_job;
13
pub(crate) use managed_job::ManagedJob;
14

            
15
#[cfg(test)]
16
mod tests;
17

            
18
/// A background jobs manager.
19
#[derive(Debug)]
20
284336
#[derive_where(Clone, Default)]
21
pub struct Manager<Key = ()> {
22
    // #[derive_where(default)]
23
    pub(crate) jobs: Arc<RwLock<jobs::Jobs<Key>>>,
24
}
25

            
26
impl<Key> Manager<Key>
27
where
28
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
29
{
30
    /// Pushes a `job` into the queue. Pushing the same job definition twice
31
    /// will yield two tasks in the queue.
32
    #[cfg(test)]
33
1
    pub fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error> {
34
1
        let mut jobs = self.jobs.write();
35
1
        jobs.enqueue(job, None, self.clone())
36
1
    }
37

            
38
    /// Uses [`Keyed::key`] to ensure no other job with the same `key` is
39
    /// currently running. If another job is already running that matches, a
40
    /// clone of that [`Handle`] will be returned. When the job finishes, all
41
    /// [`Handle`] clones will be notified with a copy of the result.
42
278998
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
43
278998
        &self,
44
278998
        job: J,
45
278998
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
46
278998
        let mut jobs = self.jobs.write();
47
278998
        jobs.lookup_or_enqueue(job, self.clone())
48
278998
    }
49

            
50
266989
    fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
51
266989
        &self,
52
266989
        id: Id,
53
266989
        key: Option<&Key>,
54
266989
        result: Result<T, E>,
55
266989
    ) {
56
266989
        let mut jobs = self.jobs.write();
57
266989
        jobs.job_completed(id, key, result);
58
266989
    }
59

            
60
    /// Spawns a worker. In general, you shouldn't need to call this function
61
    /// directly.
62
21342
    pub fn spawn_worker(&self) {
63
21342
        let receiver = {
64
21342
            let jobs = self.jobs.read();
65
21342
            jobs.queue()
66
21342
        };
67
21342
        std::thread::Builder::new()
68
21342
            .name(String::from("bonsaidb-tasks"))
69
21342
            .spawn(move || worker_thread(&receiver))
70
21342
            .unwrap();
71
21342
    }
72
}
73

            
74
21342
fn worker_thread(receiver: &flume::Receiver<Box<dyn Executable>>) {
75
288331
    while let Ok(mut job) = receiver.recv() {
76
266989
        job.execute();
77
266989
    }
78
21342
}