1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use bonsaidb_core::{
8
    connection::Connection,
9
    keyvalue::Timestamp,
10
    schema::{view, CollectionName, ViewName},
11
};
12
use parking_lot::RwLock;
13

            
14
use crate::{
15
    database::{keyvalue::ExpirationLoader, Database},
16
    tasks::{compactor::Compactor, handle::Handle, manager::Manager},
17
    views::{
18
        integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle},
19
        mapper::{Map, Mapper},
20
    },
21
    Error,
22
};
23

            
24
/// Types related to defining [`Job`]s.
25
pub mod handle;
26
/// Types related to the job [`Manager`](manager::Manager).
27
pub mod manager;
28
mod traits;
29

            
30
pub use self::traits::{Job, Keyed};
31

            
32
mod compactor;
33
mod task;
34

            
35
pub use task::Task;
36

            
37
#[derive(Debug, Clone)]
38
pub struct TaskManager {
39
    pub jobs: Manager<Task>,
40
    statuses: Arc<RwLock<Statuses>>,
41
}
42

            
43
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
44

            
45
3467
#[derive(Default, Debug)]
46
pub struct Statuses {
47
    completed_integrity_checks: HashSet<ViewKey>,
48
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
49
    view_update_last_status: HashMap<ViewKey, u64>,
50
}
51

            
52
impl TaskManager {
53
3467
    pub fn new(jobs: Manager<Task>) -> Self {
54
3467
        Self {
55
3467
            jobs,
56
3467
            statuses: Arc::default(),
57
3467
        }
58
3467
    }
59

            
60
306640
    pub fn update_view_if_needed(
61
306640
        &self,
62
306640
        view: &dyn view::Serialized,
63
306640
        database: &Database,
64
306640
    ) -> Result<(), crate::Error> {
65
306640
        let view_name = view.view_name();
66
306640
        if let Some(job) = self.spawn_integrity_check(view, database) {
67
9504
            job.receive()??;
68
297136
        }
69

            
70
        // If there is no transaction id, there is no data, so the view is "up-to-date"
71
306640
        if let Some(current_transaction_id) = database.last_transaction_id()? {
72
306640
            let needs_reindex = {
73
                // When views finish updating, they store the last transaction_id
74
                // they mapped. If that value is current, we don't need to go
75
                // through the jobs system at all.
76
306640
                let statuses = self.statuses.read();
77
306640
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
78
306640
                    database.data.name.clone(),
79
306640
                    view.collection(),
80
306640
                    view.view_name(),
81
306640
                )) {
82
300198
                    last_transaction_indexed < &current_transaction_id
83
                } else {
84
6442
                    true
85
                }
86
            };
87

            
88
306640
            if needs_reindex {
89
156559
                let wait_for_transaction = current_transaction_id;
90
157603
                loop {
91
157603
                    let job = self.jobs.lookup_or_enqueue(Mapper {
92
157603
                        database: database.clone(),
93
157603
                        map: Map {
94
157603
                            database: database.data.name.clone(),
95
157603
                            collection: view.collection(),
96
157603
                            view_name: view_name.clone(),
97
157603
                        },
98
157603
                    });
99
157603
                    let id = job.receive()??;
100
157573
                    if wait_for_transaction <= id {
101
156529
                        break;
102
1044
                    }
103
                }
104
150081
            }
105
        }
106

            
107
306610
        Ok(())
108
306610
    }
109

            
110
2150223
    pub fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
111
2150223
        let statuses = self.statuses.read();
112
2150223
        statuses.key_value_expiration_loads.contains(database)
113
2150223
    }
114

            
115
353261
    pub fn view_integrity_checked(
116
353261
        &self,
117
353261
        database: Arc<Cow<'static, str>>,
118
353261
        collection: CollectionName,
119
353261
        view_name: ViewName,
120
353261
    ) -> bool {
121
353261
        let statuses = self.statuses.read();
122
353261
        statuses
123
353261
            .completed_integrity_checks
124
353261
            .contains(&(database, collection, view_name))
125
353261
    }
126

            
127
353261
    pub fn spawn_integrity_check(
128
353261
        &self,
129
353261
        view: &dyn view::Serialized,
130
353261
        database: &Database,
131
353261
    ) -> Option<Handle<OptionalViewMapHandle, Error>> {
132
353261
        let view_name = view.view_name();
133
353261
        if self.view_integrity_checked(
134
353261
            database.data.name.clone(),
135
353261
            view.collection(),
136
353261
            view_name.clone(),
137
353261
        ) {
138
342545
            None
139
        } else {
140
10716
            let job = self.jobs.lookup_or_enqueue(IntegrityScanner {
141
10716
                database: database.clone(),
142
10716
                scan: IntegrityScan {
143
10716
                    database: database.data.name.clone(),
144
10716
                    view_version: view.version(),
145
10716
                    collection: view.collection(),
146
10716
                    view_name,
147
10716
                },
148
10716
            });
149
10716
            Some(job)
150
        }
151
353261
    }
152

            
153
10679
    pub fn mark_integrity_check_complete(
154
10679
        &self,
155
10679
        database: Arc<Cow<'static, str>>,
156
10679
        collection: CollectionName,
157
10679
        view_name: ViewName,
158
10679
    ) {
159
10679
        let mut statuses = self.statuses.write();
160
10679
        statuses
161
10679
            .completed_integrity_checks
162
10679
            .insert((database, collection, view_name));
163
10679
    }
164

            
165
12167
    pub fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
166
12167
        let mut statuses = self.statuses.write();
167
12167
        statuses.key_value_expiration_loads.insert(database);
168
12167
    }
169

            
170
159500
    pub fn mark_view_updated(
171
159500
        &self,
172
159500
        database: Arc<Cow<'static, str>>,
173
159500
        collection: CollectionName,
174
159500
        view_name: ViewName,
175
159500
        transaction_id: u64,
176
159500
    ) {
177
159500
        let mut statuses = self.statuses.write();
178
159500
        statuses
179
159500
            .view_update_last_status
180
159500
            .insert((database, collection, view_name), transaction_id);
181
159500
    }
182

            
183
2150223
    pub fn spawn_key_value_expiration_loader(
184
2150223
        &self,
185
2150223
        database: &Database,
186
2150223
    ) -> Option<Handle<(), Error>> {
187
2150223
        if self.key_value_expiration_loaded(&database.data.name) {
188
2137630
            None
189
        } else {
190
12593
            Some(self.jobs.lookup_or_enqueue(ExpirationLoader {
191
12593
                database: database.clone(),
192
12593
                launched_at: Timestamp::now(),
193
12593
            }))
194
        }
195
2150223
    }
196

            
197
5580
    pub fn spawn_compact_target(
198
5580
        &self,
199
5580
        database: Database,
200
5580
        target: compactor::Target,
201
5580
    ) -> Handle<(), Error> {
202
5580
        self.jobs
203
5580
            .lookup_or_enqueue(Compactor::target(database, target))
204
5580
    }
205

            
206
124
    pub fn compact_collection(
207
124
        &self,
208
124
        database: Database,
209
124
        collection_name: CollectionName,
210
124
    ) -> Result<(), Error> {
211
124
        Ok(self
212
124
            .jobs
213
124
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
214
124
            .receive()??)
215
124
    }
216

            
217
124
    pub fn compact_key_value_store(&self, database: Database) -> Result<(), Error> {
218
124
        Ok(self
219
124
            .jobs
220
124
            .lookup_or_enqueue(Compactor::keyvalue(database))
221
124
            .receive()??)
222
124
    }
223

            
224
124
    pub fn compact_database(&self, database: Database) -> Result<(), Error> {
225
124
        Ok(self
226
124
            .jobs
227
124
            .lookup_or_enqueue(Compactor::database(database))
228
124
            .receive()??)
229
124
    }
230
}