use std::borrow::Cow;
use std::collections::hash_map::RandomState;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;

use bonsaidb_core::arc_bytes::serde::Bytes;
use bonsaidb_core::arc_bytes::{ArcBytes, OwnedBytes};
use bonsaidb_core::connection::Connection;
use bonsaidb_core::schema::view::{self, map, Serialized, ViewUpdatePolicy};
use bonsaidb_core::schema::{CollectionName, ViewName};
use easy_parallel::Parallel;
use nebari::io::any::AnyFile;
use nebari::tree::{AnyTreeRoot, CompareSwap, KeyOperation, Operation, Unversioned, Versioned};
use nebari::{LockedTransactionTree, Tree, UnlockedTransactionTree};

use crate::database::{deserialize_document, document_tree_name, Database};
use crate::tasks::{Job, Keyed, Task};
use crate::views::{
    view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
    EntryMapping, ViewEntry,
};
use crate::Error;

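/// Background job that brings a single view up to date by running its map
/// function over every document currently marked as invalidated for that view.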
#[derive(Debug)]
pub struct Mapper {
    pub database: Database,
    pub map: Map,
}

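/// Identifies the view being mapped: the database name, the collection, and
/// the view's name. Also serves as the job's task key.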
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct Map {
    pub database: Arc<Cow<'static, str>>,
    pub collection: CollectionName,
    pub view_name: ViewName,
}

impl Job for Mapper {
    type Error = Error;
    type Output = u64;

    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    #[allow(clippy::too_many_lines)]
    fn execute(&mut self) -> Result<Self::Output, Error> {
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.map.collection,
                    document_tree_name(&self.map.collection),
                )?)?;

        let view_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_entries_tree_name(&self.map.view_name),
                )?)?;

        let document_map =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_document_map_tree_name(&self.map.view_name),
                )?)?;

        let invalidated_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_invalidated_docs_tree_name(&self.map.view_name),
                )?)?;

        let transaction_id = self
            .database
            .last_transaction_id()?
            .expect("no way to have documents without a transaction");

        let storage = self.database.clone();
        let map_request = self.map.clone();

        map_view(
            &invalidated_entries,
            &document_map,
            &documents,
            &view_entries,
            &storage,
            &map_request,
        )?;

        self.database.storage.instance.tasks().mark_view_updated(
            self.map.database.clone(),
            self.map.collection.clone(),
            self.map.view_name.clone(),
            transaction_id,
        );

        Ok(transaction_id)
    }
}

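/// Maps all invalidated documents for a view. Work is drained in chunks of up
/// to `CHUNK_SIZE` document ids, each processed inside its own Nebari
/// transaction, so a large backlog is never committed as a single giant batch.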
fn map_view(
    invalidated_entries: &Tree<Unversioned, AnyFile>,
    document_map: &Tree<Unversioned, AnyFile>,
    documents: &Tree<Versioned, AnyFile>,
    view_entries: &Tree<Unversioned, AnyFile>,
    database: &Database,
    map_request: &Map,
) -> Result<(), Error> {
    const CHUNK_SIZE: usize = 100_000;
    // Only do any work if there are invalidated documents to process
    let mut invalidated_ids = invalidated_entries
        .get_range(&(..))?
        .into_iter()
        .map(|(key, _)| key)
        .collect::<Vec<_>>();
    while !invalidated_ids.is_empty() {
        let transaction = database
            .roots()
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&[
                Box::new(invalidated_entries.clone()) as Box<dyn AnyTreeRoot<AnyFile>>,
                Box::new(document_map.clone()),
                Box::new(documents.clone()),
                Box::new(view_entries.clone()),
            ])?;
        {
            let view = database
                .data
                .schema
                .view_by_name(&map_request.view_name)
                .unwrap();

            let document_ids = invalidated_ids
                .drain(invalidated_ids.len().saturating_sub(CHUNK_SIZE)..)
                .collect::<Vec<_>>();
            let document_map = transaction.unlocked_tree(1).unwrap();
            let documents = transaction.unlocked_tree(2).unwrap();
            let view_entries = transaction.unlocked_tree(3).unwrap();
            DocumentRequest {
                document_ids: document_ids.clone(),
                map_request,
                database,
                document_map,
                documents,
                view_entries,
                view,
            }
            .map()?;

            let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
            invalidated_entries.modify(document_ids, nebari::tree::Operation::Remove)?;
        }
        transaction.commit()?;
    }

    Ok(())
}

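/// Everything needed to map one chunk of documents for a view: the document
/// ids to process, the view itself, and the transaction trees to update.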
pub struct DocumentRequest<'a> {
    pub document_ids: Vec<ArcBytes<'static>>,
    pub map_request: &'a Map,
    pub database: &'a Database,

    pub document_map: &'a UnlockedTransactionTree<AnyFile>,
    pub documents: &'a UnlockedTransactionTree<AnyFile>,
    pub view_entries: &'a UnlockedTransactionTree<AnyFile>,
    pub view: &'a dyn Serialized,
}

type DocumentIdPayload = (ArcBytes<'static>, Option<ArcBytes<'static>>);
type BatchPayload = (Vec<ArcBytes<'static>>, flume::Receiver<DocumentIdPayload>);

impl<'a> DocumentRequest<'a> {
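    /// Stage one of the mapping pipeline: reads documents from the `documents`
    /// tree in chunks of 1,024 ids and streams each `(id, Option<document>)`
    /// pair to the mapping stage over a bounded channel.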
    fn generate_batches(
        batch_sender: flume::Sender<BatchPayload>,
        document_ids: &[ArcBytes<'static>],
        documents: &UnlockedTransactionTree<AnyFile>,
    ) -> Result<(), Error> {
        // Generate batches
        let mut documents = documents.lock::<Versioned>();
        for chunk in document_ids.chunks(1024) {
            let (document_id_sender, document_id_receiver) = flume::bounded(chunk.len());
            batch_sender
                .send((chunk.to_vec(), document_id_receiver))
                .unwrap();
            let mut documents = documents.get_multiple(chunk.iter().map(ArcBytes::as_slice))?;
            documents.sort_by(|a, b| a.0.cmp(&b.0));

            for document_id in chunk.iter().rev() {
                let document = documents
                    .last()
                    .map_or(false, |(key, _)| (key == document_id))
                    .then(|| documents.pop().unwrap().1);

                document_id_sender
                    .send((document_id.clone(), document))
                    .unwrap();
            }

            drop(document_id_sender);
        }
        drop(batch_sender);
        Ok(())
    }

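    /// Stage two of the mapping pipeline: spawns `parallelization` workers that
    /// pull documents off the channel, run the view's `map()` function, and
    /// collect the emitted keys and mappings into a `Batch`.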
    fn map_batches(
        batch_receiver: &flume::Receiver<BatchPayload>,
        mapped_sender: flume::Sender<Batch>,
        view: &dyn Serialized,
        parallelization: usize,
    ) -> Result<(), Error> {
        // Process batches
        while let Ok((document_ids, document_id_receiver)) = batch_receiver.recv() {
            let mut batch = Batch {
                document_ids,
                ..Batch::default()
            };
            for result in Parallel::new()
                .each(1..=parallelization, |_| -> Result<_, Error> {
                    let mut results = Vec::new();
                    while let Ok((document_id, document)) = document_id_receiver.recv() {
                        let map_result = if let Some(document) = document {
                            let document = deserialize_document(&document)?;

                            // Call the schema map function
                            view.map(&document).map_err(bonsaidb_core::Error::from)?
                        } else {
                            // Get multiple didn't return this document ID.
                            Vec::new()
                        };
                        let keys: HashSet<OwnedBytes> = map_result
                            .iter()
                            .map(|map| OwnedBytes::from(map.key.as_slice()))
                            .collect();
                        let new_keys = ArcBytes::from(bincode::serialize(&keys)?);

                        results.push((document_id, new_keys, keys, map_result));
                    }

                    Ok(results)
                })
                .run()
            {
                for (document_id, new_keys, keys, map_result) in result? {
                    for key in &keys {
                        batch.all_keys.insert(key.0.clone());
                    }
                    batch.document_maps.insert(document_id.clone(), new_keys);
                    batch.document_keys.insert(document_id.clone(), keys);
                    for mapping in map_result {
                        let key_mappings = batch
                            .new_mappings
                            .entry(ArcBytes::from(mapping.key.to_vec()))
                            .or_default();
                        key_mappings.push(mapping);
                    }
                }
            }
            mapped_sender.send(batch).unwrap();
        }
        drop(mapped_sender);
        Ok(())
    }

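    /// Writes each document's new set of emitted keys into the document map
    /// tree. For every key a document no longer emits, the returned map records
    /// which document ids must have their old mappings removed from the view
    /// entries, and those keys are also added to `all_keys`.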
    fn update_document_map(
        document_ids: Vec<ArcBytes<'static>>,
        document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        document_maps: &BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
        mut document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
        all_keys: &mut BTreeSet<ArcBytes<'static>>,
    ) -> Result<BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>, Error> {
        // We need to store a record of all the mappings this document produced.
        let mut maps_to_clear = Vec::new();
        document_map.modify(
            document_ids,
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |key, value| {
                if let Some(existing_map) = value {
                    maps_to_clear.push((key.to_owned(), existing_map));
                }
                let new_map = document_maps.get(key).unwrap();
                KeyOperation::Set(new_map.clone())
            })),
        )?;
        let mut view_entries_to_clean = BTreeMap::new();
        for (document_id, existing_map) in maps_to_clear {
            let existing_keys = bincode::deserialize::<HashSet<OwnedBytes>>(&existing_map)?;
            let new_keys = document_keys.remove(&document_id).unwrap();
            for key in existing_keys.difference(&new_keys) {
                all_keys.insert(key.clone().0);
                let key_documents = view_entries_to_clean
                    .entry(key.clone().0)
                    .or_insert_with(HashSet::<_, RandomState>::default);
                key_documents.insert(document_id.clone());
            }
        }
        Ok(view_entries_to_clean)
    }

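    /// Applies a batch's changes to the view entries tree using a compare-swap
    /// over every affected key, delegating the per-key merge and reduction to
    /// `ViewEntryUpdater`.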
    fn update_view_entries(
        view: &dyn Serialized,
        map_request: &Map,
        view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        all_keys: BTreeSet<ArcBytes<'static>>,
        view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
        new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    ) -> Result<(), Error> {
        let mut updater = ViewEntryUpdater {
            view,
            map_request,
            view_entries_to_clean,
            new_mappings,
            result: Ok(()),
            has_reduce: true,
        };
        view_entries
            .modify(
                all_keys.into_iter().collect(),
                Operation::CompareSwap(CompareSwap::new(&mut |key, view_entries| {
                    updater.compare_swap_view_entry(key, view_entries)
                })),
            )
            .map_err(Error::from)
            .and(updater.result)
    }

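    /// Stage three of the mapping pipeline: receives each mapped `Batch` and
    /// persists it by updating the document map and then the view entries.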
    fn save_mappings(
        mapped_receiver: &flume::Receiver<Batch>,
        view: &dyn Serialized,
        map_request: &Map,
        document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
    ) -> Result<(), Error> {
        while let Ok(Batch {
            document_ids,
            document_maps,
            document_keys,
            new_mappings,
            mut all_keys,
        }) = mapped_receiver.recv()
        {
            let view_entries_to_clean = Self::update_document_map(
                document_ids,
                document_map,
                &document_maps,
                document_keys,
                &mut all_keys,
            )?;

            Self::update_view_entries(
                view,
                map_request,
                view_entries,
                all_keys,
                view_entries_to_clean,
                new_mappings,
            )?;
        }
        Ok(())
    }

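    /// Runs the full mapping pipeline for this request. Batch generation,
    /// parallel mapping, and persistence run concurrently on separate threads
    /// connected by bounded channels, and the first error from any stage is
    /// returned.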
    pub fn map(&mut self) -> Result<(), Error> {
        let (batch_sender, batch_receiver) = flume::bounded(1);
        let (mapped_sender, mapped_receiver) = flume::bounded(1);

        for result in Parallel::new()
            .add(|| Self::generate_batches(batch_sender, &self.document_ids, self.documents))
            .add(|| {
                Self::map_batches(
                    &batch_receiver,
                    mapped_sender,
                    self.view,
                    self.database.storage().parallelization(),
                )
            })
            .add(|| {
                let mut document_map = self.document_map.lock();
                let mut view_entries = self.view_entries.lock();
                Self::save_mappings(
                    &mapped_receiver,
                    self.view,
                    self.map_request,
                    &mut document_map,
                    &mut view_entries,
                )
            })
            .run()
        {
            result?;
        }

        Ok(())
    }
}

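/// The output of mapping one chunk of documents, accumulated by `map_batches`
/// and consumed by `save_mappings`.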
#[derive(Default)]
struct Batch {
    document_ids: Vec<ArcBytes<'static>>,
    document_maps: BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
    document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
    new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    all_keys: BTreeSet<ArcBytes<'static>>,
}

impl Keyed<Task> for Mapper {
    fn key(&self) -> Task {
        Task::ViewMap(self.map.clone())
    }
}

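/// Per-key state used while compare-swapping view entries: removes mappings
/// from documents whose keys changed, merges in new mappings, and re-reduces
/// the entry when the view implements `reduce()`.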
struct ViewEntryUpdater<'a> {
    view: &'a dyn Serialized,
    map_request: &'a Map,
    view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
    new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    result: Result<(), Error>,
    has_reduce: bool,
}

impl<'a> ViewEntryUpdater<'a> {
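    /// Computes the new stored value for a single view key. Returns
    /// `KeyOperation::Remove` when the entry no longer has any mappings,
    /// `KeyOperation::Skip` after recording an error in `self.result`, and
    /// otherwise `KeyOperation::Set` with the re-serialized entry.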
    fn compare_swap_view_entry(
        &mut self,
        key: &ArcBytes<'_>,
        view_entries: Option<ArcBytes<'static>>,
    ) -> KeyOperation<ArcBytes<'static>> {
        let mut view_entry = view_entries
            .and_then(|view_entries| bincode::deserialize::<ViewEntry>(&view_entries).ok())
            .unwrap_or_else(|| ViewEntry {
                key: Bytes::from(key.to_vec()),
                view_version: self.view.version(),
                mappings: vec![],
                reduced_value: Bytes::default(),
            });
        let key = key.to_owned();
        if let Some(document_ids) = self.view_entries_to_clean.remove(&key) {
            view_entry
                .mappings
                .retain(|m| !document_ids.contains(m.source.id.as_ref()));

            if view_entry.mappings.is_empty() && !self.new_mappings.contains_key(&key[..]) {
                return KeyOperation::Remove;
            } else if self.has_reduce {
                let mappings = view_entry
                    .mappings
                    .iter()
                    .map(|m| (&key[..], m.value.as_slice()))
                    .collect::<Vec<_>>();

                match self.view.reduce(&mappings, false) {
                    Ok(reduced) => {
                        view_entry.reduced_value = Bytes::from(reduced);
                    }
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
                        self.has_reduce = false;
                    }
                    Err(other) => {
                        self.result = Err(Error::from(other));
                        return KeyOperation::Skip;
                    }
                }
            }
        }

        if let Some(new_mappings) = self.new_mappings.remove(&key[..]) {
            for map::Serialized { source, value, .. } in new_mappings {
                // Before altering any data, verify that the key is unique if this is a unique view.
                if self.view.update_policy() == ViewUpdatePolicy::Unique
                    && !view_entry.mappings.is_empty()
                    && view_entry.mappings[0].source.id != source.id
                {
                    self.result = Err(Error::Core(bonsaidb_core::Error::UniqueKeyViolation {
                        view: self.map_request.view_name.clone(),
                        conflicting_document: Box::new(source),
                        existing_document: Box::new(view_entry.mappings[0].source.clone()),
                    }));
                    return KeyOperation::Skip;
                }
                let entry_mapping = EntryMapping { source, value };

                // Attempt to update an existing entry for this document, if present.
                let mut found = false;
                for mapping in &mut view_entry.mappings {
                    if mapping.source.id == entry_mapping.source.id {
                        found = true;
                        mapping.source.revision = entry_mapping.source.revision;
                        mapping.value = entry_mapping.value.clone();
                        break;
                    }
                }

                // If an existing mapping wasn't found, add it.
                if !found {
                    view_entry.mappings.push(entry_mapping);
                }
            }

            // There was a choice to be made here: call reduce() with all of the
            // existing values, or call it with rereduce=true, passing only the
            // new value and the old stored value. This implementation is
            // technically less efficient, but it guarantees that every value is
            // reduced exactly once and re-reduced at most once. If we constantly
            // updated the stored value to keep `mappings` small, the fear is
            // that the computed value could lose precision in some contexts over
            // time. Thus, the decision was made to always call reduce() with all
            // the mappings within a single ViewEntry.
            if self.has_reduce {
                let mappings = view_entry
                    .mappings
                    .iter()
                    .map(|m| (key.as_slice(), m.value.as_slice()))
                    .collect::<Vec<_>>();

                match self.view.reduce(&mappings, false) {
                    Ok(reduced) => {
                        view_entry.reduced_value = Bytes::from(reduced);
                    }
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
                        self.has_reduce = false;
                    }
                    Err(other) => {
                        self.result = Err(Error::from(other));
                        return KeyOperation::Skip;
                    }
                }
            }
        }

        let value = bincode::serialize(&view_entry).unwrap();
        KeyOperation::Set(ArcBytes::from(value))
    }
}