1
use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
2

            
3
use flume::{Receiver, Sender};
4
use tokio::sync::oneshot;
5

            
6
use crate::tasks::{
7
    handle::{Handle, Id},
8
    manager::{ManagedJob, Manager},
9
    traits::Executable,
10
    Job, Keyed,
11
};
12

            
13
pub struct Jobs<Key> {
14
    last_task_id: u64,
15
    result_senders: HashMap<Id, Vec<Box<dyn AnySender>>>,
16
    keyed_jobs: HashMap<Key, Id>,
17
    queuer: Sender<Box<dyn Executable>>,
18
    queue: Receiver<Box<dyn Executable>>,
19
}
20

            
21
impl<Key> Debug for Jobs<Key>
22
where
23
    Key: Debug,
24
{
25
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26
        f.debug_struct("Jobs")
27
            .field("last_task_id", &self.last_task_id)
28
            .field("result_senders", &self.result_senders.len())
29
            .field("keyed_jobs", &self.keyed_jobs)
30
            .field("queuer", &self.queuer)
31
            .field("queue", &self.queue)
32
            .finish()
33
    }
34
}
35

            
36
impl<Key> Default for Jobs<Key> {
37
177
    fn default() -> Self {
38
177
        let (queuer, queue) = flume::unbounded();
39
177

            
40
177
        Self {
41
177
            last_task_id: 0,
42
177
            result_senders: HashMap::new(),
43
177
            keyed_jobs: HashMap::new(),
44
177
            queuer,
45
177
            queue,
46
177
        }
47
177
    }
48
}
49

            
50
impl<Key> Jobs<Key>
51
where
52
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
53
{
54
702
    pub fn queue(&self) -> Receiver<Box<dyn Executable>> {
55
702
        self.queue.clone()
56
702
    }
57

            
58
160998
    pub fn enqueue<J: Job + 'static>(
59
160998
        &mut self,
60
160998
        job: J,
61
160998
        key: Option<Key>,
62
160998
        manager: Manager<Key>,
63
160998
    ) -> Handle<J::Output, J::Error> {
64
160998
        self.last_task_id = self.last_task_id.wrapping_add(1);
65
160998
        let id = Id(self.last_task_id);
66
160998
        self.queuer
67
160998
            .send(Box::new(ManagedJob {
68
160998
                id,
69
160998
                job,
70
160998
                manager,
71
160998
                key,
72
160998
            }))
73
160998
            .unwrap();
74
160998

            
75
160998
        self.create_new_task_handle(id)
76
160998
    }
77

            
78
178722
    pub fn create_new_task_handle<T: Send + Sync + 'static, E: Send + Sync + 'static>(
79
178722
        &mut self,
80
178722
        id: Id,
81
178722
    ) -> Handle<T, E> {
82
178722
        let (sender, receiver) = oneshot::channel();
83
178722
        let senders = self.result_senders.entry(id).or_insert_with(Vec::default);
84
178722
        senders.push(Box::new(Some(sender)));
85
178722

            
86
178722
        Handle { id, receiver }
87
178722
    }
88

            
89
178721
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
90
178721
        &mut self,
91
178721
        job: J,
92
178721
        manager: Manager<Key>,
93
178721
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
94
178721
        let key = job.key();
95
178721
        if let Some(&id) = self.keyed_jobs.get(&key) {
96
17724
            self.create_new_task_handle(id)
97
        } else {
98
160997
            let handle = self.enqueue(job, Some(key.clone()), manager);
99
160997
            self.keyed_jobs.insert(key, handle.id);
100
160997
            handle
101
        }
102
178721
    }
103

            
104
    pub fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
105
        &mut self,
106
        id: Id,
107
        key: Option<&Key>,
108
        result: Result<T, E>,
109
    ) {
110
160772
        if let Some(key) = key {
111
160771
            self.keyed_jobs.remove(key);
112
160771
        }
113

            
114
160772
        if let Some(senders) = self.result_senders.remove(&id) {
115
160772
            let result = result.map_err(Arc::new);
116
339268
            for mut sender_handle in senders {
117
178496
                let sender = sender_handle
118
178496
                    .as_any_mut()
119
178496
                    .downcast_mut::<Option<oneshot::Sender<Result<T, Arc<E>>>>>()
120
178496
                    .unwrap();
121
178496
                if let Some(sender) = sender.take() {
122
178496
                    drop(sender.send(result.clone()));
123
178496
                }
124
            }
125
        }
126
160772
    }
127
}
128

            
129
pub trait AnySender: Any + Send + Sync {
130
    fn as_any_mut(&mut self) -> &'_ mut dyn Any;
131
}
132

            
133
impl<T> AnySender for Option<oneshot::Sender<T>>
134
where
135
    T: Send + Sync + 'static,
136
{
137
178496
    fn as_any_mut(&mut self) -> &'_ mut dyn Any {
138
178496
        self
139
178496
    }
140
}