1
use std::borrow::Cow;
2
use std::collections::{HashMap, HashSet};
3
use std::sync::Arc;
4

            
5
use bonsaidb_core::connection::Connection;
6
use bonsaidb_core::keyvalue::Timestamp;
7
use bonsaidb_core::schema::{view, CollectionName, ViewName};
8
use parking_lot::RwLock;
9

            
10
use crate::database::keyvalue::ExpirationLoader;
11
use crate::database::Database;
12
use crate::tasks::compactor::Compactor;
13
use crate::tasks::handle::Handle;
14
use crate::tasks::manager::Manager;
15
use crate::views::integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle};
16
use crate::views::mapper::{Map, Mapper};
17
use crate::Error;
18

            
19
/// Types related to defining [`Job`]s.
20
pub mod handle;
21
/// Types related to the job [`Manager`](manager::Manager).
22
pub mod manager;
23
mod traits;
24

            
25
pub use self::traits::{Job, Keyed};
26

            
27
mod compactor;
28
mod task;
29

            
30
pub use task::Task;
31

            
32
#[derive(Debug, Clone)]
33
pub struct TaskManager {
34
    pub jobs: Manager<Task>,
35
    statuses: Arc<RwLock<Statuses>>,
36
}
37

            
38
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
39

            
40
5197
#[derive(Default, Debug)]
41
pub struct Statuses {
42
    completed_integrity_checks: HashSet<ViewKey>,
43
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
44
    view_update_last_status: HashMap<ViewKey, u64>,
45
}
46

            
47
impl TaskManager {
48
5197
    pub fn new(jobs: Manager<Task>) -> Self {
49
5197
        Self {
50
5197
            jobs,
51
5197
            statuses: Arc::default(),
52
5197
        }
53
5197
    }
54

            
55
392092
    pub fn update_view_if_needed(
56
392092
        &self,
57
392092
        view: &dyn view::Serialized,
58
392092
        database: &Database,
59
392092
        block_until_updated: bool,
60
392092
    ) -> Result<(), crate::Error> {
61
392092
        let view_name = view.view_name();
62
392092
        if let Some(job) = self.spawn_integrity_check(view, database) {
63
13191
            job.receive()??;
64
378901
        }
65

            
66
        // If there is no transaction id, there is no data, so the view is "up-to-date"
67
392092
        if let Some(current_transaction_id) = database.last_transaction_id()? {
68
392092
            let needs_reindex = {
69
                // When views finish updating, they store the last transaction_id
70
                // they mapped. If that value is current, we don't need to go
71
                // through the jobs system at all.
72
392092
                let statuses = self.statuses.read();
73
392092
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
74
392092
                    database.data.name.clone(),
75
392092
                    view.collection(),
76
392092
                    view.view_name(),
77
392092
                )) {
78
379897
                    last_transaction_indexed < &current_transaction_id
79
                } else {
80
12195
                    true
81
                }
82
            };
83

            
84
392092
            if needs_reindex {
85
207505
                let wait_for_transaction = current_transaction_id;
86
208405
                loop {
87
208405
                    let job = self.jobs.lookup_or_enqueue(Mapper {
88
208405
                        database: database.clone(),
89
208405
                        map: Map {
90
208405
                            database: database.data.name.clone(),
91
208405
                            collection: view.collection(),
92
208405
                            view_name: view_name.clone(),
93
208405
                        },
94
208405
                    });
95
208405

            
96
208405
                    if !block_until_updated {
97
148
                        break;
98
208257
                    }
99

            
100
208257
                    let id = job.receive()??;
101
208257
                    if wait_for_transaction <= id {
102
207357
                        break;
103
900
                    }
104
                }
105
184587
            }
106
        }
107

            
108
392092
        Ok(())
109
392092
    }
110

            
111
2638878
    pub fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
112
2638878
        let statuses = self.statuses.read();
113
2638878
        statuses.key_value_expiration_loads.contains(database)
114
2638878
    }
115

            
116
1421536
    pub fn view_integrity_checked(
117
1421536
        &self,
118
1421536
        database: Arc<Cow<'static, str>>,
119
1421536
        collection: CollectionName,
120
1421536
        view_name: ViewName,
121
1421536
    ) -> bool {
122
1421536
        let statuses = self.statuses.read();
123
1421536
        statuses
124
1421536
            .completed_integrity_checks
125
1421536
            .contains(&(database, collection, view_name))
126
1421536
    }
127

            
128
1421536
    pub fn spawn_integrity_check(
129
1421536
        &self,
130
1421536
        view: &dyn view::Serialized,
131
1421536
        database: &Database,
132
1421536
    ) -> Option<Handle<OptionalViewMapHandle, Error>> {
133
1421536
        let view_name = view.view_name();
134
1421536
        if self.view_integrity_checked(
135
1421536
            database.data.name.clone(),
136
1421536
            view.collection(),
137
1421536
            view_name.clone(),
138
1421536
        ) {
139
1400791
            None
140
        } else {
141
20745
            let job = self.jobs.lookup_or_enqueue(IntegrityScanner {
142
20745
                database: database.clone(),
143
20745
                scan: IntegrityScan {
144
20745
                    database: database.data.name.clone(),
145
20745
                    view_version: view.version(),
146
20745
                    collection: view.collection(),
147
20745
                    view_name,
148
20745
                },
149
20745
            });
150
20745
            Some(job)
151
        }
152
1421536
    }
153

            
154
18593
    pub fn mark_integrity_check_complete(
155
18593
        &self,
156
18593
        database: Arc<Cow<'static, str>>,
157
18593
        collection: CollectionName,
158
18593
        view_name: ViewName,
159
18593
    ) {
160
18593
        let mut statuses = self.statuses.write();
161
18593
        statuses
162
18593
            .completed_integrity_checks
163
18593
            .insert((database, collection, view_name));
164
18593
    }
165

            
166
17247
    pub fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
167
17247
        let mut statuses = self.statuses.write();
168
17247
        statuses.key_value_expiration_loads.insert(database);
169
17247
    }
170

            
171
210915
    pub fn mark_view_updated(
172
210915
        &self,
173
210915
        database: Arc<Cow<'static, str>>,
174
210915
        collection: CollectionName,
175
210915
        view_name: ViewName,
176
210915
        transaction_id: u64,
177
210915
    ) {
178
210915
        let mut statuses = self.statuses.write();
179
210915
        statuses
180
210915
            .view_update_last_status
181
210915
            .insert((database, collection, view_name), transaction_id);
182
210915
    }
183

            
184
2638878
    pub fn spawn_key_value_expiration_loader(
185
2638878
        &self,
186
2638878
        database: &Database,
187
2638878
    ) -> Option<Handle<(), Error>> {
188
2638878
        if self.key_value_expiration_loaded(&database.data.name) {
189
2621016
            None
190
        } else {
191
17862
            Some(self.jobs.lookup_or_enqueue(ExpirationLoader {
192
17862
                database: database.clone(),
193
17862
                launched_at: Timestamp::now(),
194
17862
            }))
195
        }
196
2638878
    }
197

            
198
8436
    pub fn spawn_compact_target(
199
8436
        &self,
200
8436
        database: Database,
201
8436
        target: compactor::Target,
202
8436
    ) -> Handle<(), Error> {
203
8436
        self.jobs
204
8436
            .lookup_or_enqueue(Compactor::target(database, target))
205
8436
    }
206

            
207
148
    pub fn compact_collection(
208
148
        &self,
209
148
        database: Database,
210
148
        collection_name: CollectionName,
211
148
    ) -> Result<(), Error> {
212
148
        Ok(self
213
148
            .jobs
214
148
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
215
148
            .receive()??)
216
148
    }
217

            
218
148
    pub fn compact_key_value_store(&self, database: Database) -> Result<(), Error> {
219
148
        Ok(self
220
148
            .jobs
221
148
            .lookup_or_enqueue(Compactor::keyvalue(database))
222
148
            .receive()??)
223
148
    }
224

            
225
148
    pub fn compact_database(&self, database: Database) -> Result<(), Error> {
226
148
        Ok(self
227
148
            .jobs
228
148
            .lookup_or_enqueue(Compactor::database(database))
229
148
            .receive()??)
230
148
    }
231
}