1
use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
2

            
3
use flume::{Receiver, Sender};
4
use tokio::sync::oneshot;
5

            
6
use crate::jobs::{
7
    manager::{ManagedJob, Manager},
8
    task::{Handle, Id},
9
    traits::Executable,
10
    Job, Keyed,
11
};
12

            
13
pub struct Jobs<Key> {
14
    last_task_id: u64,
15
    result_senders: HashMap<Id, Vec<Box<dyn AnySender>>>,
16
    keyed_jobs: HashMap<Key, Id>,
17
    queuer: Sender<Box<dyn Executable>>,
18
    queue: Receiver<Box<dyn Executable>>,
19
}
20

            
21
impl<Key> Debug for Jobs<Key>
22
where
23
    Key: Debug,
24
{
25
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26
        f.debug_struct("Jobs")
27
            .field("last_task_id", &self.last_task_id)
28
            .field("result_senders", &self.result_senders.len())
29
            .field("keyed_jobs", &self.keyed_jobs)
30
            .field("queuer", &self.queuer)
31
            .field("queue", &self.queue)
32
            .finish()
33
    }
34
}
35

            
36
impl<Key> Default for Jobs<Key> {
37
253
    fn default() -> Self {
38
253
        let (queuer, queue) = flume::unbounded();
39
253

            
40
253
        Self {
41
253
            last_task_id: 0,
42
253
            result_senders: HashMap::new(),
43
253
            keyed_jobs: HashMap::new(),
44
253
            queuer,
45
253
            queue,
46
253
        }
47
253
    }
48
}
49

            
50
impl<Key> Jobs<Key>
51
where
52
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
53
{
54
1966
    pub fn queue(&self) -> Receiver<Box<dyn Executable>> {
55
1966
        self.queue.clone()
56
1966
    }
57

            
58
198667
    pub fn enqueue<J: Job + 'static>(
59
198667
        &mut self,
60
198667
        job: J,
61
198667
        key: Option<Key>,
62
198667
        manager: Manager<Key>,
63
198667
    ) -> Handle<J::Output, J::Error, Key> {
64
198667
        self.last_task_id = self.last_task_id.wrapping_add(1);
65
198667
        let id = Id(self.last_task_id);
66
198667
        self.queuer
67
198667
            .send(Box::new(ManagedJob {
68
198667
                id,
69
198667
                job,
70
198667
                key,
71
198667
                manager: manager.clone(),
72
198667
            }))
73
198667
            .unwrap();
74
198667

            
75
198667
        self.create_new_task_handle(id, manager)
76
198667
    }
77

            
78
210885
    pub fn create_new_task_handle<T: Send + Sync + 'static, E: Send + Sync + 'static>(
79
210885
        &mut self,
80
210885
        id: Id,
81
210885
        manager: Manager<Key>,
82
210885
    ) -> Handle<T, E, Key> {
83
210885
        let (sender, receiver) = oneshot::channel();
84
210885
        let senders = self.result_senders.entry(id).or_insert_with(Vec::default);
85
210885
        senders.push(Box::new(Some(sender)));
86
210885

            
87
210885
        Handle {
88
210885
            id,
89
210885
            manager,
90
210885
            receiver,
91
210885
        }
92
210885
    }
93

            
94
158952
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
95
158952
        &mut self,
96
158952
        job: J,
97
158952
        manager: Manager<Key>,
98
158952
    ) -> Handle<<J as Job>::Output, <J as Job>::Error, Key> {
99
158952
        let key = job.key();
100
158952
        if let Some(&id) = self.keyed_jobs.get(&key) {
101
12242
            self.create_new_task_handle(id, manager)
102
        } else {
103
146710
            let handle = self.enqueue(job, Some(key.clone()), manager);
104
146710
            self.keyed_jobs.insert(key, handle.id);
105
146710
            handle
106
        }
107
158952
    }
108

            
109
    pub fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
110
        &mut self,
111
        id: Id,
112
        key: Option<&Key>,
113
        result: Result<T, E>,
114
    ) {
115
198400
        if let Some(key) = key {
116
146468
            self.keyed_jobs.remove(key);
117
180717
        }
118

            
119
198400
        if let Some(senders) = self.result_senders.remove(&id) {
120
198400
            let result = result.map_err(Arc::new);
121
409018
            for mut sender_handle in senders {
122
210618
                let sender = sender_handle
123
210618
                    .as_any_mut()
124
210618
                    .downcast_mut::<Option<oneshot::Sender<Result<T, Arc<E>>>>>()
125
210618
                    .unwrap();
126
210618
                if let Some(sender) = sender.take() {
127
210618
                    drop(sender.send(result.clone()));
128
210618
                }
129
            }
130
        }
131
198400
    }
132
}
133

            
134
/// A type-erased result sender.
///
/// Lets senders with different payload types be stored in one collection and
/// later recovered by downcasting.
pub trait AnySender: Any + Send + Sync {
    /// Returns `self` as `&mut dyn Any` so the caller can downcast to the
    /// concrete sender type.
    fn as_any_mut(&mut self) -> &'_ mut dyn Any;
}
137

            
138
impl<T> AnySender for Option<oneshot::Sender<T>>
139
where
140
    T: Send + Sync + 'static,
141
{
142
210618
    fn as_any_mut(&mut self) -> &'_ mut dyn Any {
143
210618
        self
144
210618
    }
145
}