1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    sync::Arc,
5
};
6

            
7
use async_lock::RwLock;
8
use bonsaidb_core::{
9
    connection::Connection,
10
    keyvalue::Timestamp,
11
    schema::{view, CollectionName, ViewName},
12
};
13
use bonsaidb_utils::{fast_async_read, fast_async_write};
14

            
15
use crate::{
16
    database::{keyvalue::ExpirationLoader, Database},
17
    tasks::{compactor::Compactor, handle::Handle, manager::Manager},
18
    views::{
19
        integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle},
20
        mapper::{Map, Mapper},
21
    },
22
    Error,
23
};
24

            
25
/// Types related to defining [`Job`]s.
26
pub mod handle;
27
/// Types related to the job [`Manager`](manager::Manager).
28
pub mod manager;
29
mod traits;
30

            
31
pub use self::traits::{Job, Keyed};
32

            
33
mod compactor;
34
mod task;
35

            
36
pub use task::Task;
37

            
38
#[derive(Debug, Clone)]
39
pub struct TaskManager {
40
    pub jobs: Manager<Task>,
41
    statuses: Arc<RwLock<Statuses>>,
42
}
43

            
44
/// Key identifying a single view in the status tables: the database name
/// (as stored in `database.data.name`), the view's collection, and the
/// view's name.
type ViewKey = (Arc<Cow<'static, str>>, CollectionName, ViewName);
45

            
46
2725
#[derive(Default, Debug)]
47
pub struct Statuses {
48
    completed_integrity_checks: HashSet<ViewKey>,
49
    key_value_expiration_loads: HashSet<Arc<Cow<'static, str>>>,
50
    view_update_last_status: HashMap<ViewKey, u64>,
51
}
52

            
53
impl TaskManager {
54
2699
    pub fn new(jobs: Manager<Task>) -> Self {
55
2699
        Self {
56
2699
            jobs,
57
2699
            statuses: Arc::default(),
58
2699
        }
59
2699
    }
60

            
61
257265
    pub async fn update_view_if_needed(
62
257265
        &self,
63
257265
        view: &dyn view::Serialized,
64
257265
        database: &Database,
65
257265
    ) -> Result<(), crate::Error> {
66
257265
        let view_name = view.view_name();
67
257265
        if let Some(job) = self.spawn_integrity_check(view, database).await? {
68
7664
            job.receive().await??;
69
249601
        }
70

            
71
        // If there is no transaction id, there is no data, so the view is "up-to-date"
72
257265
        if let Some(current_transaction_id) = database.last_transaction_id().await? {
73
257265
            let needs_reindex = {
74
                // When views finish updating, they store the last transaction_id
75
                // they mapped. If that value is current, we don't need to go
76
                // through the jobs system at all.
77
257395
                let statuses = fast_async_read!(self.statuses);
78
257265
                if let Some(last_transaction_indexed) = statuses.view_update_last_status.get(&(
79
257265
                    database.data.name.clone(),
80
257265
                    view.collection(),
81
257265
                    view.view_name(),
82
257265
                )) {
83
249887
                    last_transaction_indexed < &current_transaction_id
84
                } else {
85
7378
                    true
86
                }
87
            };
88

            
89
257265
            if needs_reindex {
90
150406
                let wait_for_transaction = current_transaction_id;
91
                loop {
92
151368
                    let job = self
93
151368
                        .jobs
94
151368
                        .lookup_or_enqueue(Mapper {
95
151368
                            database: database.clone(),
96
151368
                            map: Map {
97
151368
                                database: database.data.name.clone(),
98
151368
                                collection: view.collection(),
99
151368
                                view_name: view_name.clone(),
100
151368
                            },
101
151368
                        })
102
364
                        .await;
103
151368
                    let id = job.receive().await??;
104
151368
                    if wait_for_transaction <= id {
105
150406
                        break;
106
962
                    }
107
                }
108
106859
            }
109
        }
110

            
111
257265
        Ok(())
112
257265
    }
113

            
114
1386229
    /// Returns `true` when `database` has been recorded through
    /// [`Self::mark_key_value_expiration_loaded`].
    pub async fn key_value_expiration_loaded(&self, database: &Arc<Cow<'static, str>>) -> bool {
        let guard = fast_async_read!(self.statuses);
        let loaded = &guard.key_value_expiration_loads;
        loaded.contains(database)
    }
118

            
119
295396
    pub async fn view_integrity_checked(
120
295396
        &self,
121
295396
        database: Arc<Cow<'static, str>>,
122
295396
        collection: CollectionName,
123
295396
        view_name: ViewName,
124
295396
    ) -> bool {
125
295396
        let statuses = fast_async_read!(self.statuses);
126
295396
        statuses
127
295396
            .completed_integrity_checks
128
295396
            .contains(&(database, collection.clone(), view_name))
129
295396
    }
130

            
131
295396
    pub async fn spawn_integrity_check(
132
295396
        &self,
133
295396
        view: &dyn view::Serialized,
134
295396
        database: &Database,
135
295396
    ) -> Result<Option<Handle<OptionalViewMapHandle, Error>>, crate::Error> {
136
295396
        let view_name = view.view_name();
137
295396
        if !self
138
295396
            .view_integrity_checked(
139
295396
                database.data.name.clone(),
140
295396
                view.collection(),
141
295396
                view_name.clone(),
142
295396
            )
143
            .await
144
        {
145
8524
            let job = self
146
8524
                .jobs
147
8524
                .lookup_or_enqueue(IntegrityScanner {
148
8524
                    database: database.clone(),
149
8524
                    scan: IntegrityScan {
150
8524
                        database: database.data.name.clone(),
151
8524
                        view_version: view.version(),
152
8524
                        collection: view.collection(),
153
8524
                        view_name,
154
8524
                    },
155
8524
                })
156
                .await;
157
8524
            return Ok(Some(job));
158
286872
        }
159
286872

            
160
286872
        Ok(None)
161
295396
    }
162

            
163
8495
    pub async fn mark_integrity_check_complete(
164
8495
        &self,
165
8495
        database: Arc<Cow<'static, str>>,
166
8495
        collection: CollectionName,
167
8495
        view_name: ViewName,
168
8495
    ) {
169
8495
        let mut statuses = fast_async_write!(self.statuses);
170
8495
        statuses
171
8495
            .completed_integrity_checks
172
8495
            .insert((database, collection, view_name));
173
8495
    }
174

            
175
8720
    /// Records `database` as having its key-value expirations loaded, so
    /// later calls to [`Self::key_value_expiration_loaded`] return `true`.
    pub async fn mark_key_value_expiration_loaded(&self, database: Arc<Cow<'static, str>>) {
        let mut guard = fast_async_write!(self.statuses);
        let loaded = &mut guard.key_value_expiration_loads;
        loaded.insert(database);
    }
179

            
180
142995
    pub async fn mark_view_updated(
181
142995
        &self,
182
142995
        database: Arc<Cow<'static, str>>,
183
142995
        collection: CollectionName,
184
142995
        view_name: ViewName,
185
142995
        transaction_id: u64,
186
142995
    ) {
187
143021
        let mut statuses = fast_async_write!(self.statuses);
188
142995
        statuses
189
142995
            .view_update_last_status
190
142995
            .insert((database, collection, view_name), transaction_id);
191
142995
    }
192

            
193
1386229
    pub async fn spawn_key_value_expiration_loader(
194
1386229
        &self,
195
1386229
        database: &crate::Database,
196
1386229
    ) -> Option<Handle<(), Error>> {
197
1386229
        if self.key_value_expiration_loaded(&database.data.name).await {
198
1376035
            None
199
        } else {
200
            Some(
201
10194
                self.jobs
202
10194
                    .lookup_or_enqueue(ExpirationLoader {
203
10194
                        database: database.clone(),
204
10194
                        launched_at: Timestamp::now(),
205
10194
                    })
206
                    .await,
207
            )
208
        }
209
1386229
    }
210

            
211
320
    pub async fn compact_collection(
212
320
        &self,
213
320
        database: crate::Database,
214
320
        collection_name: CollectionName,
215
320
    ) -> Result<(), Error> {
216
320
        Ok(self
217
320
            .jobs
218
320
            .lookup_or_enqueue(Compactor::collection(database, collection_name))
219
            .await
220
320
            .receive()
221
320
            .await??)
222
320
    }
223

            
224
160
    pub async fn compact_key_value_store(&self, database: crate::Database) -> Result<(), Error> {
225
160
        Ok(self
226
160
            .jobs
227
160
            .lookup_or_enqueue(Compactor::keyvalue(database))
228
            .await
229
160
            .receive()
230
160
            .await??)
231
160
    }
232

            
233
80
    pub async fn compact_database(&self, database: crate::Database) -> Result<(), Error> {
234
80
        Ok(self
235
80
            .jobs
236
80
            .lookup_or_enqueue(Compactor::database(database))
237
            .await
238
80
            .receive()
239
80
            .await??)
240
80
    }
241
}