1
use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
2

            
3
use flume::{Receiver, Sender};
4

            
5
use crate::tasks::{
6
    handle::{Handle, Id},
7
    manager::{ManagedJob, Manager},
8
    traits::Executable,
9
    Job, Keyed,
10
};
11

            
12
pub struct Jobs<Key> {
13
    last_task_id: u64,
14
    result_senders: HashMap<Id, Vec<Box<dyn AnySender>>>,
15
    keyed_jobs: HashMap<Key, Id>,
16
    queuer: Sender<Box<dyn Executable>>,
17
    queue: Receiver<Box<dyn Executable>>,
18
}
19

            
20
impl<Key> Debug for Jobs<Key>
21
where
22
    Key: Debug,
23
{
24
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25
        f.debug_struct("Jobs")
26
            .field("last_task_id", &self.last_task_id)
27
            .field("result_senders", &self.result_senders.len())
28
            .field("keyed_jobs", &self.keyed_jobs)
29
            .field("queuer", &self.queuer)
30
            .field("queue", &self.queue)
31
            .finish()
32
    }
33
}
34

            
35
impl<Key> Default for Jobs<Key> {
36
3469
    fn default() -> Self {
37
3469
        let (queuer, queue) = flume::unbounded();
38
3469

            
39
3469
        Self {
40
3469
            last_task_id: 0,
41
3469
            result_senders: HashMap::new(),
42
3469
            keyed_jobs: HashMap::new(),
43
3469
            queuer,
44
3469
            queue,
45
3469
        }
46
3469
    }
47
}
48

            
49
impl<Key> Jobs<Key>
50
where
51
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
52
{
53
13870
    pub fn queue(&self) -> Receiver<Box<dyn Executable>> {
54
13870
        self.queue.clone()
55
13870
    }
56

            
57
194447
    pub fn enqueue<J: Job + 'static>(
58
194447
        &mut self,
59
194447
        job: J,
60
194447
        key: Option<Key>,
61
194447
        manager: Manager<Key>,
62
194447
    ) -> Handle<J::Output, J::Error> {
63
194447
        self.last_task_id = self.last_task_id.wrapping_add(1);
64
194447
        let id = Id(self.last_task_id);
65
194447
        self.queuer
66
194447
            .send(Box::new(ManagedJob {
67
194447
                id,
68
194447
                job,
69
194447
                manager,
70
194447
                key,
71
194447
            }))
72
194447
            .unwrap();
73
194447

            
74
194447
        self.create_new_task_handle(id)
75
194447
    }
76

            
77
202702
    pub fn create_new_task_handle<T: Send + Sync + 'static, E: Send + Sync + 'static>(
78
202702
        &mut self,
79
202702
        id: Id,
80
202702
    ) -> Handle<T, E> {
81
202702
        let (sender, receiver) = flume::bounded(1);
82
202702
        let senders = self.result_senders.entry(id).or_insert_with(Vec::default);
83
202702
        senders.push(Box::new(sender));
84
202702

            
85
202702
        Handle { id, receiver }
86
202702
    }
87

            
88
202701
    pub fn lookup_or_enqueue<J: Keyed<Key>>(
89
202701
        &mut self,
90
202701
        job: J,
91
202701
        manager: Manager<Key>,
92
202701
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
93
202701
        let key = job.key();
94
202701
        if let Some(&id) = self.keyed_jobs.get(&key) {
95
8315
            self.create_new_task_handle(id)
96
        } else {
97
194386
            let handle = self.enqueue(job, Some(key.clone()), manager);
98
194386
            self.keyed_jobs.insert(key, handle.id);
99
194386
            handle
100
        }
101
202701
    }
102

            
103
    pub fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
104
        &mut self,
105
        id: Id,
106
        key: Option<&Key>,
107
        result: Result<T, E>,
108
    ) {
109
194447
        if let Some(key) = key {
110
194446
            self.keyed_jobs.remove(key);
111
194446
        }
112

            
113
194447
        if let Some(senders) = self.result_senders.remove(&id) {
114
194447
            let result = result.map_err(Arc::new);
115
397149
            for sender_handle in senders {
116
202702
                let sender = sender_handle
117
202702
                    .as_any()
118
202702
                    .downcast_ref::<flume::Sender<Result<T, Arc<E>>>>()
119
202702
                    .unwrap();
120
202702
                drop(sender.send(result.clone()));
121
202702
            }
122
        }
123
194447
    }
124
}
125

            
126
/// A result sender with its item type erased, so that senders for different
/// result types can be stored in one collection.
pub trait AnySender: Any + Send + Sync {
    /// Exposes the sender as [`Any`] so callers can downcast it back to its
    /// concrete type.
    fn as_any(&self) -> &dyn Any;
}
129

            
130
impl<T> AnySender for flume::Sender<T>
131
where
132
    T: Send + Sync + 'static,
133
{
134
202702
    fn as_any(&self) -> &'_ dyn Any {
135
202702
        self
136
202702
    }
137
}