1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use async_lock::RwLock;
8
use bonsaidb_core::{
9
    connection::Connection,
10
    keyvalue::Timestamp,
11
    schema::{view, CollectionName, ViewName},
12
};
13
use bonsaidb_utils::{fast_async_read, fast_async_write};
14

            
15
use crate::{
16
    database::{keyvalue::ExpirationLoader, Database},
17
    jobs::{manager::Manager, task::Handle},
18
    tasks::compactor::Compactor,
19
    views::{
20
        integrity_scanner::{IntegrityScan, IntegrityScanner},
21
        mapper::{Map, Mapper},
22
    },
23
    Error,
24
};
25

            
26
mod compactor;
27
mod task;
28

            
29
pub use task::Task;
30

            
31
#[derive(Debug, Clone)]
32
pub struct TaskManager {
33
    pub jobs: Manager<Task>,
34
    statuses: Arc<RwLock<Statuses>>,
35
}
36

            
37
// Identifies one view: (database name, collection name, view name).
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
38

            
39
2277
#[derive(Default, Debug)]
40
pub struct Statuses {
41
    completed_integrity_checks: HashSet<ViewKey>,
42
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
43
    view_update_last_status: HashMap<ViewKey, u64>,
44
}
45

            
46
impl TaskManager {
47
2277
    pub fn new(jobs: Manager<Task>) -> Self {
48
2277
        Self {
49
2277
            jobs,
50
2277
            statuses: Arc::default(),
51
2277
        }
52
2277
    }
53

            
54
193854
    pub async fn update_view_if_needed(
55
193854
        &self,
56
193854
        view: &dyn view::Serialized,
57
193854
        database: &Database,
58
193854
    ) -> Result<(), crate::Error> {
59
193854
        let view_name = view.view_name();
60
193854
        if let Some(job) = self.spawn_integrity_check(view, database).await? {
61
6538
            job.receive().await??;
62
187316
        }
63

            
64
        // If there is no transaction id, there is no data, so the view is "up-to-date"
65
193854
        if let Some(current_transaction_id) = database.last_transaction_id().await? {
66
186162
            let needs_reindex = {
67
                // When views finish updating, they store the last transaction_id
68
                // they mapped. If that value is current, we don't need to go
69
                // through the jobs system at all.
70
186282
                let statuses = fast_async_read!(self.statuses);
71
186162
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
72
186162
                    database.data.name.clone(),
73
186162
                    view.collection(),
74
186162
                    view.view_name(),
75
186162
                )) {
76
180799
                    last_transaction_indexed < &current_transaction_id
77
                } else {
78
5363
                    true
79
                }
80
            };
81

            
82
186162
            if needs_reindex {
83
109472
                let wait_for_transaction = current_transaction_id;
84
                loop {
85
110146
                    let job = self
86
110146
                        .jobs
87
110146
                        .lookup_or_enqueue(Mapper {
88
110146
                            database: database.clone(),
89
110146
                            map: Map {
90
110146
                                database: database.data.name.clone(),
91
110146
                                collection: view.collection(),
92
110146
                                view_name: view_name.clone(),
93
110146
                            },
94
110146
                        })
95
288
                        .await;
96
110146
                    let id = job.receive().await??;
97
110146
                    if wait_for_transaction <= id {
98
109472
                        break;
99
674
                    }
100
                }
101
76690
            }
102
7692
        }
103

            
104
193854
        Ok(())
105
193854
    }
106

            
107
1269241
    /// Returns `true` if `database` has been recorded through
    /// [`Self::mark_key_value_expiration_loaded`].
    pub async fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
        let guard = fast_async_read!(self.statuses);
        let loaded_databases = &guard.key_value_expiration_loads;
        loaded_databases.contains(database)
    }
111

            
112
228450
    pub async fn view_integrity_checked(
113
228450
        &self,
114
228450
        database: Arc<Cow<'static, str>>,
115
228450
        collection: CollectionName,
116
228450
        view_name: ViewName,
117
228450
    ) -> bool {
118
228594
        let statuses = fast_async_read!(self.statuses);
119
228474
        statuses
120
228474
            .completed_integrity_checks
121
228474
            .contains(&(database, collection.clone(), view_name))
122
228474
    }
123

            
124
228450
    pub async fn spawn_integrity_check(
125
228450
        &self,
126
228450
        view: &dyn view::Serialized,
127
228450
        database: &Database,
128
228474
    ) -> Result<Option<Handle<(), Error, Task>>, crate::Error> {
129
228474
        let view_name = view.view_name();
130
228474
        if !self
131
228474
            .view_integrity_checked(
132
228474
                database.data.name.clone(),
133
228474
                view.collection(),
134
228474
                view_name.clone(),
135
228474
            )
136
120
            .await
137
        {
138
7228
            let job = self
139
7228
                .jobs
140
7228
                .lookup_or_enqueue(IntegrityScanner {
141
7228
                    database: database.clone(),
142
7228
                    scan: IntegrityScan {
143
7228
                        database: database.data.name.clone(),
144
7228
                        view_version: view.version(),
145
7228
                        collection: view.collection(),
146
7228
                        view_name,
147
7228
                    },
148
7228
                })
149
                .await;
150
7228
            return Ok(Some(job));
151
221246
        }
152
221246

            
153
221246
        Ok(None)
154
228474
    }
155

            
156
7203
    pub async fn mark_integrity_check_complete(
157
7203
        &self,
158
7203
        database: Arc<Cow<'static, str>>,
159
7203
        collection: CollectionName,
160
7203
        view_name: ViewName,
161
7203
    ) {
162
7203
        let mut statuses = fast_async_write!(self.statuses);
163
7203
        statuses
164
7203
            .completed_integrity_checks
165
7203
            .insert((database, collection, view_name));
166
7203
    }
167

            
168
7462
    /// Records `database` as having had its key-value expirations loaded.
    ///
    /// After this call, [`Self::key_value_expiration_loaded`] returns `true`
    /// for the same database name.
    pub async fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
        let mut guard = fast_async_write!(self.statuses);
        let loaded_databases = &mut guard.key_value_expiration_loads;
        loaded_databases.insert(database);
    }
172

            
173
104559
    pub async fn mark_view_updated(
174
104559
        &self,
175
104559
        database: Arc<Cow<'static, str>>,
176
104559
        collection: CollectionName,
177
104559
        view_name: ViewName,
178
104559
        transaction_id: u64,
179
104559
    ) {
180
104631
        let mut statuses = fast_async_write!(self.statuses);
181
104559
        statuses
182
104559
            .view_update_last_status
183
104559
            .insert((database, collection, view_name), transaction_id);
184
104559
    }
185

            
186
1269265
    pub async fn spawn_key_value_expiration_loader(
187
1269265
        &self,
188
1269265
        database: &crate::Database,
189
1269265
    ) -> Option<Handle<(), Error, Task>> {
190
1269217
        if self.key_value_expiration_loaded(&database.data.name).await {
191
1259223
            None
192
        } else {
193
            Some(
194
10042
                self.jobs
195
10042
                    .lookup_or_enqueue(ExpirationLoader {
196
10042
                        database: database.clone(),
197
10042
                        launched_at: Timestamp::now(),
198
10042
                    })
199
                    .await,
200
            )
201
        }
202
1269265
    }
203

            
204
296
    pub async fn compact_collection(
205
296
        &self,
206
296
        database: crate::Database,
207
296
        collection_name: CollectionName,
208
296
    ) -> Result<(), Error> {
209
296
        Ok(self
210
296
            .jobs
211
296
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
212
            .await
213
296
            .receive()
214
296
            .await??)
215
296
    }
216

            
217
148
    pub async fn compact_key_value_store(&self, database: crate::Database) -> Result<(), Error> {
218
148
        Ok(self
219
148
            .jobs
220
148
            .lookup_or_enqueue(Compactor::keyvalue(database))
221
            .await
222
148
            .receive()
223
148
            .await??)
224
148
    }
225

            
226
74
    pub async fn compact_database(&self, database: crate::Database) -> Result<(), Error> {
227
74
        Ok(self
228
74
            .jobs
229
74
            .lookup_or_enqueue(Compactor::database(database))
230
            .await
231
74
            .receive()
232
74
            .await??)
233
74
    }
234
}