1
use std::{fmt::Debug, sync::Arc};
2

            
3
use async_lock::RwLock;
4
use bonsaidb_utils::{fast_async_read, fast_async_write};
5
use derive_where::derive_where;
6

            
7
use crate::jobs::{
8
    task::{Handle, Id},
9
    Job, Keyed,
10
};
11

            
12
pub(crate) mod jobs;
13
mod managed_job;
14
pub(crate) use managed_job::ManagedJob;
15

            
16
#[cfg(test)]
17
mod tests;
18

            
19
/// A background jobs manager.
20
#[derive(Debug)]
21
421538
#[derive_where(Clone, Default)]
22
pub struct Manager<Key = ()> {
23
    // #[derive_where(default)]
24
    pub(crate) jobs: Arc<RwLock<jobs::Jobs<Key>>>,
25
}
26

            
27
impl<Key> Manager<Key>
28
where
29
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
30
{
31
    /// Pushes a `job` into the queue. Pushing the same job definition twice
32
    /// will yield two tasks in the queue.
33
51932
    pub async fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error, Key> {
34
52550
        let mut jobs = fast_async_write!(self.jobs);
35
51932
        jobs.enqueue(job, None, self.clone())
36
51932
    }
37

            
38
    /// Uses [`Keyed::key`] to ensure no other job with the same `key` is
39
    /// currently running. If another job is already running that matches, a
40
    /// clone of that [`Handle`] will be returned. When the job finishes, all
41
    /// [`Handle`] clones will be notified with a copy of the result.
42
158952
    pub async fn lookup_or_enqueue<J: Keyed<Key>>(
43
158952
        &self,
44
158952
        job: J,
45
158952
    ) -> Handle<<J as Job>::Output, <J as Job>::Error, Key> {
46
159352
        let mut jobs = fast_async_write!(self.jobs);
47
158952
        jobs.lookup_or_enqueue(job, self.clone())
48
158952
    }
49

            
50
198400
    async fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
51
198400
        &self,
52
198400
        id: Id,
53
198400
        key: Option<&Key>,
54
198400
        result: Result<T, E>,
55
198400
    ) {
56
200164
        let mut jobs = fast_async_write!(self.jobs);
57
198400
        jobs.job_completed(id, key, result);
58
198400
    }
59

            
60
    /// Spawns a worker. In general, you shouldn't need to call this function
61
    /// directly.
62
1966
    pub fn spawn_worker(&self) {
63
1966
        let manager = self.clone();
64
1966
        tokio::spawn(async move {
65
183128
            manager.execute_jobs().await;
66
1966
        });
67
1966
    }
68

            
69
1966
    async fn execute_jobs(&self) {
70
1966
        let receiver = {
71
1966
            let jobs = fast_async_read!(self.jobs);
72
1966
            jobs.queue()
73
        };
74
92454
        while let Ok(mut job) = receiver.recv_async().await {
75
110662
            job.execute().await;
76
        }
77
    }
78
}