1
use std::{
2
    borrow::Cow,
3
    collections::{hash_map::RandomState, BTreeMap, BTreeSet, HashSet},
4
    sync::Arc,
5
};
6

            
7
use async_trait::async_trait;
8
use bonsaidb_core::{
9
    arc_bytes::{serde::Bytes, ArcBytes, OwnedBytes},
10
    connection::Connection,
11
    schema::{
12
        view::{self, map, Serialized},
13
        CollectionName, ViewName,
14
    },
15
};
16
use easy_parallel::Parallel;
17
use nebari::{
18
    io::any::AnyFile,
19
    tree::{AnyTreeRoot, CompareSwap, KeyOperation, Operation, Unversioned, Versioned},
20
    LockedTransactionTree, Tree, UnlockedTransactionTree,
21
};
22

            
23
use crate::{
24
    database::{deserialize_document, document_tree_name, Database},
25
    tasks::{Job, Keyed, Task},
26
    views::{
27
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
28
        EntryMapping, ViewEntry,
29
    },
30
    Error,
31
};
32

            
33
/// A background job that runs a view's map function over all invalidated
/// documents in a database, bringing the view index up to date.
#[derive(Debug)]
pub struct Mapper {
    /// The database whose view is being mapped.
    pub database: Database,
    /// Identifies the view (database name, collection, view name) to map.
    pub map: Map,
}
38

            
39
445430
/// Identifies a single view to map. Also serves as the task key for
/// deduplicating map jobs, hence the `Hash`/`Eq` derives.
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct Map {
    /// The name of the database containing the view.
    pub database: Arc<Cow<'static, str>>,
    /// The collection the view indexes.
    pub collection: CollectionName,
    /// The fully-qualified name of the view.
    pub view_name: ViewName,
}
45

            
46
#[async_trait]
impl Job for Mapper {
    /// The id of the last transaction the view is current with after mapping.
    type Output = u64;
    type Error = Error;

    /// Maps every invalidated document for the view and records the view as
    /// updated, returning the last transaction id it is now current with.
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    #[allow(clippy::too_many_lines)]
    async fn execute(&mut self) -> Result<Self::Output, Error> {
        // Open the four trees involved in mapping: the versioned document
        // store, plus the view's entries, document-map, and
        // invalidated-documents trees.
        let documents =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Versioned, _>(
                    &self.map.collection,
                    document_tree_name(&self.map.collection),
                )?)?;

        let view_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_entries_tree_name(&self.map.view_name),
                )?)?;

        let document_map =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_document_map_tree_name(&self.map.view_name),
                )?)?;

        let invalidated_entries =
            self.database
                .roots()
                .tree(self.database.collection_tree::<Unversioned, _>(
                    &self.map.collection,
                    view_invalidated_docs_tree_name(&self.map.view_name),
                )?)?;

        // A map job only runs after documents changed, so at least one
        // transaction must exist; a missing id is an invariant violation.
        let transaction_id = self
            .database
            .last_transaction_id()
            .await?
            .expect("no way to have documents without a transaction");

        let storage = self.database.clone();
        let map_request = self.map.clone();

        // The mapping itself is blocking tree I/O; run it off the async
        // executor so it doesn't stall other tasks.
        tokio::task::spawn_blocking(move || {
            map_view(
                &invalidated_entries,
                &document_map,
                &documents,
                &view_entries,
                &storage,
                &map_request,
            )
        })
        .await??;

        // Only after mapping succeeds, record that the view is current as of
        // `transaction_id`.
        self.database
            .data
            .storage
            .tasks()
            .mark_view_updated(
                self.map.database.clone(),
                self.map.collection.clone(),
                self.map.view_name.clone(),
                transaction_id,
            )
            .await;

        Ok(transaction_id)
    }
}
122

            
123
142995
/// Maps all currently-invalidated documents for `map_request`'s view,
/// processing the ids in transactional chunks of up to `CHUNK_SIZE` so each
/// chunk either fully applies (and its invalidation markers are removed) or
/// not at all.
fn map_view(
    invalidated_entries: &Tree<Unversioned, AnyFile>,
    document_map: &Tree<Unversioned, AnyFile>,
    documents: &Tree<Versioned, AnyFile>,
    view_entries: &Tree<Unversioned, AnyFile>,
    database: &Database,
    map_request: &Map,
) -> Result<(), Error> {
    const CHUNK_SIZE: usize = 100_000;
    // Only do any work if there are invalidated documents to process
    let mut invalidated_ids = invalidated_entries
        .get_range(&(..))?
        .into_iter()
        .map(|(key, _)| key)
        .collect::<Vec<_>>();
    while !invalidated_ids.is_empty() {
        // One transaction spanning all four trees per chunk; the index each
        // tree is passed at here determines the `unlocked_tree`/`tree`
        // indices used below.
        let transaction = database
            .roots()
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&[
                Box::new(invalidated_entries.clone()) as Box<dyn AnyTreeRoot<AnyFile>>,
                Box::new(document_map.clone()),
                Box::new(documents.clone()),
                Box::new(view_entries.clone()),
            ])?;
        {
            let view = database
                .data
                .schema
                .view_by_name(&map_request.view_name)
                .unwrap();

            // Drain up to CHUNK_SIZE ids from the tail of the vec, which
            // avoids shifting the remaining elements.
            let document_ids = invalidated_ids
                .drain(invalidated_ids.len().saturating_sub(CHUNK_SIZE)..)
                .collect::<Vec<_>>();
            let document_map = transaction.unlocked_tree(1).unwrap();
            let documents = transaction.unlocked_tree(2).unwrap();
            let view_entries = transaction.unlocked_tree(3).unwrap();
            DocumentRequest {
                document_ids: document_ids.clone(),
                map_request,
                database,
                document_map,
                documents,
                view_entries,
                view,
            }
            .map()?;

            // The chunk mapped successfully; clear its invalidation markers
            // within the same transaction.
            let mut invalidated_entries = transaction.tree::<Unversioned>(0).unwrap();
            invalidated_entries.modify(document_ids, nebari::tree::Operation::Remove)?;
        }
        transaction.commit()?;
    }

    Ok(())
}
179

            
180
/// A request to map a set of documents for one view inside an already-open
/// multi-tree transaction.
pub struct DocumentRequest<'a> {
    /// The ids of the documents to (re)map.
    pub document_ids: Vec<ArcBytes<'static>>,
    /// The view being mapped.
    pub map_request: &'a Map,
    /// The database the documents belong to.
    pub database: &'a Database,

    /// Transactional tree recording, per document id, the serialized set of
    /// view keys that document last produced.
    pub document_map: &'a UnlockedTransactionTree<AnyFile>,
    /// Transactional tree holding the source documents.
    pub documents: &'a UnlockedTransactionTree<AnyFile>,
    /// Transactional tree holding serialized `ViewEntry` records keyed by
    /// view key.
    pub view_entries: &'a UnlockedTransactionTree<AnyFile>,
    /// Type-erased view whose map (and optionally reduce) functions run.
    pub view: &'a dyn Serialized,
}
190

            
191
/// A document id paired with its stored bytes, or `None` when the document
/// was not found in the tree.
type DocumentIdPayload = (ArcBytes<'static>, Option<ArcBytes<'static>>);
/// A chunk of document ids plus the channel that yields each id's payload.
type BatchPayload = (Vec<ArcBytes<'static>>, flume::Receiver<DocumentIdPayload>);
193

            
194
impl<'a> DocumentRequest<'a> {
    /// Producer stage: reads the requested documents in chunks of 1024 and,
    /// for each chunk, sends the id list plus a bounded per-document channel
    /// to `batch_sender`. Channel capacity equals the chunk length, so all
    /// sends complete without blocking on consumers.
    fn generate_batches(
        batch_sender: flume::Sender<BatchPayload>,
        document_ids: &[ArcBytes<'static>],
        documents: &UnlockedTransactionTree<AnyFile>,
    ) -> Result<(), Error> {
        // Generate batches
        let mut documents = documents.lock::<Versioned>();
        for chunk in document_ids.chunks(1024) {
            let (document_id_sender, document_id_receiver) = flume::bounded(chunk.len());
            batch_sender
                .send((chunk.to_vec(), document_id_receiver))
                .unwrap();
            let mut documents = documents.get_multiple(chunk.iter().map(ArcBytes::as_slice))?;
            documents.sort_by(|a, b| a.0.cmp(&b.0));

            // Walk the chunk in reverse so each id can be matched against the
            // largest remaining fetched key via `last()`/`pop()`; ids missing
            // from the fetch yield `None`.
            for document_id in chunk.iter().rev() {
                let document = documents
                    .last()
                    .map_or(false, |(key, _)| (key == document_id))
                    .then(|| documents.pop().unwrap().1);

                document_id_sender
                    .send((document_id.clone(), document))
                    .unwrap();
            }

            // Dropping the sender closes the channel, letting consumers
            // finish this chunk.
            drop(document_id_sender);
        }
        // Closing the batch channel signals `map_batches` to stop.
        drop(batch_sender);
        Ok(())
    }

    /// Middle stage: for each batch, runs the view's map function over the
    /// documents across `parallelization` worker closures, then aggregates
    /// the per-document results into a `Batch` sent to `mapped_sender`.
    fn map_batches(
        batch_receiver: &flume::Receiver<BatchPayload>,
        mapped_sender: flume::Sender<Batch>,
        view: &dyn Serialized,
        parallelization: usize,
    ) -> Result<(), Error> {
        // Process batches
        while let Ok((document_ids, document_id_receiver)) = batch_receiver.recv() {
            let mut batch = Batch {
                document_ids,
                ..Batch::default()
            };
            // Workers pull from the shared channel until it is drained, so
            // load balances naturally across the closures.
            for result in Parallel::new()
                .each(1..=parallelization, |_| -> Result<_, Error> {
                    let mut results = Vec::new();
                    while let Ok((document_id, document)) = document_id_receiver.recv() {
                        let map_result = if let Some(document) = document {
                            let document = deserialize_document(&document)?;

                            // Call the schema map function
                            view.map(&document).map_err(bonsaidb_core::Error::from)?
                        } else {
                            // Get multiple didn't return this document ID.
                            Vec::new()
                        };
                        // The set of view keys this document now produces,
                        // serialized for storage in the document map.
                        let keys: HashSet<OwnedBytes> = map_result
                            .iter()
                            .map(|map| OwnedBytes::from(map.key.as_slice()))
                            .collect();
                        let new_keys = ArcBytes::from(bincode::serialize(&keys)?);

                        results.push((document_id, new_keys, keys, map_result));
                    }

                    Ok(results)
                })
                .run()
            {
                // Merge every worker's results into the single batch.
                for (document_id, new_keys, keys, map_result) in result? {
                    for key in &keys {
                        batch.all_keys.insert(key.0.clone());
                    }
                    batch.document_maps.insert(document_id.clone(), new_keys);
                    batch.document_keys.insert(document_id.clone(), keys);
                    for mapping in map_result {
                        let key_mappings = batch
                            .new_mappings
                            .entry(ArcBytes::from(mapping.key.to_vec()))
                            .or_insert_with(Vec::default);
                        key_mappings.push(mapping);
                    }
                }
            }
            mapped_sender.send(batch).unwrap();
        }
        // Closing the mapped channel signals `save_mappings` to stop.
        drop(mapped_sender);
        Ok(())
    }

    /// Writes each document's new key set into the document-map tree,
    /// capturing any previously stored key sets. Returns, per stale view key,
    /// the set of document ids whose mappings must be removed from that
    /// key's view entry; those keys are also added to `all_keys` so the
    /// entry pass visits them.
    fn update_document_map(
        document_ids: Vec<ArcBytes<'static>>,
        document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        document_maps: &BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
        mut document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
        all_keys: &mut BTreeSet<ArcBytes<'static>>,
    ) -> Result<BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>, Error> {
        // We need to store a record of all the mappings this document produced.
        let mut maps_to_clear = Vec::new();
        document_map.modify(
            document_ids,
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |key, value| {
                if let Some(existing_map) = value {
                    maps_to_clear.push((key.to_owned(), existing_map));
                }
                let new_map = document_maps.get(key).unwrap();
                KeyOperation::Set(new_map.clone())
            })),
        )?;
        let mut view_entries_to_clean = BTreeMap::new();
        for (document_id, existing_map) in maps_to_clear {
            let existing_keys = bincode::deserialize::<HashSet<OwnedBytes>>(&existing_map)?;
            let new_keys = document_keys.remove(&document_id).unwrap();
            // Keys the document used to emit but no longer does need their
            // view entries cleaned of this document's mappings.
            for key in existing_keys.difference(&new_keys) {
                all_keys.insert(key.clone().0);
                let key_documents = view_entries_to_clean
                    .entry(key.clone().0)
                    .or_insert_with(HashSet::<_, RandomState>::default);
                key_documents.insert(document_id.clone());
            }
        }
        Ok(view_entries_to_clean)
    }

    /// Applies new mappings and stale-mapping removals to the view-entries
    /// tree via a single compare-swap pass over every affected key, driven by
    /// a `ViewEntryUpdater`. Any error captured by the updater is surfaced
    /// after the tree operation completes.
    fn update_view_entries(
        view: &dyn Serialized,
        map_request: &Map,
        view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        all_keys: BTreeSet<ArcBytes<'static>>,
        view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
        new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    ) -> Result<(), Error> {
        let mut updater = ViewEntryUpdater {
            view,
            map_request,
            view_entries_to_clean,
            new_mappings,
            result: Ok(()),
            has_reduce: true,
        };
        view_entries
            .modify(
                all_keys.into_iter().collect(),
                Operation::CompareSwap(CompareSwap::new(&mut |key, view_entries| {
                    updater.compare_swap_view_entry(key, view_entries)
                })),
            )
            .map_err(Error::from)
            .and(updater.result)
    }

    /// Consumer stage: for each mapped batch, updates the document map and
    /// then the view entries. Holds locks on both trees for the duration (the
    /// caller locks them before invoking this).
    fn save_mappings(
        mapped_receiver: &flume::Receiver<Batch>,
        view: &dyn Serialized,
        map_request: &Map,
        document_map: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
        view_entries: &mut LockedTransactionTree<'_, Unversioned, AnyFile>,
    ) -> Result<(), Error> {
        while let Ok(Batch {
            document_ids,
            document_maps,
            document_keys,
            new_mappings,
            mut all_keys,
        }) = mapped_receiver.recv()
        {
            let view_entries_to_clean = Self::update_document_map(
                document_ids,
                document_map,
                &document_maps,
                document_keys,
                &mut all_keys,
            )?;

            Self::update_view_entries(
                view,
                map_request,
                view_entries,
                all_keys,
                view_entries_to_clean,
                new_mappings,
            )?;
        }
        Ok(())
    }

    /// Maps all of `document_ids` for the view by running a three-stage
    /// pipeline (read batches -> map documents -> persist results) as three
    /// concurrent closures connected by bounded (capacity-1) channels.
    /// Returns the first error produced by any stage.
    pub fn map(&mut self) -> Result<(), Error> {
        let (batch_sender, batch_receiver) = flume::bounded(1);
        let (mapped_sender, mapped_receiver) = flume::bounded(1);

        for result in Parallel::new()
            .add(|| Self::generate_batches(batch_sender, &self.document_ids, self.documents))
            .add(|| {
                Self::map_batches(
                    &batch_receiver,
                    mapped_sender,
                    self.view,
                    self.database.storage().parallelization(),
                )
            })
            .add(|| {
                // Lock the output trees once for the whole save stage.
                let mut document_map = self.document_map.lock();
                let mut view_entries = self.view_entries.lock();
                Self::save_mappings(
                    &mapped_receiver,
                    self.view,
                    self.map_request,
                    &mut document_map,
                    &mut view_entries,
                )
            })
            .run()
        {
            result?;
        }

        Ok(())
    }
}
415

            
416
95620
/// The aggregated output of mapping one chunk of documents, passed from the
/// mapping stage to the persistence stage.
#[derive(Default)]
struct Batch {
    // The document ids included in this batch.
    document_ids: Vec<ArcBytes<'static>>,
    // Per document id, the bincode-serialized set of view keys it produced.
    document_maps: BTreeMap<ArcBytes<'static>, ArcBytes<'static>>,
    // Per document id, the deserialized set of view keys it produced.
    document_keys: BTreeMap<ArcBytes<'static>, HashSet<OwnedBytes>>,
    // Per view key, the new mappings emitted for that key.
    new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    // Every view key touched by this batch.
    all_keys: BTreeSet<ArcBytes<'static>>,
}
424

            
425
impl Keyed<Task> for Mapper {
426
159440
    fn key(&self) -> Task {
427
159440
        Task::ViewMap(self.map.clone())
428
159440
    }
429
}
430

            
431
/// Mutable state for the compare-swap pass over the view-entries tree,
/// applying new mappings and removing stale ones key by key.
struct ViewEntryUpdater<'a> {
    // The view whose map produced the mappings; also supplies reduce().
    view: &'a dyn Serialized,
    // The map request, used for error reporting (view name).
    map_request: &'a Map,
    // Per view key, document ids whose old mappings must be removed.
    view_entries_to_clean: BTreeMap<ArcBytes<'static>, HashSet<ArcBytes<'static>>>,
    // Per view key, the new mappings to insert or update.
    new_mappings: BTreeMap<ArcBytes<'static>, Vec<map::Serialized>>,
    // First error encountered; checked by the caller after the tree pass.
    result: Result<(), Error>,
    // Cleared once reduce() reports ReduceUnimplemented, skipping further
    // reduce calls for this pass.
    has_reduce: bool,
}
439

            
440
impl<'a> ViewEntryUpdater<'a> {
    /// Compare-swap callback for a single view key: removes stale mappings,
    /// merges in new ones, re-reduces when the view implements reduce, and
    /// returns the operation to apply (`Set` updated entry, `Remove` when no
    /// mappings remain, or `Skip` after recording an error in `self.result`).
    fn compare_swap_view_entry(
        &mut self,
        key: &ArcBytes<'_>,
        view_entries: Option<ArcBytes<'static>>,
    ) -> KeyOperation<ArcBytes<'static>> {
        // Deserialize the existing entry, or start a fresh one for this key.
        let mut view_entry = view_entries
            .and_then(|view_entries| bincode::deserialize::<ViewEntry>(&view_entries).ok())
            .unwrap_or_else(|| ViewEntry {
                key: Bytes::from(key.to_vec()),
                view_version: self.view.version(),
                mappings: vec![],
                reduced_value: Bytes::default(),
            });
        let key = key.to_owned();
        // First, strip mappings from documents that no longer emit this key.
        if let Some(document_ids) = self.view_entries_to_clean.remove(&key) {
            view_entry
                .mappings
                .retain(|m| !document_ids.contains(m.source.id.as_ref()));

            if view_entry.mappings.is_empty() {
                // Nothing left for this key; drop the entry entirely.
                return KeyOperation::Remove;
            } else if self.has_reduce {
                // Re-reduce the remaining mappings so the stored reduced
                // value stays consistent after removals.
                let mappings = view_entry
                    .mappings
                    .iter()
                    .map(|m| (&key[..], m.value.as_slice()))
                    .collect::<Vec<_>>();

                match self.view.reduce(&mappings, false) {
                    Ok(reduced) => {
                        view_entry.reduced_value = Bytes::from(reduced);
                    }
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
                        // View has no reduce; remember and stop trying.
                        self.has_reduce = false;
                    }
                    Err(other) => {
                        self.result = Err(Error::from(other));
                        return KeyOperation::Skip;
                    }
                }
            }
        }

        // Next, merge in the newly mapped values for this key.
        if let Some(new_mappings) = self.new_mappings.remove(&key[..]) {
            for map::Serialized { source, value, .. } in new_mappings {
                // Before altering any data, verify that the key is unique if this is a unique view.
                if self.view.unique()
                    && !view_entry.mappings.is_empty()
                    && view_entry.mappings[0].source.id != source.id
                {
                    self.result = Err(Error::Core(bonsaidb_core::Error::UniqueKeyViolation {
                        view: self.map_request.view_name.clone(),
                        conflicting_document: Box::new(source),
                        existing_document: Box::new(view_entry.mappings[0].source.clone()),
                    }));
                    return KeyOperation::Skip;
                }
                let entry_mapping = EntryMapping { source, value };

                // attempt to update an existing
                // entry for this document, if
                // present
                let mut found = false;
                for mapping in &mut view_entry.mappings {
                    if mapping.source.id == entry_mapping.source.id {
                        found = true;
                        mapping.value = entry_mapping.value.clone();
                        break;
                    }
                }

                // If an existing mapping wasn't
                // found, add it
                if !found {
                    view_entry.mappings.push(entry_mapping);
                }
            }

            // There was a choice to be made here of whether to call
            // reduce()  with all of the existing values, or call it with
            // rereduce=true passing only the new value and the old stored
            // value. In this implementation, it's technically less
            // efficient, but we can guarantee that every value has only
            // been reduced once, and potentially re-reduced a single-time.
            // If we constantly try to update the value to optimize the size
            // of `mappings`, the fear is that the value computed may lose
            // precision in some contexts over time. Thus, the decision was
            // made to always call reduce() with all the mappings within a
            // single ViewEntry.
            if self.has_reduce {
                let mappings = view_entry
                    .mappings
                    .iter()
                    .map(|m| (key.as_slice(), m.value.as_slice()))
                    .collect::<Vec<_>>();

                match self.view.reduce(&mappings, false) {
                    Ok(reduced) => {
                        view_entry.reduced_value = Bytes::from(reduced);
                    }
                    Err(view::Error::Core(bonsaidb_core::Error::ReduceUnimplemented)) => {
                        self.has_reduce = false;
                    }
                    Err(other) => {
                        self.result = Err(Error::from(other));
                        return KeyOperation::Skip;
                    }
                }
            }
        }

        // Persist the updated entry for this key.
        let value = bincode::serialize(&view_entry).unwrap();
        KeyOperation::Set(ArcBytes::from(value))
    }
}