1
use std::any::Any;
2
use std::collections::HashMap;
3
use std::fmt::Debug;
4
use std::sync::Arc;
5

            
6
use flume::{Receiver, Sender};
7

            
8
use crate::tasks::handle::{Handle, Id};
9
use crate::tasks::manager::{ManagedJob, Manager};
10
use crate::tasks::traits::Executable;
11
use crate::tasks::{Job, Keyed};
12

            
13
pub struct Jobs<Key> {
14
    last_task_id: u64,
15
    result_senders: HashMap<Id, Vec<Box<dyn AnySender>>>,
16
    keyed_jobs: HashMap<Key, Id>,
17
    queuer: Sender<Box<dyn Executable>>,
18
    queue: Receiver<Box<dyn Executable>>,
19
}
20

            
21
impl<Key> Debug for Jobs<Key>
22
where
23
    Key: Debug,
24
{
25
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26
        f.debug_struct("Jobs")
27
            .field("last_task_id", &self.last_task_id)
28
            .field("result_senders", &self.result_senders.len())
29
            .field("keyed_jobs", &self.keyed_jobs)
30
            .field("queuer", &self.queuer)
31
            .field("queue", &self.queue)
32
            .finish()
33
    }
34
}
35

            
36
impl<Key> Default for Jobs<Key> {
37
5337
    fn default() -> Self {
38
5337
        let (queuer, queue) = flume::unbounded();
39
5337

            
40
5337
        Self {
41
5337
            last_task_id: 0,
42
5337
            result_senders: HashMap::new(),
43
5337
            keyed_jobs: HashMap::new(),
44
5337
            queuer,
45
5337
            queue,
46
5337
        }
47
5337
    }
48
}
49

            
50
impl<Key> Jobs<Key>
51
where
52
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
53
{
54
21342
    pub fn queue(&self) -> Receiver<Box<dyn Executable>> {
55
21342
        self.queue.clone()
56
21342
    }
57

            
58
266989
    pub fn enqueue<J: Job + 'static>(
59
266989
        &mut self,
60
266989
        job: J,
61
266989
        key: Option<Key>,
62
266989
        manager: Manager<Key>,
63
266989
    ) -> Handle<J::Output, J::Error> {
64
266989
        self.last_task_id = self.last_task_id.wrapping_add(1);
65
266989
        let id = Id(self.last_task_id);
66
266989
        self.queuer
67
266989
            .send(Box::new(ManagedJob {
68
266989
                id,
69
266989
                job,
70
266989
                manager,
71
266989
                key,
72
266989
            }))
73
266989
            .unwrap();
74
266989

            
75
266989
        self.create_new_task_handle(id)
76
266989
    }
77

            
78
278999
    pub fn create_new_task_handle<T: Send + Sync + 'static, E: Send + Sync + 'static>(
79
278999
        &mut self,
80
278999
        id: Id,
81
278999
    ) -> Handle<T, E> {
82
278999
        let (sender, receiver) = flume::bounded(1);
83
278999
        let senders = self.result_senders.entry(id).or_insert_with(Vec::default);
84
278999
        senders.push(Box::new(sender));
85
278999

            
86
278999
        Handle { id, receiver }
87
278999
    }
88

            
89
278998
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
90
278998
        &mut self,
91
278998
        job: J,
92
278998
        manager: Manager<Key>,
93
278998
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
94
278998
        let key = job.key();
95
278998
        if let Some(&id) = self.keyed_jobs.get(&key) {
96
12010
            self.create_new_task_handle(id)
97
        } else {
98
266988
            let handle = self.enqueue(job, Some(key.clone()), manager);
99
266988
            self.keyed_jobs.insert(key, handle.id);
100
266988
            handle
101
        }
102
278998
    }
103

            
104
    pub fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
105
        &mut self,
106
        id: Id,
107
        key: Option<&Key>,
108
        result: Result<T, E>,
109
    ) {
110
266989
        if let Some(key) = key {
111
266988
            self.keyed_jobs.remove(key);
112
266988
        }
113

            
114
266989
        if let Some(senders) = self.result_senders.remove(&id) {
115
266989
            let result = result.map_err(Arc::new);
116
545988
            for sender_handle in senders {
117
278999
                let sender = sender_handle
118
278999
                    .as_any()
119
278999
                    .downcast_ref::<flume::Sender<Result<T, Arc<E>>>>()
120
278999
                    .unwrap();
121
278999
                drop(sender.send(result.clone()));
122
278999
            }
123
        }
124
266989
    }
125
}
126

            
127
/// A type-erased sender that can be stored alongside senders of other
/// message types and later downcast back to its concrete type.
pub trait AnySender: Any + Send + Sync {
    /// Returns `self` as a `dyn Any` so callers can `downcast_ref` to the
    /// concrete sender type.
    fn as_any(&self) -> &dyn Any;
}
130

            
131
impl<T> AnySender for flume::Sender<T>
132
where
133
    T: Send + Sync + 'static,
134
{
135
278999
    fn as_any(&self) -> &'_ dyn Any {
136
278999
        self
137
278999
    }
138
}