1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use flume::{Receiver, Sender};

use crate::tasks::handle::{Handle, Id};
use crate::tasks::manager::{ManagedJob, Manager};
use crate::tasks::traits::Executable;
use crate::tasks::{Job, Keyed};

pub struct Jobs<Key> {
    last_task_id: u64,
    result_senders: HashMap<Id, Vec<Box<dyn AnySender>>>,
    keyed_jobs: HashMap<Key, Id>,
    queuer: Sender<Box<dyn Executable>>,
    queue: Receiver<Box<dyn Executable>>,
}

impl<Key> Debug for Jobs<Key>
where
    Key: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Jobs")
            .field("last_task_id", &self.last_task_id)
            .field("result_senders", &self.result_senders.len())
            .field("keyed_jobs", &self.keyed_jobs)
            .field("queuer", &self.queuer)
            .field("queue", &self.queue)
            .finish()
    }
}

impl<Key> Default for Jobs<Key> {
    fn default() -> Self {
        let (queuer, queue) = flume::unbounded();

        Self {
            last_task_id: 0,
            result_senders: HashMap::new(),
            keyed_jobs: HashMap::new(),
            queuer,
            queue,
        }
    }
}

impl<Key> Jobs<Key>
where
    Key: Clone + std::hash::Hash + Eq + Send + Sync + Debug + 'static,
{
    pub fn queue(&self) -> Receiver<Box<dyn Executable>> {
        self.queue.clone()
    }

    pub fn enqueue<J: Job + 'static>(
        &mut self,
        job: J,
        key: Option<Key>,
        manager: Manager<Key>,
    ) -> Handle<J::Output, J::Error> {
        self.last_task_id = self.last_task_id.wrapping_add(1);
        let id = Id(self.last_task_id);
        self.queuer
            .send(Box::new(ManagedJob {
                id,
                job,
                manager,
                key,
            }))
            .unwrap();

        self.create_new_task_handle(id)
    }

    pub fn create_new_task_handle<T: Send + Sync + 'static, E: Send + Sync + 'static>(
        &mut self,
        id: Id,
    ) -> Handle<T, E> {
        let (sender, receiver) = flume::bounded(1);
        let senders = self.result_senders.entry(id).or_insert_with(Vec::default);
        senders.push(Box::new(sender));

        Handle { id, receiver }
    }

    pub fn lookup_or_enqueue<J: Keyed<Key>>(
        &mut self,
        job: J,
        manager: Manager<Key>,
    ) -> Handle<<J as Job>::Output, <J as Job>::Error> {
        let key = job.key();
        if let Some(&id) = self.keyed_jobs.get(&key) {
            self.create_new_task_handle(id)
        } else {
            let handle = self.enqueue(job, Some(key.clone()), manager);
            self.keyed_jobs.insert(key, handle.id);
            handle
        }
    }

    pub fn job_completed<T: Clone + Send + Sync + 'static, E: Send + Sync + 'static>(
        &mut self,
        id: Id,
        key: Option<&Key>,
        result: Result<T, E>,
    ) {
        if let Some(key) = key {
            self.keyed_jobs.remove(key);
        }

        if let Some(senders) = self.result_senders.remove(&id) {
            let result = result.map_err(Arc::new);
            for sender_handle in senders {
                let sender = sender_handle
                    .as_any()
                    .downcast_ref::<flume::Sender<Result<T, Arc<E>>>>()
                    .unwrap();
                drop(sender.send(result.clone()));
            }
        }
    }
}

pub trait AnySender: Any + Send + Sync {
    fn as_any(&self) -> &'_ dyn Any;
}

impl<T> AnySender for flume::Sender<T>
where
    T: Send + Sync + 'static,
{
    fn as_any(&self) -> &'_ dyn Any {
        self
    }
}