1
use std::{fmt::Debug, sync::Arc};
2

            
3
use derive_where::derive_where;
4
use parking_lot::RwLock;
5

            
6
use crate::tasks::{
7
    handle::{Handle, Id},
8
    traits::Executable,
9
    Job, Keyed,
10
};
11

            
12
pub(crate) mod jobs;
13
mod managed_job;
14
pub(crate) use managed_job::ManagedJob;
15

            
16
#[cfg(test)]
17
mod tests;
18

            
19
/// A background jobs manager.
20
#[derive(Debug)]
21
201036
#[derive_where(Clone, Default)]
22
pub struct Manager<Key = ()> {
23
    // #[derive_where(default)]
24
    pub(crate) jobs: Arc<RwLock<jobs::Jobs<Key>>>,
25
}
26

            
27
impl<Key> Manager<Key>
28
where
29
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
30
{
31
    /// Pushes a `job` into the queue. Pushing the same job definition twice
32
    /// will yield two tasks in the queue.
33
    #[cfg(test)]
34
1
    pub fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error> {
35
1
        let mut jobs = self.jobs.write();
36
1
        jobs.enqueue(job, None, self.clone())
37
1
    }
38

            
39
    /// Uses [`Keyed::key`] to ensure no other job with the same `key` is
40
    /// currently running. If another job is already running that matches, a
41
    /// clone of that [`Handle`] will be returned. When the job finishes, all
42
    /// [`Handle`] clones will be notified with a copy of the result.
43
197566
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
44
197566
        &self,
45
197566
        job: J,
46
197566
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
47
197566
        let mut jobs = self.jobs.write();
48
197566
        jobs.lookup_or_enqueue(job, self.clone())
49
197566
    }
50

            
51
189263
    fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
52
189263
        &self,
53
189263
        id: Id,
54
189263
        key: Option<&Key>,
55
189263
        result: Result<T, E>,
56
189263
    ) {
57
189263
        let mut jobs = self.jobs.write();
58
189263
        jobs.job_completed(id, key, result);
59
189263
    }
60

            
61
    /// Spawns a worker. In general, you shouldn't need to call this function
62
    /// directly.
63
13870
    pub fn spawn_worker(&self) {
64
13870
        let receiver = {
65
13870
            let jobs = self.jobs.read();
66
13870
            jobs.queue()
67
13870
        };
68
13870
        std::thread::Builder::new()
69
13870
            .name(String::from("bonsaidb-tasks"))
70
13870
            .spawn(move || worker_thread(&receiver))
71
13870
            .unwrap();
72
13870
    }
73
}
74

            
75
13870
fn worker_thread(receiver: &flume::Receiver<Box<dyn Executable>>) {
76
203133
    while let Ok(mut job) = receiver.recv() {
77
189263
        job.execute();
78
189263
    }
79
13870
}