1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use async_lock::RwLock;
8
use bonsaidb_core::{
9
    connection::Connection,
10
    keyvalue::Timestamp,
11
    schema::{view, CollectionName, ViewName},
12
};
13
use bonsaidb_utils::{fast_async_read, fast_async_write};
14

            
15
use crate::{
16
    database::{keyvalue::ExpirationLoader, Database},
17
    jobs::{manager::Manager, task::Handle},
18
    tasks::compactor::Compactor,
19
    views::{
20
        integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle},
21
        mapper::{Map, Mapper},
22
    },
23
    Error,
24
};
25

            
26
mod compactor;
27
mod task;
28

            
29
pub use task::Task;
30

            
31
#[derive(Debug, Clone)]
32
pub struct TaskManager {
33
    pub jobs: Manager<Task>,
34
    statuses: Arc<RwLock<Statuses>>,
35
}
36

            
37
/// Key identifying a single view in the status collections: the name of the
/// database, the name of the collection, and the name of the view.
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
38

            
39
2619
#[derive(Default, Debug)]
40
pub struct Statuses {
41
    completed_integrity_checks: HashSet<ViewKey>,
42
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
43
    view_update_last_status: HashMap<ViewKey, u64>,
44
}
45

            
46
impl TaskManager {
47
2569
    pub fn new(jobs: Manager<Task>) -> Self {
48
2569
        Self {
49
2569
            jobs,
50
2569
            statuses: Arc::default(),
51
2569
        }
52
2569
    }
53

            
54
245144
    pub async fn update_view_if_needed(
55
245144
        &self,
56
245144
        view: &dyn view::Serialized,
57
245144
        database: &Database,
58
245144
    ) -> Result<(), crate::Error> {
59
245144
        let view_name = view.view_name();
60
245144
        if let Some(job) = self.spawn_integrity_check(view, database).await? {
61
7357
            job.receive().await??;
62
237787
        }
63

            
64
        // If there is no transaction id, there is no data, so the view is "up-to-date"
65
245144
        if let Some(current_transaction_id) = database.last_transaction_id().await? {
66
236687
            let needs_reindex = {
67
                // When views finish updating, they store the last transaction_id
68
                // they mapped. If that value is current, we don't need to go
69
                // through the jobs system at all.
70
236837
                let statuses = fast_async_read!(self.statuses);
71
236687
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
72
236687
                    database.data.name.clone(),
73
236687
                    view.collection(),
74
236687
                    view.view_name(),
75
236687
                )) {
76
230503
                    last_transaction_indexed < &current_transaction_id
77
                } else {
78
6184
                    true
79
                }
80
            };
81

            
82
236687
            if needs_reindex {
83
138029
                let wait_for_transaction = current_transaction_id;
84
                loop {
85
139254
                    let job = self
86
139254
                        .jobs
87
139254
                        .lookup_or_enqueue(Mapper {
88
139254
                            database: database.clone(),
89
139254
                            map: Map {
90
139254
                                database: database.data.name.clone(),
91
139254
                                collection: view.collection(),
92
139254
                                view_name: view_name.clone(),
93
139254
                            },
94
139254
                        })
95
400
                        .await;
96
139254
                    let id = job.receive().await??;
97
139179
                    if wait_for_transaction <= id {
98
137954
                        break;
99
1225
                    }
100
                }
101
98658
            }
102
8457
        }
103

            
104
245069
        Ok(())
105
245069
    }
106

            
107
1317963
    pub async fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
108
1318063
        let statuses = fast_async_read!(self.statuses);
109
1317963
        statuses.key_value_expiration_loads.contains(database)
110
1317963
    }
111

            
112
281749
    pub async fn view_integrity_checked(
113
281749
        &self,
114
281749
        database: Arc<Cow<'static, str>>,
115
281749
        collection: CollectionName,
116
281749
        view_name: ViewName,
117
281749
    ) -> bool {
118
281799
        let statuses = fast_async_read!(self.statuses);
119
281749
        statuses
120
281749
            .completed_integrity_checks
121
281749
            .contains(&(database, collection.clone(), view_name))
122
281749
    }
123

            
124
281749
    pub async fn spawn_integrity_check(
125
281749
        &self,
126
281749
        view: &dyn view::Serialized,
127
281749
        database: &Database,
128
281749
    ) -> Result<Option<Handle<OptionalViewMapHandle, Error, Task>>, crate::Error> {
129
281749
        let view_name = view.view_name();
130
281749
        if !self
131
281749
            .view_integrity_checked(
132
281749
                database.data.name.clone(),
133
281749
                view.collection(),
134
281749
                view_name.clone(),
135
281749
            )
136
50
            .await
137
        {
138
8175
            let job = self
139
8175
                .jobs
140
8175
                .lookup_or_enqueue(IntegrityScanner {
141
8175
                    database: database.clone(),
142
8175
                    scan: IntegrityScan {
143
8175
                        database: database.data.name.clone(),
144
8175
                        view_version: view.version(),
145
8175
                        collection: view.collection(),
146
8175
                        view_name,
147
8175
                    },
148
8175
                })
149
                .await;
150
8175
            return Ok(Some(job));
151
273574
        }
152
273574

            
153
273574
        Ok(None)
154
281749
    }
155

            
156
8149
    pub async fn mark_integrity_check_complete(
157
8149
        &self,
158
8149
        database: Arc<Cow<'static, str>>,
159
8149
        collection: CollectionName,
160
8149
        view_name: ViewName,
161
8149
    ) {
162
8149
        let mut statuses = fast_async_write!(self.statuses);
163
8149
        statuses
164
8149
            .completed_integrity_checks
165
8149
            .insert((database, collection, view_name));
166
8149
    }
167

            
168
8321
    pub async fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
169
8321
        let mut statuses = fast_async_write!(self.statuses);
170
8321
        statuses.key_value_expiration_loads.insert(database);
171
8321
    }
172

            
173
129458
    pub async fn mark_view_updated(
174
129458
        &self,
175
129458
        database: Arc<Cow<'static, str>>,
176
129458
        collection: CollectionName,
177
129458
        view_name: ViewName,
178
129458
        transaction_id: u64,
179
129458
    ) {
180
129608
        let mut statuses = fast_async_write!(self.statuses);
181
129458
        statuses
182
129458
            .view_update_last_status
183
129458
            .insert((database, collection, view_name), transaction_id);
184
129458
    }
185

            
186
1317963
    pub async fn spawn_key_value_expiration_loader(
187
1317963
        &self,
188
1317963
        database: &crate::Database,
189
1317963
    ) -> Option<Handle<(), Error, Task>> {
190
1317938
        if self.key_value_expiration_loaded(&database.data.name).await {
191
1308274
            None
192
        } else {
193
            Some(
194
9689
                self.jobs
195
9689
                    .lookup_or_enqueue(ExpirationLoader {
196
9689
                        database: database.clone(),
197
9689
                        launched_at: Timestamp::now(),
198
9689
                    })
199
                    .await,
200
            )
201
        }
202
1317963
    }
203

            
204
308
    pub async fn compact_collection(
205
308
        &self,
206
308
        database: crate::Database,
207
308
        collection_name: CollectionName,
208
308
    ) -> Result<(), Error> {
209
308
        Ok(self
210
308
            .jobs
211
308
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
212
            .await
213
308
            .receive()
214
308
            .await??)
215
308
    }
216

            
217
154
    pub async fn compact_key_value_store(&self, database: crate::Database) -> Result<(), Error> {
218
154
        Ok(self
219
154
            .jobs
220
154
            .lookup_or_enqueue(Compactor::keyvalue(database))
221
            .await
222
154
            .receive()
223
154
            .await??)
224
154
    }
225

            
226
77
    pub async fn compact_database(&self, database: crate::Database) -> Result<(), Error> {
227
77
        Ok(self
228
77
            .jobs
229
77
            .lookup_or_enqueue(Compactor::database(database))
230
            .await
231
77
            .receive()
232
77
            .await??)
233
77
    }
234
}