1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use async_lock::RwLock;
8
use bonsaidb_core::{
9
    connection::Connection,
10
    keyvalue::Timestamp,
11
    schema::{view, CollectionName, ViewName},
12
};
13
use bonsaidb_utils::{fast_async_read, fast_async_write};
14

            
15
use crate::{
16
    database::{keyvalue::ExpirationLoader, Database},
17
    jobs::{manager::Manager, task::Handle},
18
    tasks::compactor::Compactor,
19
    views::{
20
        integrity_scanner::{IntegrityScan, IntegrityScanner},
21
        mapper::{Map, Mapper},
22
    },
23
    Error,
24
};
25

            
26
mod compactor;
27
mod task;
28

            
29
pub use task::Task;
30

            
31
#[derive(Debug, Clone)]
32
pub struct TaskManager {
33
    pub jobs: Manager<Task>,
34
    statuses: Arc<RwLock<Statuses>>,
35
}
36

            
37
/// Identifies one view of one database: `(database name, collection, view name)`.
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
38

            
39
2063
#[derive(Default, Debug)]
40
pub struct Statuses {
41
    completed_integrity_checks: HashSet<ViewKey>,
42
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
43
    view_update_last_status: HashMap<ViewKey, u64>,
44
}
45

            
46
impl TaskManager {
47
2063
    pub fn new(jobs: Manager<Task>) -> Self {
48
2063
        Self {
49
2063
            jobs,
50
2063
            statuses: Arc::default(),
51
2063
        }
52
2063
    }
53

            
54
172948
    pub async fn update_view_if_needed(
55
172948
        &self,
56
172948
        view: &dyn view::Serialized,
57
172948
        database: &Database,
58
172948
    ) -> Result<(), crate::Error> {
59
172948
        let view_name = view.view_name();
60
172948
        if let Some(job) = self.spawn_integrity_check(view, database).await? {
61
5942
            job.receive().await??;
62
167006
        }
63

            
64
        // If there is no transaction id, there is no data, so the view is "up-to-date"
65
172926
        if let Some(current_transaction_id) = database.last_transaction_id().await? {
66
166057
            let needs_reindex = {
67
                // When views finish updating, they store the last transaction_id
68
                // they mapped. If that value is current, we don't need to go
69
                // through the jobs system at all.
70
166057
                let statuses = fast_async_read!(self.statuses);
71
166057
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
72
166057
                    database.data.name.clone(),
73
166057
                    view.collection(),
74
166057
                    view.view_name(),
75
166057
                )) {
76
161280
                    last_transaction_indexed < &current_transaction_id
77
                } else {
78
4777
                    true
79
                }
80
            };
81

            
82
166057
            if needs_reindex {
83
97246
                let wait_for_transaction = current_transaction_id;
84
                loop {
85
97994
                    let job = self
86
97994
                        .jobs
87
97994
                        .lookup_or_enqueue(Mapper {
88
97994
                            database: database.clone(),
89
97994
                            map: Map {
90
97994
                                database: database.data.name.clone(),
91
97994
                                collection: view.collection(),
92
97994
                                view_name: view_name.clone(),
93
97994
                            },
94
97994
                        })
95
198
                        .await;
96
97994
                    let id = job.receive().await??;
97
97972
                    if wait_for_transaction <= id {
98
97224
                        break;
99
748
                    }
100
                }
101
68811
            }
102
6891
        }
103

            
104
172926
        Ok(())
105
172926
    }
106

            
107
1147522
    pub async fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
108
1147544
        let statuses = fast_async_read!(self.statuses);
109
1147434
        statuses.key_value_expiration_loads.contains(database)
110
1147434
    }
111

            
112
204533
    pub async fn view_integrity_checked(
113
204533
        &self,
114
204533
        database: Arc<Cow<'static, str>>,
115
204533
        collection: CollectionName,
116
204533
        view_name: ViewName,
117
204533
    ) -> bool {
118
204533
        let statuses = fast_async_read!(self.statuses);
119
204489
        statuses
120
204489
            .completed_integrity_checks
121
204489
            .contains(&(database, collection.clone(), view_name))
122
204489
    }
123

            
124
204555
    pub async fn spawn_integrity_check(
125
204555
        &self,
126
204555
        view: &dyn view::Serialized,
127
204555
        database: &Database,
128
204555
    ) -> Result<Option<Handle<(), Error, Task>>, crate::Error> {
129
204467
        let view_name = view.view_name();
130
204467
        if !self
131
204467
            .view_integrity_checked(
132
204467
                database.data.name.clone(),
133
204467
                view.collection(),
134
204467
                view_name.clone(),
135
204467
            )
136
44
            .await
137
        {
138
6571
            let job = self
139
6571
                .jobs
140
6571
                .lookup_or_enqueue(IntegrityScanner {
141
6571
                    database: database.clone(),
142
6571
                    scan: IntegrityScan {
143
6571
                        database: database.data.name.clone(),
144
6571
                        view_version: view.version(),
145
6571
                        collection: view.collection(),
146
6571
                        view_name,
147
6571
                    },
148
6571
                })
149
                .await;
150
6571
            return Ok(Some(job));
151
197984
        }
152
197984

            
153
197984
        Ok(None)
154
204555
    }
155

            
156
6548
    pub async fn mark_integrity_check_complete(
157
6548
        &self,
158
6548
        database: Arc<Cow<'static, str>>,
159
6548
        collection: CollectionName,
160
6548
        view_name: ViewName,
161
6548
    ) {
162
6504
        let mut statuses = fast_async_write!(self.statuses);
163
6504
        statuses
164
6504
            .completed_integrity_checks
165
6504
            .insert((database, collection, view_name));
166
6504
    }
167

            
168
6800
    pub async fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
169
6844
        let mut statuses = fast_async_write!(self.statuses);
170
6800
        statuses.key_value_expiration_loads.insert(database);
171
6800
    }
172

            
173
92938
    pub async fn mark_view_updated(
174
92938
        &self,
175
92938
        database: Arc<Cow<'static, str>>,
176
92938
        collection: CollectionName,
177
92938
        view_name: ViewName,
178
92938
        transaction_id: u64,
179
92938
    ) {
180
92982
        let mut statuses = fast_async_write!(self.statuses);
181
92938
        statuses
182
92938
            .view_update_last_status
183
92938
            .insert((database, collection, view_name), transaction_id);
184
92938
    }
185

            
186
1147522
    pub async fn spawn_key_value_expiration_loader(
187
1147522
        &self,
188
1147522
        database: &crate::Database,
189
1147522
    ) -> Option<Handle<(), Error, Task>> {
190
1147522
        if self.key_value_expiration_loaded(&database.data.name).await {
191
1138313
            None
192
        } else {
193
            Some(
194
9165
                self.jobs
195
9165
                    .lookup_or_enqueue(ExpirationLoader {
196
9165
                        database: database.clone(),
197
9165
                        launched_at: Timestamp::now(),
198
9165
                    })
199
                    .await,
200
            )
201
        }
202
1147500
    }
203

            
204
268
    pub async fn compact_collection(
205
268
        &self,
206
268
        database: crate::Database,
207
268
        collection_name: CollectionName,
208
268
    ) -> Result<(), Error> {
209
268
        Ok(self
210
268
            .jobs
211
268
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
212
            .await
213
268
            .receive()
214
268
            .await??)
215
268
    }
216

            
217
134
    pub async fn compact_key_value_store(&self, database: crate::Database) -> Result<(), Error> {
218
134
        Ok(self
219
134
            .jobs
220
134
            .lookup_or_enqueue(Compactor::keyvalue(database))
221
            .await
222
134
            .receive()
223
134
            .await??)
224
134
    }
225

            
226
67
    pub async fn compact_database(&self, database: crate::Database) -> Result<(), Error> {
227
67
        Ok(self
228
67
            .jobs
229
67
            .lookup_or_enqueue(Compactor::database(database))
230
            .await
231
67
            .receive()
232
67
            .await??)
233
67
    }
234
}