1
use std::fmt::Debug;
2
use std::sync::Arc;
3

            
4
use derive_where::derive_where;
5
use parking_lot::RwLock;
6

            
7
use crate::tasks::handle::{Handle, Id};
8
use crate::tasks::traits::Executable;
9
use crate::tasks::{Job, Keyed};
10

            
11
pub(crate) mod jobs;
12
mod managed_job;
13
pub(crate) use managed_job::ManagedJob;
14

            
15
#[cfg(test)]
16
mod tests;
17

            
18
/// A background jobs manager.
19
#[derive(Debug)]
20
275310
#[derive_where(Clone, Default)]
21
pub struct Manager<Key = ()> {
22
    // #[derive_where(default)]
23
    pub(crate) jobs: Arc<RwLock<jobs::Jobs<Key>>>,
24
}
25

            
26
impl<Key> Manager<Key>
27
where
28
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
29
{
30
    /// Pushes a `job` into the queue. Pushing the same job definition twice
31
    /// will yield two tasks in the queue.
32
    #[cfg(test)]
33
1
    pub fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error> {
34
1
        let mut jobs = self.jobs.write();
35
1
        jobs.enqueue(job, None, self.clone())
36
1
    }
37

            
38
    /// Uses [`Keyed::key`] to ensure no other job with the same `key` is
39
    /// currently running. If another job is already running that matches, a
40
    /// clone of that [`Handle`] will be returned. When the job finishes, all
41
    /// [`Handle`] clones will be notified with a copy of the result.
42
270112
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
43
270112
        &self,
44
270112
        job: J,
45
270112
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
46
270112
        let mut jobs = self.jobs.write();
47
270112
        jobs.lookup_or_enqueue(job, self.clone())
48
270112
    }
49

            
50
257324
    fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
51
257324
        &self,
52
257324
        id: Id,
53
257324
        key: Option<&Key>,
54
257324
        result: Result<T, E>,
55
257324
    ) {
56
257324
        let mut jobs = self.jobs.write();
57
257324
        jobs.job_completed(id, key, result);
58
257324
    }
59

            
60
    /// Spawns a worker. In general, you shouldn't need to call this function
61
    /// directly.
62
20782
    pub fn spawn_worker(&self) {
63
20782
        let receiver = {
64
20782
            let jobs = self.jobs.read();
65
20782
            jobs.queue()
66
20782
        };
67
20782
        std::thread::Builder::new()
68
20782
            .name(String::from("bonsaidb-tasks"))
69
20782
            .spawn(move || worker_thread(&receiver))
70
20782
            .unwrap();
71
20782
    }
72
}
73

            
74
20782
fn worker_thread(receiver: &flume::Receiver<Box<dyn Executable>>) {
75
278106
    while let Ok(mut job) = receiver.recv() {
76
257324
        job.execute();
77
257324
    }
78
20782
}