1
use std::borrow::Cow;
2
use std::collections::{HashMap, HashSet};
3
use std::sync::Arc;
4

            
5
use bonsaidb_core::connection::Connection;
6
use bonsaidb_core::keyvalue::Timestamp;
7
use bonsaidb_core::schema::{view, CollectionName, ViewName};
8
use parking_lot::RwLock;
9

            
10
use crate::database::keyvalue::ExpirationLoader;
11
use crate::database::Database;
12
use crate::tasks::compactor::Compactor;
13
use crate::tasks::handle::Handle;
14
use crate::tasks::manager::Manager;
15
use crate::views::integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle};
16
use crate::views::mapper::{Map, Mapper};
17
use crate::Error;
18

            
19
/// Types related to defining [`Job`]s.
20
pub mod handle;
21
/// Types related to the job [`Manager`](manager::Manager).
22
pub mod manager;
23
mod traits;
24

            
25
pub use self::traits::{Job, Keyed};
26

            
27
mod compactor;
28
mod task;
29

            
30
pub use task::Task;
31

            
32
#[derive(Debug, Clone)]
33
pub struct TaskManager {
34
    pub jobs: Manager<Task>,
35
    statuses: Arc<RwLock<Statuses>>,
36
}
37

            
38
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
39

            
40
5195
#[derive(Default, Debug)]
41
pub struct Statuses {
42
    completed_integrity_checks: HashSet<ViewKey>,
43
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
44
    view_update_last_status: HashMap<ViewKey, u64>,
45
}
46

            
47
impl TaskManager {
48
5195
    pub fn new(jobs: Manager<Task>) -> Self {
49
5195
        Self {
50
5195
            jobs,
51
5195
            statuses: Arc::default(),
52
5195
        }
53
5195
    }
54

            
55
401522
    pub fn update_view_if_needed(
56
401522
        &self,
57
401522
        view: &dyn view::Serialized,
58
401522
        database: &Database,
59
401522
        block_until_updated: bool,
60
401522
    ) -> Result<(), crate::Error> {
61
401522
        let view_name = view.view_name();
62
401522
        if let Some(job) = self.spawn_integrity_check(view, database) {
63
13189
            job.receive()??;
64
388333
        }
65

            
66
        // If there is no transaction id, there is no data, so the view is "up-to-date"
67
401522
        if let Some(current_transaction_id) = database.last_transaction_id()? {
68
401522
            let needs_reindex = {
69
                // When views finish updating, they store the last transaction_id
70
                // they mapped. If that value is current, we don't need to go
71
                // through the jobs system at all.
72
401522
                let statuses = self.statuses.read();
73
401522
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
74
401522
                    database.data.name.clone(),
75
401522
                    view.collection(),
76
401522
                    view.view_name(),
77
401522
                )) {
78
392256
                    last_transaction_indexed < &current_transaction_id
79
                } else {
80
9266
                    true
81
                }
82
            };
83

            
84
401522
            if needs_reindex {
85
204828
                let wait_for_transaction = current_transaction_id;
86
205044
                loop {
87
205044
                    let job = self.jobs.lookup_or_enqueue(Mapper {
88
205044
                        database: database.clone(),
89
205044
                        map: Map {
90
205044
                            database: database.data.name.clone(),
91
205044
                            collection: view.collection(),
92
205044
                            view_name: view_name.clone(),
93
205044
                        },
94
205044
                    });
95
205044

            
96
205044
                    if !block_until_updated {
97
148
                        break;
98
204896
                    }
99

            
100
204896
                    let id = job.receive()??;
101
204896
                    if wait_for_transaction <= id {
102
204680
                        break;
103
216
                    }
104
                }
105
196694
            }
106
        }
107

            
108
401522
        Ok(())
109
401522
    }
110

            
111
2670763
    pub fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
112
2670763
        let statuses = self.statuses.read();
113
2670763
        statuses.key_value_expiration_loads.contains(database)
114
2670763
    }
115

            
116
1454148
    pub fn view_integrity_checked(
117
1454148
        &self,
118
1454148
        database: Arc<Cow<'static, str>>,
119
1454148
        collection: CollectionName,
120
1454148
        view_name: ViewName,
121
1454148
    ) -> bool {
122
1454148
        let statuses = self.statuses.read();
123
1454148
        statuses
124
1454148
            .completed_integrity_checks
125
1454148
            .contains(&(database, collection, view_name))
126
1454148
    }
127

            
128
1454148
    pub fn spawn_integrity_check(
129
1454148
        &self,
130
1454148
        view: &dyn view::Serialized,
131
1454148
        database: &Database,
132
1454148
    ) -> Option<Handle<OptionalViewMapHandle, Error>> {
133
1454148
        let view_name = view.view_name();
134
1454148
        if self.view_integrity_checked(
135
1454148
            database.data.name.clone(),
136
1454148
            view.collection(),
137
1454148
            view_name.clone(),
138
1454148
        ) {
139
1433436
            None
140
        } else {
141
20712
            let job = self.jobs.lookup_or_enqueue(IntegrityScanner {
142
20712
                database: database.clone(),
143
20712
                scan: IntegrityScan {
144
20712
                    database: database.data.name.clone(),
145
20712
                    view_version: view.version(),
146
20712
                    collection: view.collection(),
147
20712
                    view_name,
148
20712
                },
149
20712
            });
150
20712
            Some(job)
151
        }
152
1454148
    }
153

            
154
18663
    pub fn mark_integrity_check_complete(
155
18663
        &self,
156
18663
        database: Arc<Cow<'static, str>>,
157
18663
        collection: CollectionName,
158
18663
        view_name: ViewName,
159
18663
    ) {
160
18663
        let mut statuses = self.statuses.write();
161
18663
        statuses
162
18663
            .completed_integrity_checks
163
18663
            .insert((database, collection, view_name));
164
18663
    }
165

            
166
17242
    pub fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
167
17242
        let mut statuses = self.statuses.write();
168
17242
        statuses.key_value_expiration_loads.insert(database);
169
17242
    }
170

            
171
212533
    pub fn mark_view_updated(
172
212533
        &self,
173
212533
        database: Arc<Cow<'static, str>>,
174
212533
        collection: CollectionName,
175
212533
        view_name: ViewName,
176
212533
        transaction_id: u64,
177
212533
    ) {
178
212533
        let mut statuses = self.statuses.write();
179
212533
        statuses
180
212533
            .view_update_last_status
181
212533
            .insert((database, collection, view_name), transaction_id);
182
212533
    }
183

            
184
2670763
    pub fn spawn_key_value_expiration_loader(
185
2670763
        &self,
186
2670763
        database: &Database,
187
2670763
    ) -> Option<Handle<(), Error>> {
188
2670763
        if self.key_value_expiration_loaded(&database.data.name) {
189
2652974
            None
190
        } else {
191
17789
            Some(self.jobs.lookup_or_enqueue(ExpirationLoader {
192
17789
                database: database.clone(),
193
17789
                launched_at: Timestamp::now(),
194
17789
            }))
195
        }
196
2670763
    }
197

            
198
8436
    pub fn spawn_compact_target(
199
8436
        &self,
200
8436
        database: Database,
201
8436
        target: compactor::Target,
202
8436
    ) -> Handle<(), Error> {
203
8436
        self.jobs
204
8436
            .lookup_or_enqueue(Compactor::target(database, target))
205
8436
    }
206

            
207
148
    pub fn compact_collection(
208
148
        &self,
209
148
        database: Database,
210
148
        collection_name: CollectionName,
211
148
    ) -> Result<(), Error> {
212
148
        Ok(self
213
148
            .jobs
214
148
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
215
148
            .receive()??)
216
148
    }
217

            
218
148
    pub fn compact_key_value_store(&self, database: Database) -> Result<(), Error> {
219
148
        Ok(self
220
148
            .jobs
221
148
            .lookup_or_enqueue(Compactor::keyvalue(database))
222
148
            .receive()??)
223
148
    }
224

            
225
148
    pub fn compact_database(&self, database: Database) -> Result<(), Error> {
226
148
        Ok(self
227
148
            .jobs
228
148
            .lookup_or_enqueue(Compactor::database(database))
229
148
            .receive()??)
230
148
    }
231
}