use std::{
    borrow::Cow,
    collections::{hash_map::RandomState, BTreeMap, BTreeSet, HashSet},
    sync::Arc,
};

use bonsaidb_core::{
    arc_bytes::{serde::Bytes, ArcBytes, OwnedBytes},
    connection::Connection,
    schema::{
        view::{self, map, Serialized},
        CollectionName, ViewName,
    },
};
use easy_parallel::Parallel;
use nebari::{
    io::any::AnyFile,
    tree::{AnyTreeRoot, CompareSwap, KeyOperation, Operation, Unversioned, Versioned},
    LockedTransactionTree, Tree, UnlockedTransactionTree,
};

use crate::{
    database::{deserialize_document, document_tree_name, Database},
    tasks::{Job, Keyed, Task},
    views::{
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
        EntryMapping, ViewEntry,
    },
    Error,
};

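/// Background job that maps a collection's invalidated documents into the
/// entries of a single view.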
#[derive(Debug)]
pub struct Mapper {
    pub database: Database,
    pub map: Map,
}

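/// Identifies the view being mapped: the database name, the collection, and
/// the view's name.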
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct Map {
    pub database: Arc<Cow<'static, str>>,
    pub collection: CollectionName,
    pub view_name: ViewName,
}

impl Job for Mapper {
    type Output = u64;
    type Error = Error;

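    /// Opens the document, view-entry, document-map, and invalidated-entry
    /// trees, maps every invalidated document, and marks the view as updated
    /// through the most recent transaction id, which is also returned.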
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    #[allow(clippy::too_many_lines)]
    fn execute(&mut self) -> Result<Self::Output, Error> {
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.map.collection,
                    document_tree_name(&self.map.collection),
                )?)?;

        let view_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_entries_tree_name(&self.map.view_name),
                )?)?;

        let document_map =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_document_map_tree_name(&self.map.view_name),
                )?)?;

        let invalidated_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_invalidated_docs_tree_name(&self.map.view_name),
                )?)?;

        let transaction_id = self
            .database
            .last_transaction_id()?
            .expect("no way to have documents without a transaction");

        let storage = self.database.clone();
        let map_request = self.map.clone();

        map_view(
            &invalidated_entries,
            &document_map,
            &documents,
            &view_entries,
            &storage,
            &map_request,
        )?;

        self.database.storage.instance.tasks().mark_view_updated(
            self.map.database.clone(),
            self.map.collection.clone(),
            self.map.view_name.clone(),
            transaction_id,
        );

        Ok(transaction_id)
    }
}

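/// Maps all currently invalidated documents for a view. Invalidated document
/// ids are processed in chunks of `CHUNK_SIZE`, each chunk inside its own
/// transaction that also removes the processed ids from the invalidation tree.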
fn map_view(
    invalidated_entries: &Tree<Unversioned, AnyFile>,
    document_map: &Tree<Unversioned, AnyFile>,
    documents: &Tree<Versioned, AnyFile>,
    view_entries: &Tree<Unversioned, AnyFile>,
    database: &Database,
    map_request: &Map,
) -> Result<(), Error> {
    const CHUNK_SIZE: usize = 100_000;
    // Only do any work if there are invalidated documents to process
    let mut invalidated_ids = invalidated_entries
        .get_range(&(..))?
        .into_iter()
        .map(|(key, _)| key)
        .collect::<Vec<_>>();
    while !invalidated_ids.is_empty() {
        let transaction = database
            .roots()
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&[
                Box::new(invalidated_entries.clone()) as Box<dyn AnyTreeRoot<AnyFile>>,
                Box::new(document_map.clone()),
                Box::new(documents.clone()),
                Box::new(view_entries.clone()),
            ])?;
        {
            let view = database
                .data
                .schema
                .view_by_name(&map_request.view_name)
                .unwrap();

            let document_ids = invalidated_ids
                .drain(invalidated_ids.len().saturating_sub(CHUNK_SIZE)..)
                .collect::<Vec<_>>();
            let document_map = transaction.unlocked_tree(1).unwrap();
            let documents = transaction.unlocked_tree(2).unwrap();
            let view_entries = transaction.unlocked_tree(3).unwrap();
            DocumentRequest {
                document_ids: document_ids.clone(),
                map_request,
                database,
                document_map,
                documents,
                view_entries,
                view,
            }
            .map()?;

            let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
            invalidated_entries.modify(document_ids, nebari::tree::Operation::Remove)?;
        }
        transaction.commit()?;
    }

    Ok(())
}

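/// The state needed to map one chunk of documents inside an open transaction.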
pub struct DocumentRequest<'a> {
    pub document_ids: Vec<ArcBytes<'static>>,
    pub map_request: &'a Map,
    pub database: &'a Database,

    pub document_map: &'a UnlockedTransactionTree<AnyFile>,
    pub documents: &'a UnlockedTransactionTree<AnyFile>,
    pub view_entries: &'a UnlockedTransactionTree<AnyFile>,
    pub view: &'a dyn Serialized,
}

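/// A document id paired with the document's serialized contents, if found.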
type DocumentIdPayload = (ArcBytes<'static>, Option<ArcBytes<'static>>);
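/// A batch's document ids plus the channel that streams each document's
/// payload to the mapping workers.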
type BatchPayload = (Vec<ArcBytes<'static>>, flume::Receiver<DocumentIdPayload>);

impl<'a> DocumentRequest<'a> {
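    /// Reads documents in chunks of 1024 ids and streams each chunk, along
    /// with a per-chunk channel of document payloads, to `map_batches`.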
    fn generate_batches(
        batch_sender: flume::Sender<BatchPayload>,
        document_ids: &[ArcBytes<'static>],
        documents: &UnlockedTransactionTree<AnyFile>,
    ) -> Result<(), Error> {
        // Generate batches
        let mut documents = documents.lock::<Versioned>();
        for chunk in document_ids.chunks(1024) {
            let (document_id_sender, document_id_receiver) = flume::bounded(chunk.len());
            batch_sender
                .send((chunk.to_vec(), document_id_receiver))
                .unwrap();
            let mut documents = documents.get_multiple(chunk.iter().map(ArcBytes::as_slice))?;
            documents.sort_by(|a, b| a.0.cmp(&b.0));

            for document_id in chunk.iter().rev() {
                let document = documents
                    .last()
                    .map_or(false, |(key, _)| (key == document_id))
                    .then(|| documents.pop().unwrap().1);

                document_id_sender
                    .send((document_id.clone(), document))
                    .unwrap();
            }

            drop(document_id_sender);
        }
        drop(batch_sender);
        Ok(())
    }

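    /// Receives batches from `generate_batches` and invokes the view's map()
    /// function across `parallelization` worker threads, accumulating the
    /// results into a `Batch` that is forwarded to `save_mappings`.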
    fn map_batches(
        batch_receiver: &flume::Receiver<BatchPayload>,
        mapped_sender: flume::Sender<Batch>,
        view: &dyn Serialized,
        parallelization: usize,
    ) -> Result<(), Error> {
        // Process batches
        while let Ok((document_ids, document_id_receiver)) = batch_receiver.recv() {
            let mut batch = Batch {
                document_ids,
                ..Batch::default()
            };
            for result in Parallel::new()
                .each(1..=parallelization, |_| -> Result<_, Error> {
                    let mut results = Vec::new();
                    while let Ok((document_id, document)) = document_id_receiver.recv() {
                        let map_result = if let Some(document) = document {
                            let document = deserialize_document(&document)?;

                            // Call the schema map function
                            view.map(&document).map_err(bonsaidb_core::Error::from)?
                        } else {
                            // Get multiple didn't return this document ID.
                            Vec::new()
                        };
                        let keys: HashSet<OwnedBytes> = map_result
                            .iter()
                            .map(|map| OwnedBytes::from(map.key.as_slice()))
                            .collect();
                        let new_keys = ArcBytes::from(bincode::serialize(&keys)?);

                        results.push((document_id, new_keys, keys, map_result));
                    }

                    Ok(results)
                })
                .run()
            {
                for (document_id, new_keys, keys, map_result) in result? {
                    for key in &keys {
                        batch.all_keys.insert(key.0.clone());
                    }
                    batch.document_maps.insert(document_id.clone(), new_keys);
                    batch.document_keys.insert(document_id.clone(), keys);
                    for mapping in map_result {
                        let key_mappings = batch
                            .new_mappings
                            .entry(ArcBytes::from(mapping.key.to_vec()))
                            .or_insert_with(Vec::default);
                        key_mappings.push(mapping);
                    }
                }
            }
            mapped_sender.send(batch).unwrap();
        }
        drop(mapped_sender);
        Ok(())
    }

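    /// Stores each document's new set of view keys in the document map tree.
    /// Returns the view keys whose entries need old mappings removed, mapped
    /// to the affected document ids, and adds those keys to `all_keys`.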
    fn update_document_map(
        document_ids: Vec<ArcBytes<'static>>,
        document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        document_maps: &BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
        mut document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
        all_keys: &mut BTreeSet<ArcBytes<'static>>,
    ) -> Result<BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>, Error> {
        // We need to store a record of all the mappings this document produced.
        let mut maps_to_clear = Vec::new();
        document_map.modify(
            document_ids,
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |key, value| {
                if let Some(existing_map) = value {
                    maps_to_clear.push((key.to_owned(), existing_map));
                }
                let new_map = document_maps.get(key).unwrap();
                KeyOperation::Set(new_map.clone())
            })),
        )?;
        let mut view_entries_to_clean = BTreeMap::new();
        for (document_id, existing_map) in maps_to_clear {
            let existing_keys = bincode::deserialize::<HashSet<OwnedBytes>>(&existing_map)?;
            let new_keys = document_keys.remove(&document_id).unwrap();
            for key in existing_keys.difference(&new_keys) {
                all_keys.insert(key.clone().0);
                let key_documents = view_entries_to_clean
                    .entry(key.clone().0)
                    .or_insert_with(HashSet::<_, RandomState>::default);
                key_documents.insert(document_id.clone());
            }
        }
        Ok(view_entries_to_clean)
    }

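    /// Applies the new mappings and any stale-mapping removals to the view
    /// entries tree in a single compare-and-swap pass driven by
    /// `ViewEntryUpdater`.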
    fn update_view_entries(
        view: &dyn Serialized,
        map_request: &Map,
        view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        all_keys: BTreeSet<ArcBytes<'static>>,
        view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
        new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    ) -> Result<(), Error> {
        let mut updater = ViewEntryUpdater {
            view,
            map_request,
            view_entries_to_clean,
            new_mappings,
            result: Ok(()),
            has_reduce: true,
        };
        view_entries
            .modify(
                all_keys.into_iter().collect(),
                Operation::CompareSwap(CompareSwap::new(&mut |key, view_entries| {
                    updater.compare_swap_view_entry(key, view_entries)
                })),
            )
            .map_err(Error::from)
            .and(updater.result)
    }

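    /// Receives mapped batches and persists them by updating the document map
    /// and then the view entries.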
    fn save_mappings(
        mapped_receiver: &flume::Receiver<Batch>,
        view: &dyn Serialized,
        map_request: &Map,
        document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
    ) -> Result<(), Error> {
        while let Ok(Batch {
            document_ids,
            document_maps,
            document_keys,
            new_mappings,
            mut all_keys,
        }) = mapped_receiver.recv()
        {
            let view_entries_to_clean = Self::update_document_map(
                document_ids,
                document_map,
                &document_maps,
                document_keys,
                &mut all_keys,
            )?;

            Self::update_view_entries(
                view,
                map_request,
                view_entries,
                all_keys,
                view_entries_to_clean,
                new_mappings,
            )?;
        }
        Ok(())
    }

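    /// Maps the requested documents by running three pipelined stages in
    /// parallel: batch generation, mapping, and persisting the results.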
    pub fn map(&mut self) -> Result<(), Error> {
        let (batch_sender, batch_receiver) = flume::bounded(1);
        let (mapped_sender, mapped_receiver) = flume::bounded(1);

        for result in Parallel::new()
            .add(|| Self::generate_batches(batch_sender, &self.document_ids, self.documents))
            .add(|| {
                Self::map_batches(
                    &batch_receiver,
                    mapped_sender,
                    self.view,
                    self.database.storage().parallelization(),
                )
            })
            .add(|| {
                let mut document_map = self.document_map.lock();
                let mut view_entries = self.view_entries.lock();
                Self::save_mappings(
                    &mapped_receiver,
                    self.view,
                    self.map_request,
                    &mut document_map,
                    &mut view_entries,
                )
            })
            .run()
        {
            result?;
        }

        Ok(())
    }
}

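/// The accumulated results of mapping one batch of documents.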
#[derive(Default)]
struct Batch {
    document_ids: Vec<ArcBytes<'static>>,
    document_maps: BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
    document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
    new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    all_keys: BTreeSet<ArcBytes<'static>>,
}

impl Keyed<Task> for Mapper {
    fn key(&self) -> Task {
        Task::ViewMap(self.map.clone())
    }
}

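/// Compare-and-swap state used while rewriting view entries: the mappings to
/// remove, the new mappings to apply, and whether the view implements
/// reduce().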
struct ViewEntryUpdater<'a> {
    view: &'a dyn Serialized,
    map_request: &'a Map,
    view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
    new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    result: Result<(), Error>,
    has_reduce: bool,
}

impl<'a> ViewEntryUpdater<'a> {
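    /// Merges removals and new mappings into the `ViewEntry` for `key`,
    /// recomputing the reduced value when the view implements reduce(), and
    /// returns the operation to apply to the view entries tree.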
    fn compare_swap_view_entry(
        &mut self,
        key: &ArcBytes<'_>,
        view_entries: Option<ArcBytes<'static>>,
    ) -> KeyOperation<ArcBytes<'static>> {
        let mut view_entry = view_entries
            .and_then(|view_entries| bincode::deserialize::<ViewEntry>(&view_entries).ok())
            .unwrap_or_else(|| ViewEntry {
                key: Bytes::from(key.to_vec()),
                view_version: self.view.version(),
                mappings: vec![],
                reduced_value: Bytes::default(),
            });
        let key = key.to_owned();
        if let Some(document_ids) = self.view_entries_to_clean.remove(&key) {
            view_entry
                .mappings
                .retain(|m| !document_ids.contains(m.source.id.as_ref()));

            if view_entry.mappings.is_empty() && !self.new_mappings.contains_key(&key[..]) {
                return KeyOperation::Remove;
            } else if self.has_reduce {
                let mappings = view_entry
                    .mappings
                    .iter()
                    .map(|m| (&key[..], m.value.as_slice()))
                    .collect::<Vec<_>>();

                match self.view.reduce(&mappings, false) {
                    Ok(reduced) => {
                        view_entry.reduced_value = Bytes::from(reduced);
                    }
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
                        self.has_reduce = false;
                    }
                    Err(other) => {
                        self.result = Err(Error::from(other));
                        return KeyOperation::Skip;
                    }
                }
            }
        }

        if let Some(new_mappings) = self.new_mappings.remove(&key[..]) {
            for map::Serialized { source, value, .. } in new_mappings {
                // Before altering any data, verify that the key is unique if this is a unique view.
                if self.view.unique()
                    && !view_entry.mappings.is_empty()
                    && view_entry.mappings[0].source.id != source.id
                {
                    self.result = Err(Error::Core(bonsaidb_core::Error::UniqueKeyViolation {
                        view: self.map_request.view_name.clone(),
                        conflicting_document: Box::new(source),
                        existing_document: Box::new(view_entry.mappings[0].source.clone()),
                    }));
                    return KeyOperation::Skip;
                }
                let entry_mapping = EntryMapping { source, value };

                // Attempt to update an existing entry for this document, if present.
                let mut found = false;
                for mapping in &mut view_entry.mappings {
                    if mapping.source.id == entry_mapping.source.id {
                        found = true;
                        mapping.value = entry_mapping.value.clone();
                        break;
                    }
                }

                // If an existing mapping wasn't found, add it.
                if !found {
                    view_entry.mappings.push(entry_mapping);
                }
            }

            // There was a choice to be made here of whether to call
            // reduce() with all of the existing values, or call it with
            // rereduce=true passing only the new value and the old stored
            // value. In this implementation, it's technically less
            // efficient, but we can guarantee that every value has only
            // been reduced once, and potentially re-reduced a single time.
            // If we constantly try to update the value to optimize the size
            // of `mappings`, the fear is that the value computed may lose
            // precision in some contexts over time. Thus, the decision was
            // made to always call reduce() with all the mappings within a
            // single ViewEntry.
            if self.has_reduce {
                let mappings = view_entry
                    .mappings
                    .iter()
                    .map(|m| (key.as_slice(), m.value.as_slice()))
                    .collect::<Vec<_>>();

                match self.view.reduce(&mappings, false) {
                    Ok(reduced) => {
                        view_entry.reduced_value = Bytes::from(reduced);
                    }
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
                        self.has_reduce = false;
                    }
                    Err(other) => {
                        self.result = Err(Error::from(other));
                        return KeyOperation::Skip;
                    }
                }
            }
        }

        let value = bincode::serialize(&view_entry).unwrap();
        KeyOperation::Set(ArcBytes::from(value))
    }
}