1
use std::{fmt::Debug, sync::Arc};
2

            
3
use async_lock::RwLock;
4
use bonsaidb_utils::{fast_async_read, fast_async_write};
5
use derive_where::derive_where;
6

            
7
use crate::tasks::{
8
    handle::{Handle, Id},
9
    Job, Keyed,
10
};
11

            
12
pub(crate) mod jobs;
13
mod managed_job;
14
pub(crate) use managed_job::ManagedJob;
15

            
16
#[cfg(test)]
17
mod tests;
18

            
19
/// A background jobs manager.
20
#[derive(Debug)]
21
189775
#[derive_where(Clone, Default)]
22
pub struct Manager<Key = ()> {
23
    // #[derive_where(default)]
24
    pub(crate) jobs: Arc<RwLock<jobs::Jobs<Key>>>,
25
}
26

            
27
impl<Key> Manager<Key>
28
where
29
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
30
{
31
    /// Pushes a `job` into the queue. Pushing the same job definition twice
32
    /// will yield two tasks in the queue.
33
    #[cfg(test)]
34
1
    pub async fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error> {
35
1
        let mut jobs = fast_async_write!(self.jobs);
36
1
        jobs.enqueue(job, None, self.clone())
37
1
    }
38

            
39
    /// Uses [`Keyed::key`] to ensure no other job with the same `key` is
40
    /// currently running. If another job is already running that matches, a
41
    /// clone of that [`Handle`] will be returned. When the job finishes, all
42
    /// [`Handle`] clones will be notified with a copy of the result.
43
178721
    pub async fn lookup_or_enqueue<J: Keyed<Key>>(
44
178721
        &self,
45
178721
        job: J,
46
178721
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
47
179085
        let mut jobs = fast_async_write!(self.jobs);
48
178721
        jobs.lookup_or_enqueue(job, self.clone())
49
178721
    }
50

            
51
160772
    async fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
52
160772
        &self,
53
160772
        id: Id,
54
160772
        key: Option<&Key>,
55
160772
        result: Result<T, E>,
56
160772
    ) {
57
161318
        let mut jobs = fast_async_write!(self.jobs);
58
160772
        jobs.job_completed(id, key, result);
59
160772
    }
60

            
61
    /// Spawns a worker. In general, you shouldn't need to call this function
62
    /// directly.
63
702
    pub fn spawn_worker(&self) {
64
702
        let manager = self.clone();
65
702
        tokio::spawn(async move {
66
12118
            manager.execute_jobs().await;
67
702
        });
68
702
    }
69

            
70
702
    async fn execute_jobs(&self) {
71
702
        let receiver = {
72
702
            let jobs = fast_async_read!(self.jobs);
73
702
            jobs.queue()
74
        };
75
7420
        while let Ok(mut job) = receiver.recv_async().await {
76
6748
            job.execute().await;
77
        }
78
    }
79
}