1
use std::{
2
    borrow::Cow,
3
    collections::HashSet,
4
    ops::{Deref, DerefMut},
5
    sync::Arc,
6
};
7

            
8
use async_trait::async_trait;
9
use bonsaidb_core::{
10
    arc_bytes::{serde::Bytes, ArcBytes},
11
    connection::Connection,
12
    document::{DocumentId, Header},
13
    schema::{
14
        view::{self, map, Serialized},
15
        CollectionName, ViewName,
16
    },
17
};
18
use nebari::{
19
    io::any::AnyFile,
20
    tree::{AnyTreeRoot, Unversioned, Versioned},
21
    ExecutingTransaction, Tree,
22
};
23
use serde::{Deserialize, Serialize};
24

            
25
use crate::{
26
    database::{deserialize_document, document_tree_name, Database},
27
    jobs::{Job, Keyed},
28
    tasks::Task,
29
    views::{
30
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
31
        view_omitted_docs_tree_name, EntryMapping, ViewEntry,
32
    },
33
    Error,
34
};
35

            
36
#[derive(Debug)]
37
pub struct Mapper {
38
    pub database: Database,
39
    pub map: Map,
40
}
41

            
42
399463
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
43
pub struct Map {
44
    pub database: Arc<Cow<'static, str>>,
45
    pub collection: CollectionName,
46
    pub view_name: ViewName,
47
}
48

            
49
#[async_trait]
50
impl Job for Mapper {
51
    type Output = u64;
52
    type Error = Error;
53

            
54
388374
    #[cfg_attr(feature = "tracing", tracing::instrument)]
55
    #[allow(clippy::too_many_lines)]
56
129458
    async fn execute(&mut self) -> Result<Self::Output, Error> {
57
129458
        let documents =
58
129458
            self.database
59
129458
                .roots()
60
129458
                .tree(self.database.collection_tree::<Versioned, _>(
61
129458
                    &self.map.collection,
62
129458
                    document_tree_name(&self.map.collection),
63
129458
                )?)?;
64

            
65
129458
        let view_entries =
66
129458
            self.database
67
129458
                .roots()
68
129458
                .tree(self.database.collection_tree::<Unversioned, _>(
69
129458
                    &self.map.collection,
70
129458
                    view_entries_tree_name(&self.map.view_name),
71
129458
                )?)?;
72

            
73
129458
        let document_map =
74
129458
            self.database
75
129458
                .roots()
76
129458
                .tree(self.database.collection_tree::<Unversioned, _>(
77
129458
                    &self.map.collection,
78
129458
                    view_document_map_tree_name(&self.map.view_name),
79
129458
                )?)?;
80

            
81
129458
        let invalidated_entries =
82
129458
            self.database
83
129458
                .roots()
84
129458
                .tree(self.database.collection_tree::<Unversioned, _>(
85
129458
                    &self.map.collection,
86
129458
                    view_invalidated_docs_tree_name(&self.map.view_name),
87
129458
                )?)?;
88

            
89
129458
        let omitted_entries =
90
129458
            self.database
91
129458
                .roots()
92
129458
                .tree(self.database.collection_tree::<Unversioned, _>(
93
129458
                    &self.map.collection,
94
129458
                    view_omitted_docs_tree_name(&self.map.view_name),
95
129458
                )?)?;
96
129458
        let transaction_id = self
97
129458
            .database
98
129458
            .last_transaction_id()
99
            .await?
100
129458
            .expect("no way to have documents without a transaction");
101
129458

            
102
129458
        let storage = self.database.clone();
103
129458
        let map_request = self.map.clone();
104
129458

            
105
129458
        tokio::task::spawn_blocking(move || {
106
129458
            map_view(
107
129458
                &invalidated_entries,
108
129458
                &document_map,
109
129458
                &documents,
110
129458
                &omitted_entries,
111
129458
                &view_entries,
112
129458
                &storage,
113
129458
                &map_request,
114
129458
            )
115
129458
        })
116
81981
        .await??;
117

            
118
129458
        self.database
119
129458
            .data
120
129458
            .storage
121
129458
            .tasks()
122
129458
            .mark_view_updated(
123
129458
                self.map.database.clone(),
124
129458
                self.map.collection.clone(),
125
129458
                self.map.view_name.clone(),
126
129458
                transaction_id,
127
129458
            )
128
150
            .await;
129

            
130
129458
        Ok(transaction_id)
131
258916
    }
132
}
133

            
134
129458
fn map_view(
135
129458
    invalidated_entries: &Tree<Unversioned, AnyFile>,
136
129458
    document_map: &Tree<Unversioned, AnyFile>,
137
129458
    documents: &Tree<Versioned, AnyFile>,
138
129458
    omitted_entries: &Tree<Unversioned, AnyFile>,
139
129458
    view_entries: &Tree<Unversioned, AnyFile>,
140
129458
    database: &Database,
141
129458
    map_request: &Map,
142
129458
) -> Result<(), Error> {
143
    // Only do any work if there are invalidated documents to process
144
129458
    let invalidated_ids = invalidated_entries
145
129458
        .get_range(&(..))?
146
129458
        .into_iter()
147
129458
        .map(|(key, _)| key)
148
129458
        .collect::<Vec<_>>();
149
129458
    if !invalidated_ids.is_empty() {
150
9730
        let mut transaction = database
151
9730
            .roots()
152
9730
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&[
153
9730
                Box::new(invalidated_entries.clone()) as Box<dyn AnyTreeRoot<AnyFile>>,
154
9730
                Box::new(document_map.clone()),
155
9730
                Box::new(documents.clone()),
156
9730
                Box::new(omitted_entries.clone()),
157
9730
                Box::new(view_entries.clone()),
158
9730
            ])?;
159
9730
        let view = database
160
9730
            .data
161
9730
            .schema
162
9730
            .view_by_name(&map_request.view_name)
163
9730
            .unwrap();
164
57035
        for document_id in &invalidated_ids {
165
47305
            DocumentRequest {
166
47305
                document_id,
167
47305
                map_request,
168
47305
                database,
169
47305
                transaction: &mut transaction,
170
47305
                document_map_index: 1,
171
47305
                documents_index: 2,
172
47305
                omitted_entries_index: 3,
173
47305
                view_entries_index: 4,
174
47305
                view,
175
47305
            }
176
47305
            .map()?;
177
47305
            let invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
178
47305
            invalidated_entries.remove(document_id)?;
179
        }
180
9730
        transaction.commit()?;
181
119728
    }
182

            
183
129458
    Ok(())
184
129458
}
185

            
186
pub struct DocumentRequest<'a> {
187
    pub document_id: &'a [u8],
188
    pub map_request: &'a Map,
189
    pub database: &'a Database,
190

            
191
    pub transaction: &'a mut ExecutingTransaction<AnyFile>,
192
    pub document_map_index: usize,
193
    pub documents_index: usize,
194
    pub omitted_entries_index: usize,
195
    pub view_entries_index: usize,
196
    pub view: &'a dyn Serialized,
197
}
198

            
199
impl<'a> DocumentRequest<'a> {
200
119994
    pub fn map(&mut self) -> Result<(), Error> {
201
119994
        let documents = self
202
119994
            .transaction
203
119994
            .tree::<Versioned>(self.documents_index)
204
119994
            .unwrap();
205
119994
        let (doc_still_exists, map_result) =
206
119994
            if let Some(document) = documents.get(self.document_id)? {
207
107186
                let document = deserialize_document(&document)?;
208

            
209
                // Call the schema map function
210
                (
211
                    true,
212
107186
                    self.view
213
107186
                        .map(&document)
214
107186
                        .map_err(bonsaidb_core::Error::from)?,
215
                )
216
            } else {
217
12808
                (false, Vec::new())
218
            };
219

            
220
        // We need to store a record of all the mappings this document produced.
221
119994
        let keys: HashSet<Cow<'_, [u8]>> = map_result
222
119994
            .iter()
223
119994
            .map(|map| Cow::Borrowed(map.key.as_slice()))
224
119994
            .collect();
225
119994
        let encrypted_entry = bincode::serialize(&keys)?;
226
119994
        let document_map = self
227
119994
            .transaction
228
119994
            .tree::<Unversioned>(self.document_map_index)
229
119994
            .unwrap();
230
14310
        if let Some(existing_map) =
231
119994
            document_map.replace(self.document_id.to_vec(), encrypted_entry)?
232
        {
233
            // This document previously had been mapped. We will update any keys
234
            // that match, but we need to remove any that are no longer present.
235
14310
            self.remove_existing_view_entries_for_keys(&keys, &existing_map)?;
236
105684
        }
237

            
238
119994
        if map_result.is_empty() {
239
13041
            self.omit_document(doc_still_exists)?;
240
        } else {
241
106953
            let mut has_reduce = true;
242
213856
            for map::Serialized { source, key, value } in map_result {
243
107184
                has_reduce = self.save_mapping(source, &key, value, has_reduce)?;
244
            }
245
        }
246

            
247
119713
        Ok(())
248
119994
    }
249

            
250
13041
    fn omit_document(&mut self, doc_still_exists: bool) -> Result<(), Error> {
251
13041
        if doc_still_exists {
252
233
            let omitted_entries = self
253
233
                .transaction
254
233
                .tree::<Unversioned>(self.omitted_entries_index)
255
233
                .unwrap();
256
233
            omitted_entries.set(self.document_id.to_vec(), b"")?;
257
12808
        }
258
13041
        Ok(())
259
13041
    }
260

            
261
180994
    fn load_entry_for_key(&mut self, key: &[u8]) -> Result<Option<ViewEntryCollection>, Error> {
262
180994
        load_entry_for_key(key, |key| {
263
180994
            self.transaction
264
180994
                .tree::<Unversioned>(self.view_entries_index)
265
180994
                .unwrap()
266
180994
                .get(key)
267
180994
                .map_err(Error::from)
268
180994
        })
269
180994
    }
270

            
271
106878
    fn save_entry_for_key(&mut self, key: &[u8], entry: &ViewEntryCollection) -> Result<(), Error> {
272
106878
        let bytes = bincode::serialize(entry)?;
273
106878
        let view_entries = self
274
106878
            .transaction
275
106878
            .tree::<Unversioned>(self.view_entries_index)
276
106878
            .unwrap();
277
106878
        view_entries.set(key.to_vec(), bytes)?;
278
106903
        Ok(())
279
106903
    }
280

            
281
107184
    fn save_mapping(
282
107184
        &mut self,
283
107184
        source: Header,
284
107184
        key: &[u8],
285
107184
        value: Bytes,
286
107184
        mut has_reduce: bool,
287
107184
    ) -> Result<bool, Error> {
288
107184
        // Before altering any data, verify that the key is unique if this is a unique view.
289
107184
        if self.view.unique() {
290
60743
            if let Ok(Some(existing_entry)) = self.load_entry_for_key(key) {
291
1245
                if existing_entry.mappings[0].source.id != source.id {
292
281
                    return Err(Error::Core(bonsaidb_core::Error::UniqueKeyViolation {
293
281
                        view: self.map_request.view_name.clone(),
294
281
                        conflicting_document: Box::new(source),
295
281
                        existing_document: Box::new(existing_entry.mappings[0].source.clone()),
296
281
                    }));
297
964
                }
298
59498
            }
299
46441
        }
300
106903
        let omitted_entries = self
301
106903
            .transaction
302
106903
            .tree::<Unversioned>(self.omitted_entries_index)
303
106903
            .unwrap();
304
106903
        omitted_entries.remove(self.document_id)?;
305

            
306
106903
        let entry_mapping = EntryMapping { source, value };
307

            
308
        // Add a new ViewEntry or update an existing
309
        // ViewEntry for the key given
310
106903
        let view_entry = if let Ok(Some(mut entry)) = self.load_entry_for_key(key) {
311
            // attempt to update an existing
312
            // entry for this document, if
313
            // present
314
20928
            let mut found = false;
315
45753
            for mapping in &mut entry.mappings {
316
45753
                if mapping.source.id == entry_mapping.source.id {
317
1116
                    found = true;
318
1116
                    mapping.value = entry_mapping.value.clone();
319
1116
                    break;
320
44637
                }
321
            }
322

            
323
            // If an existing mapping wasn't
324
            // found, add it
325
20928
            if !found {
326
19812
                entry.mappings.push(entry_mapping);
327
19816
            }
328

            
329
            // There was a choice to be made here of whether to call
330
            // reduce()  with all of the existing values, or call it with
331
            // rereduce=true passing only the new value and the old stored
332
            // value. In this implementation, it's technically less
333
            // efficient, but we can guarantee that every value has only
334
            // been reduced once, and potentially re-reduced a single-time.
335
            // If we constantly try to update the value to optimize the size
336
            // of `mappings`, the fear is that the value computed may lose
337
            // precision in some contexts over time. Thus, the decision was
338
            // made to always call reduce() with all the mappings within a
339
            // single ViewEntry.
340
20928
            if has_reduce {
341
20928
                let mappings = entry
342
20928
                    .mappings
343
20928
                    .iter()
344
65565
                    .map(|m| (key, m.value.as_slice()))
345
20928
                    .collect::<Vec<_>>();
346
20928

            
347
20928
                match self.view.reduce(&mappings, false) {
348
19889
                    Ok(reduced) => {
349
19889
                        entry.reduced_value = Bytes::from(reduced);
350
19889
                    }
351
1039
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
352
1039
                        has_reduce = false;
353
1039
                    }
354
                    Err(other) => return Err(Error::from(other)),
355
                }
356
            }
357

            
358
20928
            entry
359
        } else {
360
85975
            let reduced_value = if has_reduce {
361
85975
                match self.view.reduce(&[(key, &entry_mapping.value)], false) {
362
24651
                    Ok(reduced_value) => reduced_value,
363
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
364
61299
                        has_reduce = false;
365
61299
                        Vec::default()
366
                    }
367
                    Err(other) => return Err(Error::from(other)),
368
                }
369
            } else {
370
                Vec::default()
371
            };
372
85950
            ViewEntryCollection::from(ViewEntry {
373
85950
                key: Bytes::from(key),
374
85950
                view_version: self.view.version(),
375
85950
                mappings: vec![entry_mapping],
376
85950
                reduced_value: Bytes::from(reduced_value),
377
85950
            })
378
        };
379
106878
        self.save_entry_for_key(key, &view_entry)?;
380
106903
        Ok(has_reduce)
381
107184
    }
382

            
383
14310
    fn remove_existing_view_entries_for_keys(
384
14310
        &mut self,
385
14310
        keys: &HashSet<Cow<'_, [u8]>>,
386
14310
        existing_map: &[u8],
387
14310
    ) -> Result<(), Error> {
388
14310
        let existing_keys = bincode::deserialize::<HashSet<Cow<'_, [u8]>>>(existing_map)?;
389
14310
        let mut has_reduce = true;
390
14310
        for key_to_remove_from in existing_keys.difference(keys) {
391
13348
            if let Ok(Some(mut entry_collection)) = self.load_entry_for_key(key_to_remove_from) {
392
13348
                let document_id = DocumentId::try_from(self.document_id).unwrap();
393
13348
                entry_collection
394
13348
                    .mappings
395
13656
                    .retain(|m| m.source.id != document_id);
396
13348

            
397
13348
                if entry_collection.mappings.is_empty() {
398
13040
                    entry_collection.remove_active_entry();
399
13040
                    if entry_collection.is_empty() {
400
                        // Remove the key
401
13040
                        let view_entries = self
402
13040
                            .transaction
403
13040
                            .tree::<Unversioned>(self.view_entries_index)
404
13040
                            .unwrap();
405
13040
                        view_entries.remove(key_to_remove_from)?;
406
13040
                        continue;
407
                    }
408
308
                } else if has_reduce {
409
308
                    let mappings = entry_collection
410
308
                        .mappings
411
308
                        .iter()
412
308
                        .map(|m| (&key_to_remove_from[..], m.value.as_slice()))
413
308
                        .collect::<Vec<_>>();
414
308

            
415
308
                    match self.view.reduce(&mappings, false) {
416
308
                        Ok(reduced) => {
417
308
                            entry_collection.reduced_value = Bytes::from(reduced);
418
308
                        }
419
                        Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
420
                            has_reduce = false;
421
                        }
422
                        Err(other) => return Err(Error::from(other)),
423
                    }
424
                }
425

            
426
308
                let value = bincode::serialize(&entry_collection)?;
427
308
                let view_entries = self
428
308
                    .transaction
429
308
                    .tree::<Unversioned>(self.view_entries_index)
430
308
                    .unwrap();
431
308
                view_entries.set(key_to_remove_from.to_vec(), value)?;
432
            }
433
        }
434

            
435
14310
        Ok(())
436
14310
    }
437
}
438

            
439
impl Keyed<Task> for Mapper {
440
140547
    fn key(&self) -> Task {
441
140547
        Task::ViewMap(self.map.clone())
442
140547
    }
443
}
444

            
445
493172
#[derive(Debug, Serialize, Deserialize)]
446
pub struct ViewEntryCollection {
447
    entries: Vec<ViewEntry>,
448
    #[serde(skip)]
449
    active_index: usize,
450
}
451

            
452
impl From<ViewEntry> for ViewEntryCollection {
    /// Wraps a single entry in a collection, making it the active entry.
    fn from(entry: ViewEntry) -> Self {
        let entries = vec![entry];
        Self {
            entries,
            active_index: 0,
        }
    }
}
460

            
461
impl From<ViewEntryCollection> for ViewEntry {
462
457398
    fn from(collection: ViewEntryCollection) -> Self {
463
457398
        collection
464
457398
            .entries
465
457398
            .into_iter()
466
457398
            .nth(collection.active_index)
467
457398
            .unwrap()
468
457398
    }
469
}
470

            
471
impl Deref for ViewEntryCollection {
472
    type Target = ViewEntry;
473

            
474
36110
    fn deref(&self) -> &Self::Target {
475
36110
        &self.entries[self.active_index]
476
36110
    }
477
}
478

            
479
impl DerefMut for ViewEntryCollection {
480
74285
    fn deref_mut(&mut self) -> &mut Self::Target {
481
74285
        &mut self.entries[self.active_index]
482
74285
    }
483
}
484

            
485
impl ViewEntryCollection {
486
13040
    pub fn remove_active_entry(&mut self) {
487
13040
        self.entries.remove(self.active_index);
488
13040
    }
489

            
490
13040
    pub fn is_empty(&self) -> bool {
491
13040
        self.entries.is_empty()
492
13040
    }
493
}
494

            
495
fn load_entry_for_key<F: FnOnce(&[u8]) -> Result<Option<ArcBytes<'static>>, Error>>(
496
    key: &[u8],
497
    get_entry_fn: F,
498
) -> Result<Option<ViewEntryCollection>, Error> {
499
180994
    get_entry_fn(key)?
500
180994
        .map(|bytes| bincode::deserialize(&bytes).map_err(Error::from))
501
180994
        .transpose()
502
180994
}