1
use std::borrow::Cow;
2
use std::collections::{HashMap, HashSet};
3
use std::sync::Arc;
4

            
5
use bonsaidb_core::connection::Connection;
6
use bonsaidb_core::keyvalue::Timestamp;
7
use bonsaidb_core::schema::{view, CollectionName, ViewName};
8
use parking_lot::RwLock;
9

            
10
use crate::database::keyvalue::ExpirationLoader;
11
use crate::database::Database;
12
use crate::tasks::compactor::Compactor;
13
use crate::tasks::handle::Handle;
14
use crate::tasks::manager::Manager;
15
use crate::views::integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle};
16
use crate::views::mapper::{Map, Mapper};
17
use crate::Error;
18

            
19
/// Types related to defining [`Job`]s.
20
pub mod handle;
21
/// Types related to the job [`Manager`](manager::Manager).
22
pub mod manager;
23
mod traits;
24

            
25
pub use self::traits::{Job, Keyed};
26

            
27
mod compactor;
28
mod task;
29

            
30
pub use task::Task;
31

            
32
#[derive(Debug, Clone)]
33
pub struct TaskManager {
34
    pub jobs: Manager<Task>,
35
    statuses: Arc<RwLock<Statuses>>,
36
}
37

            
38
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
39

            
40
5335
#[derive(Default, Debug)]
41
pub struct Statuses {
42
    completed_integrity_checks: HashSet<ViewKey>,
43
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
44
    view_update_last_status: HashMap<ViewKey, u64>,
45
}
46

            
47
impl TaskManager {
48
5335
    pub fn new(jobs: Manager<Task>) -> Self {
49
5335
        Self {
50
5335
            jobs,
51
5335
            statuses: Arc::default(),
52
5335
        }
53
5335
    }
54

            
55
410741
    pub fn update_view_if_needed(
56
410741
        &self,
57
410741
        view: &dyn view::Serialized,
58
410741
        database: &Database,
59
410741
        block_until_updated: bool,
60
410741
    ) -> Result<(), crate::Error> {
61
410741
        let view_name = view.view_name();
62
410741
        if let Some(job) = self.spawn_integrity_check(view, database) {
63
13550
            job.receive()??;
64
397191
        }
65

            
66
        // If there is no transaction id, there is no data, so the view is "up-to-date"
67
410741
        if let Some(current_transaction_id) = database.last_transaction_id()? {
68
410741
            let needs_reindex = {
69
                // When views finish updating, they store the last transaction_id
70
                // they mapped. If that value is current, we don't need to go
71
                // through the jobs system at all.
72
410741
                let statuses = self.statuses.read();
73
410741
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
74
410741
                    database.data.name.clone(),
75
410741
                    view.collection(),
76
410741
                    view.view_name(),
77
410741
                )) {
78
401638
                    last_transaction_indexed < &current_transaction_id
79
                } else {
80
9103
                    true
81
                }
82
            };
83

            
84
410741
            if needs_reindex {
85
212499
                let wait_for_transaction = current_transaction_id;
86
212758
                loop {
87
212758
                    let job = self.jobs.lookup_or_enqueue(Mapper {
88
212758
                        database: database.clone(),
89
212758
                        map: Map {
90
212758
                            database: database.data.name.clone(),
91
212758
                            collection: view.collection(),
92
212758
                            view_name: view_name.clone(),
93
212758
                        },
94
212758
                    });
95
212758

            
96
212758
                    if !block_until_updated {
97
152
                        break;
98
212606
                    }
99

            
100
212606
                    let id = job.receive()??;
101
212606
                    if wait_for_transaction <= id {
102
212347
                        break;
103
259
                    }
104
                }
105
198242
            }
106
        }
107

            
108
410741
        Ok(())
109
410741
    }
110

            
111
2724354
    pub fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
112
2724354
        let statuses = self.statuses.read();
113
2724354
        statuses.key_value_expiration_loads.contains(database)
114
2724354
    }
115

            
116
1470425
    pub fn view_integrity_checked(
117
1470425
        &self,
118
1470425
        database: Arc<Cow<'static, str>>,
119
1470425
        collection: CollectionName,
120
1470425
        view_name: ViewName,
121
1470425
    ) -> bool {
122
1470425
        let statuses = self.statuses.read();
123
1470425
        statuses
124
1470425
            .completed_integrity_checks
125
1470425
            .contains(&(database, collection, view_name))
126
1470425
    }
127

            
128
1470425
    pub fn spawn_integrity_check(
129
1470425
        &self,
130
1470425
        view: &dyn view::Serialized,
131
1470425
        database: &Database,
132
1470425
    ) -> Option<Handle<OptionalViewMapHandle, Error>> {
133
1470425
        let view_name = view.view_name();
134
1470425
        if self.view_integrity_checked(
135
1470425
            database.data.name.clone(),
136
1470425
            view.collection(),
137
1470425
            view_name.clone(),
138
1470425
        ) {
139
1449487
            None
140
        } else {
141
20938
            let job = self.jobs.lookup_or_enqueue(IntegrityScanner {
142
20938
                database: database.clone(),
143
20938
                scan: IntegrityScan {
144
20938
                    database: database.data.name.clone(),
145
20938
                    view_version: view.version(),
146
20938
                    collection: view.collection(),
147
20938
                    view_name,
148
20938
                },
149
20938
            });
150
20938
            Some(job)
151
        }
152
1470425
    }
153

            
154
19099
    pub fn mark_integrity_check_complete(
155
19099
        &self,
156
19099
        database: Arc<Cow<'static, str>>,
157
19099
        collection: CollectionName,
158
19099
        view_name: ViewName,
159
19099
    ) {
160
19099
        let mut statuses = self.statuses.write();
161
19099
        statuses
162
19099
            .completed_integrity_checks
163
19099
            .insert((database, collection, view_name));
164
19099
    }
165

            
166
17561
    pub fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
167
17561
        let mut statuses = self.statuses.write();
168
17561
        statuses.key_value_expiration_loads.insert(database);
169
17561
    }
170

            
171
221052
    pub fn mark_view_updated(
172
221052
        &self,
173
221052
        database: Arc<Cow<'static, str>>,
174
221052
        collection: CollectionName,
175
221052
        view_name: ViewName,
176
221052
        transaction_id: u64,
177
221052
    ) {
178
221052
        let mut statuses = self.statuses.write();
179
221052
        statuses
180
221052
            .view_update_last_status
181
221052
            .insert((database, collection, view_name), transaction_id);
182
221052
    }
183

            
184
2724354
    pub fn spawn_key_value_expiration_loader(
185
2724354
        &self,
186
2724354
        database: &Database,
187
2724354
    ) -> Option<Handle<(), Error>> {
188
2724354
        if self.key_value_expiration_loaded(&database.data.name) {
189
2706342
            None
190
        } else {
191
18012
            Some(self.jobs.lookup_or_enqueue(ExpirationLoader {
192
18012
                database: database.clone(),
193
18012
                launched_at: Timestamp::now(),
194
18012
            }))
195
        }
196
2724354
    }
197

            
198
8664
    pub fn spawn_compact_target(
199
8664
        &self,
200
8664
        database: Database,
201
8664
        target: compactor::Target,
202
8664
    ) -> Handle<(), Error> {
203
8664
        self.jobs
204
8664
            .lookup_or_enqueue(Compactor::target(database, target))
205
8664
    }
206

            
207
152
    pub fn compact_collection(
208
152
        &self,
209
152
        database: Database,
210
152
        collection_name: CollectionName,
211
152
    ) -> Result<(), Error> {
212
152
        Ok(self
213
152
            .jobs
214
152
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
215
152
            .receive()??)
216
152
    }
217

            
218
152
    pub fn compact_key_value_store(&self, database: Database) -> Result<(), Error> {
219
152
        Ok(self
220
152
            .jobs
221
152
            .lookup_or_enqueue(Compactor::keyvalue(database))
222
152
            .receive()??)
223
152
    }
224

            
225
152
    pub fn compact_database(&self, database: Database) -> Result<(), Error> {
226
152
        Ok(self
227
152
            .jobs
228
152
            .lookup_or_enqueue(Compactor::database(database))
229
152
            .receive()??)
230
152
    }
231
}