1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use async_lock::RwLock;
8
use bonsaidb_core::{
9
    connection::Connection,
10
    keyvalue::Timestamp,
11
    schema::{view, CollectionName, ViewName},
12
};
13
use bonsaidb_utils::{fast_async_read, fast_async_write};
14

            
15
use crate::{
16
    database::{keyvalue::ExpirationLoader, Database},
17
    jobs::{manager::Manager, task::Handle},
18
    tasks::compactor::Compactor,
19
    views::{
20
        integrity_scanner::{IntegrityScan, IntegrityScanner},
21
        mapper::{Map, Mapper},
22
    },
23
    Error,
24
};
25

            
26
mod compactor;
27
mod task;
28

            
29
pub use task::Task;
30

            
31
#[derive(Debug, Clone)]
32
pub struct TaskManager {
33
    pub jobs: Manager<Task>,
34
    statuses: Arc<RwLock<Statuses>>,
35
}
36

            
37
/// Identifies a view as `(database name, collection name, view name)`.
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
38

            
39
2277
#[derive(Default, Debug)]
40
pub struct Statuses {
41
    completed_integrity_checks: HashSet<ViewKey>,
42
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
43
    view_update_last_status: HashMap<ViewKey, u64>,
44
}
45

            
46
impl TaskManager {
47
2277
    pub fn new(jobs: Manager<Task>) -> Self {
48
2277
        Self {
49
2277
            jobs,
50
2277
            statuses: Arc::default(),
51
2277
        }
52
2277
    }
53

            
54
193134
    pub async fn update_view_if_needed(
55
193134
        &self,
56
193134
        view: &dyn view::Serialized,
57
193134
        database: &Database,
58
193134
    ) -> Result<(), crate::Error> {
59
193134
        let view_name = view.view_name();
60
193134
        if let Some(job) = self.spawn_integrity_check(view, database).await? {
61
6538
            job.receive().await??;
62
186596
        }
63

            
64
        // If there is no transaction id, there is no data, so the view is "up-to-date"
65
193134
        if let Some(current_transaction_id) = database.last_transaction_id().await? {
66
185442
            let needs_reindex = {
67
                // When views finish updating, they store the last transaction_id
68
                // they mapped. If that value is current, we don't need to go
69
                // through the jobs system at all.
70
185466
                let statuses = fast_async_read!(self.statuses);
71
185442
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
72
185442
                    database.data.name.clone(),
73
185442
                    view.collection(),
74
185442
                    view.view_name(),
75
185442
                )) {
76
180054
                    last_transaction_indexed < &current_transaction_id
77
                } else {
78
5388
                    true
79
                }
80
            };
81

            
82
185442
            if needs_reindex {
83
110791
                let wait_for_transaction = current_transaction_id;
84
                loop {
85
111705
                    let job = self
86
111705
                        .jobs
87
111705
                        .lookup_or_enqueue(Mapper {
88
111705
                            database: database.clone(),
89
111705
                            map: Map {
90
111705
                                database: database.data.name.clone(),
91
111705
                                collection: view.collection(),
92
111705
                                view_name: view_name.clone(),
93
111705
                            },
94
111705
                        })
95
288
                        .await;
96
111705
                    let id = job.receive().await??;
97
111705
                    if wait_for_transaction <= id {
98
110791
                        break;
99
914
                    }
100
                }
101
74651
            }
102
7692
        }
103

            
104
193134
        Ok(())
105
193134
    }
106

            
107
1279393
    /// Returns `true` if key-value expiration loading has been marked as done
    /// for the database named `database`.
    pub async fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
        let guard = fast_async_read!(self.statuses);
        let loaded = &guard.key_value_expiration_loads;
        loaded.contains(database)
    }
111

            
112
227754
    pub async fn view_integrity_checked(
113
227754
        &self,
114
227754
        database: Arc<Cow<'static, str>>,
115
227754
        collection: CollectionName,
116
227754
        view_name: ViewName,
117
227754
    ) -> bool {
118
227826
        let statuses = fast_async_read!(self.statuses);
119
227754
        statuses
120
227754
            .completed_integrity_checks
121
227754
            .contains(&(database, collection.clone(), view_name))
122
227754
    }
123

            
124
227754
    pub async fn spawn_integrity_check(
125
227754
        &self,
126
227754
        view: &dyn view::Serialized,
127
227754
        database: &Database,
128
227754
    ) -> Result<Option<Handle<(), Error, Task>>, crate::Error> {
129
227754
        let view_name = view.view_name();
130
227754
        if !self
131
227754
            .view_integrity_checked(
132
227754
                database.data.name.clone(),
133
227754
                view.collection(),
134
227754
                view_name.clone(),
135
227754
            )
136
72
            .await
137
        {
138
7228
            let job = self
139
7228
                .jobs
140
7228
                .lookup_or_enqueue(IntegrityScanner {
141
7228
                    database: database.clone(),
142
7228
                    scan: IntegrityScan {
143
7228
                        database: database.data.name.clone(),
144
7228
                        view_version: view.version(),
145
7228
                        collection: view.collection(),
146
7228
                        view_name,
147
7228
                    },
148
7228
                })
149
                .await;
150
7228
            return Ok(Some(job));
151
220526
        }
152
220526

            
153
220526
        Ok(None)
154
227754
    }
155

            
156
7203
    pub async fn mark_integrity_check_complete(
157
7203
        &self,
158
7203
        database: Arc<Cow<'static, str>>,
159
7203
        collection: CollectionName,
160
7203
        view_name: ViewName,
161
7203
    ) {
162
7203
        let mut statuses = fast_async_write!(self.statuses);
163
7203
        statuses
164
7203
            .completed_integrity_checks
165
7203
            .insert((database, collection, view_name));
166
7203
    }
167

            
168
7440
    /// Records that key-value expiration loading has completed for the
    /// database named `database`.
    pub async fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
        let mut guard = fast_async_write!(self.statuses);
        let loaded = &mut guard.key_value_expiration_loads;
        loaded.insert(database);
    }
172

            
173
105973
    pub async fn mark_view_updated(
174
105973
        &self,
175
105973
        database: Arc<Cow<'static, str>>,
176
105973
        collection: CollectionName,
177
105973
        view_name: ViewName,
178
105973
        transaction_id: u64,
179
105973
    ) {
180
106021
        let mut statuses = fast_async_write!(self.statuses);
181
105973
        statuses
182
105973
            .view_update_last_status
183
105973
            .insert((database, collection, view_name), transaction_id);
184
105973
    }
185

            
186
1279393
    pub async fn spawn_key_value_expiration_loader(
187
1279393
        &self,
188
1279393
        database: &crate::Database,
189
1279393
    ) -> Option<Handle<(), Error, Task>> {
190
1279393
        if self.key_value_expiration_loaded(&database.data.name).await {
191
1269152
            None
192
        } else {
193
            Some(
194
10241
                self.jobs
195
10241
                    .lookup_or_enqueue(ExpirationLoader {
196
10241
                        database: database.clone(),
197
10241
                        launched_at: Timestamp::now(),
198
10241
                    })
199
                    .await,
200
            )
201
        }
202
1279393
    }
203

            
204
296
    pub async fn compact_collection(
205
296
        &self,
206
296
        database: crate::Database,
207
296
        collection_name: CollectionName,
208
296
    ) -> Result<(), Error> {
209
296
        Ok(self
210
296
            .jobs
211
296
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
212
            .await
213
296
            .receive()
214
296
            .await??)
215
296
    }
216

            
217
148
    pub async fn compact_key_value_store(&self, database: crate::Database) -> Result<(), Error> {
218
148
        Ok(self
219
148
            .jobs
220
148
            .lookup_or_enqueue(Compactor::keyvalue(database))
221
            .await
222
148
            .receive()
223
148
            .await??)
224
148
    }
225

            
226
74
    pub async fn compact_database(&self, database: crate::Database) -> Result<(), Error> {
227
74
        Ok(self
228
74
            .jobs
229
74
            .lookup_or_enqueue(Compactor::database(database))
230
            .await
231
74
            .receive()
232
74
            .await??)
233
74
    }
234
}