1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use bonsaidb_core::{
8
    connection::Connection,
9
    keyvalue::Timestamp,
10
    schema::{view, CollectionName, ViewName},
11
};
12
use parking_lot::RwLock;
13

            
14
use crate::{
15
    database::{keyvalue::ExpirationLoader, Database},
16
    tasks::{compactor::Compactor, handle::Handle, manager::Manager},
17
    views::{
18
        integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle},
19
        mapper::{Map, Mapper},
20
    },
21
    Error,
22
};
23

            
24
/// Types related to defining [`Job`]s.
25
pub mod handle;
26
/// Types related to the job [`Manager`](manager::Manager).
27
pub mod manager;
28
mod traits;
29

            
30
pub use self::traits::{Job, Keyed};
31

            
32
mod compactor;
33
mod task;
34

            
35
pub use task::Task;
36

            
37
#[derive(Debug, Clone)]
38
pub struct TaskManager {
39
    pub jobs: Manager<Task>,
40
    statuses: Arc<RwLock<Statuses>>,
41
}
42

            
43
/// Key identifying one view: the owning database's name, the view's
/// collection name, and the view's own name (in that order).
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
44

            
45
3467
#[derive(Default, Debug)]
46
pub struct Statuses {
47
    completed_integrity_checks: HashSet<ViewKey>,
48
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
49
    view_update_last_status: HashMap<ViewKey, u64>,
50
}
51

            
52
impl TaskManager {
53
3467
    pub fn new(jobs: Manager<Task>) -> Self {
54
3467
        Self {
55
3467
            jobs,
56
3467
            statuses: Arc::default(),
57
3467
        }
58
3467
    }
59

            
60
309400
    pub fn update_view_if_needed(
61
309400
        &self,
62
309400
        view: &dyn view::Serialized,
63
309400
        database: &Database,
64
309400
    ) -> Result<(), crate::Error> {
65
309400
        let view_name = view.view_name();
66
309400
        if let Some(job) = self.spawn_integrity_check(view, database) {
67
9503
            job.receive()??;
68
299897
        }
69

            
70
        // If there is no transaction id, there is no data, so the view is "up-to-date"
71
309400
        if let Some(current_transaction_id) = database.last_transaction_id()? {
72
309400
            let needs_reindex = {
73
                // When views finish updating, they store the last transaction_id
74
                // they mapped. If that value is current, we don't need to go
75
                // through the jobs system at all.
76
309400
                let statuses = self.statuses.read();
77
309400
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
78
309400
                    database.data.name.clone(),
79
309400
                    view.collection(),
80
309400
                    view.view_name(),
81
309400
                )) {
82
302713
                    last_transaction_indexed < &current_transaction_id
83
                } else {
84
6687
                    true
85
                }
86
            };
87

            
88
309400
            if needs_reindex {
89
157465
                let wait_for_transaction = current_transaction_id;
90
158268
                loop {
91
158268
                    let job = self.jobs.lookup_or_enqueue(Mapper {
92
158268
                        database: database.clone(),
93
158268
                        map: Map {
94
158268
                            database: database.data.name.clone(),
95
158268
                            collection: view.collection(),
96
158268
                            view_name: view_name.clone(),
97
158268
                        },
98
158268
                    });
99
158268
                    let id = job.receive()??;
100
158268
                    if wait_for_transaction <= id {
101
157465
                        break;
102
803
                    }
103
                }
104
151935
            }
105
        }
106

            
107
309400
        Ok(())
108
309400
    }
109

            
110
2165463
    pub fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
111
2165463
        let statuses = self.statuses.read();
112
2165463
        statuses.key_value_expiration_loads.contains(database)
113
2165463
    }
114

            
115
356021
    pub fn view_integrity_checked(
116
356021
        &self,
117
356021
        database: Arc<Cow<'static, str>>,
118
356021
        collection: CollectionName,
119
356021
        view_name: ViewName,
120
356021
    ) -> bool {
121
356021
        let statuses = self.statuses.read();
122
356021
        statuses
123
356021
            .completed_integrity_checks
124
356021
            .contains(&(database, collection, view_name))
125
356021
    }
126

            
127
356021
    pub fn spawn_integrity_check(
128
356021
        &self,
129
356021
        view: &dyn view::Serialized,
130
356021
        database: &Database,
131
356021
    ) -> Option<Handle<OptionalViewMapHandle, Error>> {
132
356021
        let view_name = view.view_name();
133
356021
        if self.view_integrity_checked(
134
356021
            database.data.name.clone(),
135
356021
            view.collection(),
136
356021
            view_name.clone(),
137
356021
        ) {
138
345310
            None
139
        } else {
140
10711
            let job = self.jobs.lookup_or_enqueue(IntegrityScanner {
141
10711
                database: database.clone(),
142
10711
                scan: IntegrityScan {
143
10711
                    database: database.data.name.clone(),
144
10711
                    view_version: view.version(),
145
10711
                    collection: view.collection(),
146
10711
                    view_name,
147
10711
                },
148
10711
            });
149
10711
            Some(job)
150
        }
151
356021
    }
152

            
153
10679
    pub fn mark_integrity_check_complete(
154
10679
        &self,
155
10679
        database: Arc<Cow<'static, str>>,
156
10679
        collection: CollectionName,
157
10679
        view_name: ViewName,
158
10679
    ) {
159
10679
        let mut statuses = self.statuses.write();
160
10679
        statuses
161
10679
            .completed_integrity_checks
162
10679
            .insert((database, collection, view_name));
163
10679
    }
164

            
165
12167
    pub fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
166
12167
        let mut statuses = self.statuses.write();
167
12167
        statuses.key_value_expiration_loads.insert(database);
168
12167
    }
169

            
170
160460
    pub fn mark_view_updated(
171
160460
        &self,
172
160460
        database: Arc<Cow<'static, str>>,
173
160460
        collection: CollectionName,
174
160460
        view_name: ViewName,
175
160460
        transaction_id: u64,
176
160460
    ) {
177
160460
        let mut statuses = self.statuses.write();
178
160460
        statuses
179
160460
            .view_update_last_status
180
160460
            .insert((database, collection, view_name), transaction_id);
181
160460
    }
182

            
183
2165463
    pub fn spawn_key_value_expiration_loader(
184
2165463
        &self,
185
2165463
        database: &Database,
186
2165463
    ) -> Option<Handle<(), Error>> {
187
2165463
        if self.key_value_expiration_loaded(&database.data.name) {
188
2152933
            None
189
        } else {
190
12530
            Some(self.jobs.lookup_or_enqueue(ExpirationLoader {
191
12530
                database: database.clone(),
192
12530
                launched_at: Timestamp::now(),
193
12530
            }))
194
        }
195
2165463
    }
196

            
197
5580
    pub fn spawn_compact_target(
198
5580
        &self,
199
5580
        database: Database,
200
5580
        target: compactor::Target,
201
5580
    ) -> Handle<(), Error> {
202
5580
        self.jobs
203
5580
            .lookup_or_enqueue(Compactor::target(database, target))
204
5580
    }
205

            
206
124
    pub fn compact_collection(
207
124
        &self,
208
124
        database: Database,
209
124
        collection_name: CollectionName,
210
124
    ) -> Result<(), Error> {
211
124
        Ok(self
212
124
            .jobs
213
124
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
214
124
            .receive()??)
215
124
    }
216

            
217
124
    pub fn compact_key_value_store(&self, database: Database) -> Result<(), Error> {
218
124
        Ok(self
219
124
            .jobs
220
124
            .lookup_or_enqueue(Compactor::keyvalue(database))
221
124
            .receive()??)
222
124
    }
223

            
224
124
    pub fn compact_database(&self, database: Database) -> Result<(), Error> {
225
124
        Ok(self
226
124
            .jobs
227
124
            .lookup_or_enqueue(Compactor::database(database))
228
124
            .receive()??)
229
124
    }
230
}