1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::{self, Deref},
6
    sync::Arc,
7
    u8,
8
};
9

            
10
#[cfg(any(feature = "encryption", feature = "compression"))]
11
use bonsaidb_core::document::KeyId;
12
use bonsaidb_core::{
13
    arc_bytes::{
14
        serde::{Bytes, CowBytes},
15
        ArcBytes,
16
    },
17
    connection::{
18
        self, AccessPolicy, Connection, HasSession, LowLevelConnection, QueryKey, Range, Session,
19
        Sort, StorageConnection,
20
    },
21
    document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision},
22
    key::Key,
23
    keyvalue::{KeyOperation, Output, Timestamp},
24
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
25
    permissions::{
26
        bonsai::{
27
            collection_resource_name, database_resource_name, document_resource_name,
28
            kv_resource_name, view_resource_name, BonsaiAction, DatabaseAction, DocumentAction,
29
            TransactionAction, ViewAction,
30
        },
31
        Permissions,
32
    },
33
    schema::{
34
        self,
35
        view::{self, map::MappedSerializedValue},
36
        CollectionName, Schema, Schematic, ViewName,
37
    },
38
    transaction::{
39
        self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
40
        Transaction,
41
    },
42
};
43
use itertools::Itertools;
44
use nebari::{
45
    io::any::AnyFile,
46
    tree::{
47
        AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
48
        Unversioned, Versioned,
49
    },
50
    AbortError, ExecutingTransaction, Roots, Tree,
51
};
52
use parking_lot::Mutex;
53
use serde::{Deserialize, Serialize};
54
use watchable::Watchable;
55

            
56
#[cfg(feature = "encryption")]
57
use crate::storage::TreeVault;
58
use crate::{
59
    config::{Builder, KeyValuePersistence, StorageConfiguration},
60
    database::keyvalue::BackgroundWorkerProcessTarget,
61
    error::Error,
62
    open_trees::OpenTrees,
63
    views::{
64
        mapper, view_document_map_tree_name, view_entries_tree_name,
65
        view_invalidated_docs_tree_name, ViewEntry,
66
    },
67
    Storage,
68
};
69

            
70
pub mod keyvalue;
71

            
72
pub(crate) mod compat;
73
pub mod pubsub;
74

            
75
/// A database stored in BonsaiDb. This type blocks the current thread when
76
/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
77
///
78
/// ## Converting between Blocking and Async Types
79
///
80
/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
81
/// using:
82
///
83
/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
84
/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
85
/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
86
/// - [`Database::into_async()`]
87
/// - [`Database::to_async()`]
88
/// - [`Database::into_async_with_runtime()`]
89
/// - [`Database::to_async_with_runtime()`]
90
///
91
/// ## Using `Database` to create a single database
92
///
93
/// `Database` provides an easy mechanism to open and access a single database:
94
///
95
/// ```rust
96
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
97
/// use bonsaidb_core::schema::Collection;
98
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
99
/// use bonsaidb_local::{
100
///     config::{Builder, StorageConfiguration},
101
///     Database,
102
/// };
103
/// use serde::{Deserialize, Serialize};
104
///
105
/// #[derive(Debug, Serialize, Deserialize, Collection)]
106
/// #[collection(name = "blog-posts")]
107
/// # #[collection(core = bonsaidb_core)]
108
/// struct BlogPost {
109
///     pub title: String,
110
///     pub contents: String,
111
/// }
112
///
113
/// # fn test() -> Result<(), bonsaidb_local::Error> {
114
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
115
/// # Ok(())
116
/// # }
117
/// ```
118
///
119
/// Under the hood, this initializes a [`Storage`] instance pointing at
120
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
121
/// with the schema `BlogPost`.
122
///
123
/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
124
/// collections can be used as a [`Schema`].
125
3802854
#[derive(Debug, Clone)]
pub struct Database {
    /// State shared by every clone of this handle: the database's name, its
    /// context, and its schematic.
    pub(crate) data: Arc<Data>,
    /// The storage handle this database was opened from.
    pub(crate) storage: Storage,
}
130

            
131
/// The state of a [`Database`] that is shared, behind an `Arc`, between all
/// clones of the handle.
#[derive(Debug)]
pub struct Data {
    /// The name this database was created with.
    pub name: Arc<Cow<'static, str>>,
    /// The context supplied when the database was created; it provides the
    /// nebari roots that the database's trees are opened from.
    context: Context,
    /// The schematic produced by the schema this database was opened with.
    pub(crate) schema: Arc<Schematic>,
}
137

            
138
impl Database {
139
    /// Opens a local file as a bonsaidb.
140
123825
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
141
123825
        name: S,
142
123825
        context: Context,
143
123825
        storage: &Storage,
144
123825
    ) -> Result<Self, Error> {
145
123825
        let name = name.into();
146
123825
        let schema = Arc::new(DB::schematic()?);
147
123825
        let db = Self {
148
123825
            storage: storage.clone(),
149
123825
            data: Arc::new(Data {
150
123825
                name: Arc::new(name),
151
123825
                context,
152
123825
                schema,
153
123825
            }),
154
123825
        };
155
123825

            
156
123825
        if storage.instance.check_view_integrity_on_database_open() {
157
16
            for view in db.data.schema.views() {
158
16
                storage.instance.tasks().spawn_integrity_check(view, &db);
159
16
            }
160
123821
        }
161

            
162
123825
        storage
163
123825
            .instance
164
123825
            .tasks()
165
123825
            .spawn_key_value_expiration_loader(&db);
166
123825

            
167
123825
        Ok(db)
168
123825
    }
169

            
170
    /// Restricts an unauthenticated instance to having `effective_permissions`.
171
    /// Returns `None` if a session has already been established.
172
    #[must_use]
173
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
174
        self.storage
175
            .with_effective_permissions(effective_permissions)
176
            .map(|storage| Self {
177
                storage,
178
                data: self.data.clone(),
179
            })
180
    }
181

            
182
    /// Creates a `Storage` with a single-database named "default" with its data
183
    /// stored at `path`. This requires exclusive access to the storage location
184
    /// configured. Attempting to open the same path multiple times concurrently
185
    /// will lead to errors.
186
    ///
187
    /// Using this method is perfect if only one database is being used.
188
    /// However, if multiple databases are needed, it is much better to store
189
    /// multiple databases in a single [`Storage`] instance rather than creating
190
    /// multiple independent databases using this method.
191
    ///
192
    /// When opening multiple databases using this function, each database will
193
    /// have its own thread pool, cache, task worker pool, and more. By using a
194
    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
195
    /// perform better.
196
29
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
197
29
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
198

            
199
29
        Ok(storage.create_database::<DB>("default", true)?)
200
29
    }
201

            
202
    /// Returns the [`Schematic`] for the schema for this database.
203
    #[must_use]
204
3860325
    pub fn schematic(&self) -> &'_ Schematic {
205
3860325
        &self.data.schema
206
3860325
    }
207

            
208
2116371
    /// Borrows the nebari roots held by this database's context.
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
        let context = &self.data.context;
        &context.roots
    }
211

            
212
872892
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
213
872892
        &self,
214
872892
        view: &dyn view::Serialized,
215
872892
        key: Option<QueryKey<Bytes>>,
216
872892
        order: Sort,
217
872892
        limit: Option<u32>,
218
872892
        access_policy: AccessPolicy,
219
872892
        mut callback: F,
220
872892
    ) -> Result<(), bonsaidb_core::Error> {
221
872892
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
222
318358
            self.storage
223
318358
                .instance
224
318358
                .tasks()
225
318358
                .update_view_if_needed(view, self)?;
226
554534
        }
227

            
228
872892
        let view_entries = self
229
872892
            .roots()
230
872892
            .tree(self.collection_tree(
231
872892
                &view.collection(),
232
872892
                view_entries_tree_name(&view.view_name()),
233
872892
            )?)
234
872892
            .map_err(Error::from)?;
235

            
236
        {
237
872892
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
238
719245
                callback(entry)?;
239
            }
240
        }
241

            
242
872862
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
243
124
            let db = self.clone();
244
124
            let view_name = view.view_name();
245
124
            let view = db
246
124
                .data
247
124
                .schema
248
124
                .view_by_name(&view_name)
249
124
                .expect("query made with view that isn't registered with this database");
250
124
            db.storage
251
124
                .instance
252
124
                .tasks()
253
124
                .update_view_if_needed(view, &db)?;
254
872768
        }
255

            
256
872892
        Ok(())
257
872892
    }
258

            
259
554671
    fn apply_transaction_to_roots(
260
554671
        &self,
261
554671
        transaction: &Transaction,
262
554671
    ) -> Result<Vec<OperationResult>, Error> {
263
554671
        let mut open_trees = OpenTrees::default();
264
1328131
        for op in &transaction.operations {
265
773524
            if !self.data.schema.contains_collection_name(&op.collection) {
266
124
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267
773400
            }
268

            
269
            #[cfg(any(feature = "encryption", feature = "compression"))]
270
773460
            let vault = if let Some(encryption_key) =
271
773400
                self.collection_encryption_key(&op.collection).cloned()
272
            {
273
                #[cfg(feature = "encryption")]
274
3475
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275
2094
                    vault.key = Some(encryption_key);
276
2094
                    Some(vault)
277
                } else {
278
1441
                    TreeVault::new_if_needed(
279
1441
                        Some(encryption_key),
280
1441
                        self.storage().vault(),
281
1441
                        #[cfg(feature = "compression")]
282
1441
                        None,
283
1441
                    )
284
                }
285

            
286
                #[cfg(not(feature = "encryption"))]
287
                {
288
                    drop(encryption_key);
289
                    return Err(Error::EncryptionDisabled);
290
                }
291
            } else {
292
769925
                self.storage().tree_vault().cloned()
293
            };
294

            
295
773460
            open_trees.open_trees_for_document_change(
296
773460
                &op.collection,
297
773460
                &self.data.schema,
298
773460
                #[cfg(any(feature = "encryption", feature = "compression"))]
299
773460
                vault,
300
773460
            );
301
        }
302

            
303
554607
        let mut roots_transaction = self
304
554607
            .data
305
554607
            .context
306
554607
            .roots
307
554607
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
308

            
309
554607
        let mut results = Vec::new();
310
554607
        let mut changed_documents = Vec::new();
311
554607
        let mut collection_indexes = HashMap::new();
312
554607
        let mut collections = Vec::new();
313
1312106
        for op in &transaction.operations {
314
773460
            let result = self.execute_operation(
315
773460
                op,
316
773460
                &mut roots_transaction,
317
773460
                &open_trees.trees_index_by_name,
318
773460
            )?;
319

            
320
757499
            if let Some((collection, id, deleted)) = match &result {
321
713105
                OperationResult::DocumentUpdated { header, collection } => {
322
713105
                    Some((collection, header.id, false))
323
                }
324
44394
                OperationResult::DocumentDeleted { id, collection } => {
325
44394
                    Some((collection, *id, true))
326
                }
327
                OperationResult::Success => None,
328
757499
            } {
329
757499
                let collection = match collection_indexes.get(collection) {
330
216994
                    Some(index) => *index,
331
                    None => {
332
540505
                        if let Ok(id) = u16::try_from(collections.len()) {
333
540505
                            collection_indexes.insert(collection.clone(), id);
334
540505
                            collections.push(collection.clone());
335
540505
                            id
336
                        } else {
337
                            return Err(Error::TransactionTooLarge);
338
                        }
339
                    }
340
                };
341
757499
                changed_documents.push(ChangedDocument {
342
757499
                    collection,
343
757499
                    id,
344
757499
                    deleted,
345
757499
                });
346
            }
347
757499
            results.push(result);
348
        }
349

            
350
538646
        self.invalidate_changed_documents(
351
538646
            &mut roots_transaction,
352
538646
            &open_trees,
353
538646
            &collections,
354
538646
            &changed_documents,
355
538646
        )?;
356

            
357
538646
        roots_transaction
358
538646
            .entry_mut()
359
538646
            .set_data(compat::serialize_executed_transaction_changes(
360
538646
                &Changes::Documents(DocumentChanges {
361
538646
                    collections,
362
538646
                    documents: changed_documents,
363
538646
                }),
364
538646
            )?)?;
365

            
366
538646
        roots_transaction.commit()?;
367

            
368
538646
        Ok(results)
369
554731
    }
370

            
371
538616
    fn invalidate_changed_documents(
372
538616
        &self,
373
538616
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
374
538616
        open_trees: &OpenTrees,
375
538616
        collections: &[CollectionName],
376
538616
        changed_documents: &[ChangedDocument],
377
538616
    ) -> Result<(), Error> {
378
540506
        for (collection, changed_documents) in &changed_documents
379
538616
            .iter()
380
757499
            .group_by(|doc| &collections[usize::from(doc.collection)])
381
        {
382
540505
            if let Some(views) = self.data.schema.views_in_collection(collection) {
383
313494
                let changed_documents = changed_documents.collect::<Vec<_>>();
384
1062464
                for view in views.into_iter().filter(|view| !view.unique()) {
385
1016507
                    let view_name = view.view_name();
386
1016507
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
387
2124170
                    for changed_document in &changed_documents {
388
1107633
                        let mut invalidated_docs = roots_transaction
389
1107633
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
390
1107633
                            .unwrap();
391
1107633
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
392
                    }
393
                }
394
227011
            }
395
        }
396
538646
        Ok(())
397
538646
    }
398

            
399
773490
    fn execute_operation(
400
773490
        &self,
401
773490
        operation: &Operation,
402
773490
        transaction: &mut ExecutingTransaction<AnyFile>,
403
773490
        tree_index_map: &HashMap<String, usize>,
404
773490
    ) -> Result<OperationResult, Error> {
405
773490
        match &operation.command {
406
538190
            Command::Insert { id, contents } => {
407
538190
                self.execute_insert(operation, transaction, tree_index_map, *id, contents)
408
            }
409
175628
            Command::Update { header, contents } => self.execute_update(
410
175628
                operation,
411
175628
                transaction,
412
175628
                tree_index_map,
413
175628
                header.id,
414
175628
                Some(&header.revision),
415
175628
                contents,
416
175628
            ),
417
15278
            Command::Overwrite { id, contents } => {
418
15278
                self.execute_update(operation, transaction, tree_index_map, *id, None, contents)
419
            }
420
44394
            Command::Delete { header } => {
421
44394
                self.execute_delete(operation, transaction, tree_index_map, header)
422
            }
423
        }
424
773490
    }
425

            
426
190906
    fn execute_update(
427
190906
        &self,
428
190906
        operation: &Operation,
429
190906
        transaction: &mut ExecutingTransaction<AnyFile>,
430
190906
        tree_index_map: &HashMap<String, usize>,
431
190906
        id: DocumentId,
432
190906
        check_revision: Option<&Revision>,
433
190906
        contents: &[u8],
434
190906
    ) -> Result<OperationResult, crate::Error> {
435
190906
        let mut documents = transaction
436
190906
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
437
190906
            .unwrap();
438
190906
        let document_id = ArcBytes::from(id.to_vec());
439
190906
        let mut result = None;
440
190906
        let mut updated = false;
441
190906
        documents.modify(
442
190906
            vec![document_id.clone()],
443
190906
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
444
                                                                        value: Option<
445
                ArcBytes<'_>,
446
            >| {
447
190906
                if let Some(old) = value {
448
190782
                    let doc = match deserialize_document(&old) {
449
190782
                        Ok(doc) => doc,
450
                        Err(err) => {
451
                            result = Some(Err(err));
452
                            return nebari::tree::KeyOperation::Skip;
453
                        }
454
                    };
455
190782
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
456
190658
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
457
                        {
458
190410
                            let updated_header = Header {
459
190410
                                id,
460
190410
                                revision: updated_revision,
461
190410
                            };
462
190410
                            let serialized_doc = match serialize_document(&BorrowedDocument {
463
190410
                                header: updated_header.clone(),
464
190410
                                contents: CowBytes::from(contents),
465
190410
                            }) {
466
190410
                                Ok(bytes) => bytes,
467
                                Err(err) => {
468
                                    result = Some(Err(Error::from(err)));
469
                                    return nebari::tree::KeyOperation::Skip;
470
                                }
471
                            };
472
190410
                            result = Some(Ok(OperationResult::DocumentUpdated {
473
190410
                                collection: operation.collection.clone(),
474
190410
                                header: updated_header,
475
190410
                            }));
476
190410
                            updated = true;
477
190410
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
478
248
                        }
479
248

            
480
248
                        // If no new revision was made, it means an attempt to
481
248
                        // save a document with the same contents was made.
482
248
                        // We'll return a success but not actually give a new
483
248
                        // version
484
248
                        result = Some(Ok(OperationResult::DocumentUpdated {
485
248
                            collection: operation.collection.clone(),
486
248
                            header: doc.header,
487
248
                        }));
488
124
                    } else {
489
124
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
490
124
                            operation.collection.clone(),
491
124
                            Box::new(doc.header),
492
124
                        ))));
493
124
                    }
494
124
                } else if check_revision.is_none() {
495
                    let doc = BorrowedDocument::new(id, contents);
496
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
497
                        Ok((doc, serialized)) => {
498
                            result = Some(Ok(OperationResult::DocumentUpdated {
499
                                collection: operation.collection.clone(),
500
                                header: doc.header,
501
                            }));
502
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
503
                        }
504
                        Err(err) => {
505
                            result = Some(Err(Error::from(err)));
506
                        }
507
                    }
508
124
                } else {
509
124
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
510
124
                        operation.collection.clone(),
511
124
                        Box::new(id),
512
124
                    ))));
513
124
                }
514
496
                nebari::tree::KeyOperation::Skip
515
190906
            })),
516
190906
        )?;
517
190906
        drop(documents);
518
190906

            
519
190906
        if updated {
520
190410
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
521
496
        }
522

            
523
190658
        result.expect("nebari should invoke the callback even when the key isn't found")
524
190906
    }
525

            
526
538190
    fn execute_insert(
527
538190
        &self,
528
538190
        operation: &Operation,
529
538190
        transaction: &mut ExecutingTransaction<AnyFile>,
530
538190
        tree_index_map: &HashMap<String, usize>,
531
538190
        id: Option<DocumentId>,
532
538190
        contents: &[u8],
533
538190
    ) -> Result<OperationResult, Error> {
534
538190
        let mut documents = transaction
535
538190
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
536
538190
            .unwrap();
537
538190
        let id = if let Some(id) = id {
538
212174
            id
539
326016
        } else if let Some(last_key) = documents.last_key()? {
540
302293
            let id = DocumentId::try_from(last_key.as_slice())?;
541
302293
            self.data
542
302293
                .schema
543
302293
                .next_id_for_collection(&operation.collection, Some(id))?
544
        } else {
545
23723
            self.data
546
23723
                .schema
547
23723
                .next_id_for_collection(&operation.collection, None)?
548
        };
549

            
550
538160
        let doc = BorrowedDocument::new(id, contents);
551
538160
        let serialized: Vec<u8> = serialize_document(&doc)?;
552
538160
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
553
538160
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
554
15155
            let doc = deserialize_document(&document)?;
555
15155
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
556
15155
                operation.collection.clone(),
557
15155
                Box::new(doc.header),
558
15155
            )))
559
        } else {
560
523005
            drop(documents);
561
523005
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
562

            
563
522695
            Ok(OperationResult::DocumentUpdated {
564
522695
                collection: operation.collection.clone(),
565
522695
                header: doc.header,
566
522695
            })
567
        }
568
538160
    }
569

            
570
44394
    fn execute_delete(
571
44394
        &self,
572
44394
        operation: &Operation,
573
44394
        transaction: &mut ExecutingTransaction<AnyFile>,
574
44394
        tree_index_map: &HashMap<String, usize>,
575
44394
        header: &Header,
576
44394
    ) -> Result<OperationResult, Error> {
577
44394
        let mut documents = transaction
578
44394
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
579
44394
            .unwrap();
580
44394
        if let Some(vec) = documents.remove(header.id.as_ref())? {
581
44394
            drop(documents);
582
44394
            let doc = deserialize_document(&vec)?;
583
44394
            if &doc.header == header {
584
44394
                self.update_unique_views(
585
44394
                    &ArcBytes::from(doc.header.id.to_vec()),
586
44394
                    operation,
587
44394
                    transaction,
588
44394
                    tree_index_map,
589
44394
                )?;
590

            
591
44394
                Ok(OperationResult::DocumentDeleted {
592
44394
                    collection: operation.collection.clone(),
593
44394
                    id: header.id,
594
44394
                })
595
            } else {
596
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
597
                    operation.collection.clone(),
598
                    Box::new(header.clone()),
599
                )))
600
            }
601
        } else {
602
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
603
                operation.collection.clone(),
604
                Box::new(header.id),
605
            )))
606
        }
607
44394
    }
608

            
609
    fn update_unique_views(
610
        &self,
611
        document_id: &ArcBytes<'static>,
612
        operation: &Operation,
613
        transaction: &mut ExecutingTransaction<AnyFile>,
614
        tree_index_map: &HashMap<String, usize>,
615
    ) -> Result<(), Error> {
616
757779
        if let Some(unique_views) = self
617
757779
            .data
618
757779
            .schema
619
757779
            .unique_views_in_collection(&operation.collection)
620
        {
621
84975
            let documents = transaction
622
84975
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
623
84975
                .unwrap();
624
169452
            for view in unique_views {
625
85005
                let name = view.view_name();
626
85005
                let document_map = transaction
627
85005
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
628
85005
                    .unwrap();
629
85005
                let view_entries = transaction
630
85005
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
631
85005
                    .unwrap();
632
85005
                mapper::DocumentRequest {
633
85005
                    database: self,
634
85005
                    document_ids: vec![document_id.clone()],
635
85005
                    map_request: &mapper::Map {
636
85005
                        database: self.data.name.clone(),
637
85005
                        collection: operation.collection.clone(),
638
85005
                        view_name: name.clone(),
639
85005
                    },
640
85005
                    document_map,
641
85005
                    documents,
642
85005
                    view_entries,
643
85005
                    view,
644
85005
                }
645
85005
                .map()?;
646
            }
647
672804
        }
648

            
649
757251
        Ok(())
650
757779
    }
651

            
652
872922
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
653
872922
        view_entries: &'a Tree<Unversioned, AnyFile>,
654
872922
        key: Option<QueryKey<K>>,
655
872922
        order: Sort,
656
872922
        limit: Option<u32>,
657
872922
    ) -> Result<Vec<ViewEntry>, Error> {
658
872922
        let mut values = Vec::new();
659
872922
        let forwards = match order {
660
872798
            Sort::Ascending => true,
661
124
            Sort::Descending => false,
662
        };
663
872922
        let mut values_read = 0;
664
872922
        if let Some(key) = key {
665
868423
            match key {
666
398
                QueryKey::Range(range) => {
667
398
                    let range = range
668
398
                        .as_ord_bytes()
669
398
                        .map_err(view::Error::key_serialization)?;
670
398
                    view_entries.scan::<Infallible, _, _, _, _>(
671
796
                        &range.map_ref(|bytes| &bytes[..]),
672
398
                        forwards,
673
668
                        |_, _, _| ScanEvaluation::ReadData,
674
398
                        |_, _| {
675
2026
                            if let Some(limit) = limit {
676
248
                                if values_read >= limit {
677
124
                                    return ScanEvaluation::Stop;
678
124
                                }
679
124
                                values_read += 1;
680
1778
                            }
681
1902
                            ScanEvaluation::ReadData
682
2026
                        },
683
1902
                        |_key, _index, value| {
684
1902
                            values.push(value);
685
1902
                            Ok(())
686
1902
                        },
687
398
                    )?;
688
                }
689
867717
                QueryKey::Matches(key) => {
690
867717
                    let key = key
691
867717
                        .as_ord_bytes()
692
867717
                        .map_err(view::Error::key_serialization)?
693
867717
                        .to_vec();
694
867717

            
695
867717
                    values.extend(view_entries.get(&key)?);
696
                }
697
308
                QueryKey::Multiple(list) => {
698
308
                    let mut list = list
699
308
                        .into_iter()
700
616
                        .map(|key| {
701
616
                            key.as_ord_bytes()
702
616
                                .map(|bytes| bytes.to_vec())
703
616
                                .map_err(view::Error::key_serialization)
704
616
                        })
705
308
                        .collect::<Result<Vec<_>, _>>()?;
706

            
707
308
                    list.sort();
708
308

            
709
308
                    values.extend(
710
308
                        view_entries
711
308
                            .get_multiple(list.iter().map(Vec::as_slice))?
712
308
                            .into_iter()
713
616
                            .map(|(_, value)| value),
714
                    );
715
                }
716
            }
717
        } else {
718
4499
            view_entries.scan::<Infallible, _, _, _, _>(
719
4499
                &(..),
720
4499
                forwards,
721
4499
                |_, _, _| ScanEvaluation::ReadData,
722
4499
                |_, _| {
723
5860
                    if let Some(limit) = limit {
724
                        if values_read >= limit {
725
                            return ScanEvaluation::Stop;
726
                        }
727
                        values_read += 1;
728
5860
                    }
729
5860
                    ScanEvaluation::ReadData
730
5939
                },
731
5939
                |_, _, value| {
732
5860
                    values.push(value);
733
5860
                    Ok(())
734
5939
                },
735
4499
            )?;
736
        }
737

            
738
872922
        values
739
872922
            .into_iter()
740
872922
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
741
872922
            .collect::<Result<Vec<_>, Error>>()
742
872922
    }
743

            
744
    /// Returns the encryption key that applies to `collection`, if any.
    ///
    /// The key registered for the collection in the schematic takes
    /// precedence; when the schematic has none, the storage's default
    /// encryption key is used instead.
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        match self.schematic().encryption_key_for_collection(collection) {
            collection_key @ Some(_) => collection_key,
            None => self.storage.default_encryption_key(),
        }
    }
750

            
751
    #[cfg_attr(
752
        not(feature = "encryption"),
753
        allow(
754
            unused_mut,
755
            unused_variables,
756
            clippy::unused_self,
757
            clippy::let_and_return
758
        )
759
    )]
760
    #[allow(clippy::unnecessary_wraps)]
761
2349855
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
762
2349855
        &self,
763
2349855
        collection: &CollectionName,
764
2349855
        name: S,
765
2349855
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
766
2349855
        let mut tree = R::tree(name);
767
2349855

            
768
2349855
        #[cfg(any(feature = "encryption", feature = "compression"))]
769
2349855
        match (
770
2349855
            self.collection_encryption_key(collection),
771
2349855
            self.storage().tree_vault().cloned(),
772
        ) {
773
31229
            (Some(override_key), Some(mut vault)) => {
774
31229
                #[cfg(feature = "encryption")]
775
31229
                {
776
31229
                    vault.key = Some(override_key.clone());
777
31229
                    tree = tree.with_vault(vault);
778
31229
                }
779

            
780
                #[cfg(not(feature = "encryption"))]
781
                {
782
                    return Err(Error::EncryptionDisabled);
783
                }
784
            }
785
561620
            (None, Some(vault)) => {
786
561620
                tree = tree.with_vault(vault);
787
561620
            }
788
1757006
            (key, None) => {
789
                #[cfg(feature = "encryption")]
790
1757006
                if let Some(vault) = TreeVault::new_if_needed(
791
1757006
                    key.cloned(),
792
1757006
                    self.storage().vault(),
793
1757006
                    #[cfg(feature = "compression")]
794
1757006
                    None,
795
1757006
                ) {
796
33093
                    tree = tree.with_vault(vault);
797
1723913
                }
798

            
799
                #[cfg(not(feature = "encryption"))]
800
                if key.is_some() {
801
                    return Err(Error::EncryptionDisabled);
802
                }
803
            }
804
        }
805

            
806
2349855
        Ok(tree)
807
2349855
    }
808

            
809
1
    pub(crate) fn update_key_expiration<'key>(
810
1
        &self,
811
1
        tree_key: impl Into<Cow<'key, str>>,
812
1
        expiration: Option<Timestamp>,
813
1
    ) {
814
1
        self.data
815
1
            .context
816
1
            .update_key_expiration(tree_key, expiration);
817
1
    }
818

            
819
    /// Converts this instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance uses the
    /// current Tokio runtime handle to spawn blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async(self) -> crate::AsyncDatabase {
        let runtime = tokio::runtime::Handle::current();
        self.into_async_with_runtime(runtime)
    }
831

            
832
    /// Converts this instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance uses the
    /// provided runtime handle to spawn blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        let runtime = Arc::new(runtime);
        crate::AsyncDatabase {
            database: self,
            runtime,
        }
    }
843

            
844
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone of `self`.
    /// The returned instance uses the current Tokio runtime handle to spawn
    /// blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async(&self) -> crate::AsyncDatabase {
        let database = self.clone();
        database.into_async()
    }
856

            
857
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone of `self`.
    /// The returned instance uses the provided runtime handle to spawn
    /// blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        let database = self.clone();
        database.into_async_with_runtime(runtime)
    }
865
}
866
59
#[derive(Serialize, Deserialize)]
867
struct LegacyHeader {
868
    id: u64,
869
    revision: Revision,
870
}
871
59
#[derive(Serialize, Deserialize)]
872
struct LegacyDocument<'a> {
873
    header: LegacyHeader,
874
    #[serde(borrow)]
875
    contents: &'a [u8],
876
}
877

            
878
1160670
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
879
1160670
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
880
1160611
        Ok(document) => Ok(document),
881
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
882
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
883
59
                header: Header {
884
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
885
59
                    revision: legacy_doc.header.revision,
886
59
                },
887
59
                contents: CowBytes::from(legacy_doc.contents),
888
59
            }),
889
            Err(_) => Err(Error::from(err)),
890
        },
891
    }
892
1160670
}
893

            
894
728570
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
895
728570
    pot::to_vec(document)
896
728570
        .map_err(Error::from)
897
728570
        .map_err(bonsaidb_core::Error::from)
898
728570
}
899

            
900
impl HasSession for Database {
901
4203947
    fn session(&self) -> Option<&Session> {
902
4203947
        self.storage.session()
903
4203947
    }
904
}
905

            
906
impl Connection for Database {
907
    type Storage = Storage;
908

            
909
5001187
    fn storage(&self) -> Self::Storage {
910
5001187
        self.storage.clone()
911
5001187
    }
912

            
913
    fn list_executed_transactions(
914
        &self,
915
        starting_id: Option<u64>,
916
        result_limit: Option<u32>,
917
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
918
32304
        self.check_permission(
919
32304
            database_resource_name(self.name()),
920
32304
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
921
32304
        )?;
922
32304
        let result_limit = usize::try_from(
923
32304
            result_limit
924
32304
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
925
32304
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
926
32304
        )
927
32304
        .unwrap();
928
32304
        if result_limit > 0 {
929
32180
            let range = if let Some(starting_id) = starting_id {
930
16518
                Range::from(starting_id..)
931
            } else {
932
15662
                Range::from(..)
933
            };
934

            
935
32180
            let mut entries = Vec::new();
936
32180
            self.roots()
937
32180
                .transactions()
938
306973
                .scan(range, |entry| {
939
306973
                    entries.push(entry);
940
306973
                    entries.len() < result_limit
941
306973
                })
942
32180
                .map_err(Error::from)?;
943

            
944
32180
            entries
945
32180
                .into_iter()
946
32180
                .map(|entry| {
947
306973
                    if let Some(data) = entry.data() {
948
306915
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
949
306915
                        Ok(Some(transaction::Executed {
950
306915
                            id: entry.id,
951
306915
                            changes,
952
306915
                        }))
953
                    } else {
954
58
                        Ok(None)
955
                    }
956
306973
                })
957
32180
                .filter_map(Result::transpose)
958
32180
                .collect::<Result<Vec<_>, Error>>()
959
32180
                .map_err(bonsaidb_core::Error::from)
960
        } else {
961
            // A request was made to return an empty result? This should probably be
962
            // an error, but technically this is a correct response.
963
124
            Ok(Vec::default())
964
        }
965
32304
    }
966

            
967
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
968
484158
        self.check_permission(
969
484158
            database_resource_name(self.name()),
970
484158
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
971
484158
        )?;
972
484158
        Ok(self.roots().transactions().current_transaction_id())
973
484158
    }
974

            
975
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
976
124
        self.check_permission(
977
124
            database_resource_name(self.name()),
978
124
            &BonsaiAction::Database(DatabaseAction::Compact),
979
124
        )?;
980
124
        self.storage()
981
124
            .instance
982
124
            .tasks()
983
124
            .compact_database(self.clone())?;
984
124
        Ok(())
985
124
    }
986

            
987
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
988
124
        self.check_permission(
989
124
            kv_resource_name(self.name()),
990
124
            &BonsaiAction::Database(DatabaseAction::Compact),
991
124
        )?;
992
124
        self.storage()
993
124
            .instance
994
124
            .tasks()
995
124
            .compact_key_value_store(self.clone())?;
996
124
        Ok(())
997
124
    }
998
}
999

            
impl LowLevelConnection for Database {
19764
    /// Returns the schematic describing this database's schema.
    fn schematic(&self) -> &Schematic {
        let schematic: &Schematic = &self.data.schema;
        schematic
    }

            
554846
    fn apply_transaction(
554846
        &self,
554846
        transaction: Transaction,
554846
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1328460
        for op in &transaction.operations {
773729
            let (resource, action) = match &op.command {
538369
                Command::Insert { .. } => (
538369
                    collection_resource_name(self.name(), &op.collection),
538369
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
538369
                ),
175628
                Command::Update { header, .. } => (
175628
                    document_resource_name(self.name(), &op.collection, &header.id),
175628
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
175628
                ),
15278
                Command::Overwrite { id, .. } => (
15278
                    document_resource_name(self.name(), &op.collection, id),
15278
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
15278
                ),
44454
                Command::Delete { header } => (
44454
                    document_resource_name(self.name(), &op.collection, &header.id),
44454
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
44454
                ),
            };
773729
            self.check_permission(&resource, &action)?;
        }

            
554731
        let mut unique_view_tasks = Vec::new();
556621
        for collection_name in transaction
554731
            .operations
554731
            .iter()
773614
            .map(|op| &op.collection)
554731
            .collect::<HashSet<_>>()
        {
556620
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
1454119
                for view in views {
1124694
                    if view.unique() {
46635
                        if let Some(task) = self
46635
                            .storage
46635
                            .instance
46635
                            .tasks()
46635
                            .spawn_integrity_check(view, self)
1198
                        {
1198
                            unique_view_tasks.push(task);
45407
                        }
1078059
                    }
                }
227165
            }
        }

            
554731
        let mut unique_view_mapping_tasks = Vec::new();
555929
        for task in unique_view_tasks {
1228
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
1106
                unique_view_mapping_tasks.push(spawned_task);
1106
            }
        }

            
555807
        for task in unique_view_mapping_tasks {
1136
            let mut task = task.lock();
1136
            if let Some(task) = task.take() {
1136
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
            }
        }

            
554671
        self.apply_transaction_to_roots(&transaction)
554671
            .map_err(bonsaidb_core::Error::from)
554821
    }

            
    fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
499377
        self.check_permission(
499377
            document_resource_name(self.name(), collection, &id),
499377
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
499377
        )?;
499377
        let tree = self
499377
            .data
499377
            .context
499377
            .roots
499377
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
499377
            .map_err(Error::from)?;
499377
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
483974
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
15402
            Ok(None)
        }
499377
    }

            
    fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
723
        self.check_permission(
723
            collection_resource_name(self.name(), collection),
723
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
723
        )?;
723
        let tree = self
723
            .data
723
            .context
723
            .roots
723
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
723
            .map_err(Error::from)?;
723
        let mut found_docs = Vec::new();
723
        let mut keys_read = 0;
723
        let ids = DocumentIdRange(ids);
723
        tree.scan(
723
            &ids.borrow_as_bytes(),
723
            match sort {
599
                Sort::Ascending => true,
124
                Sort::Descending => false,
            },
24
            |_, _, _| ScanEvaluation::ReadData,
723
            |_, _| {
1111
                if let Some(limit) = limit {
248
                    if keys_read >= limit {
124
                        return ScanEvaluation::Stop;
124
                    }
124

            
124
                    keys_read += 1;
863
                }
987
                ScanEvaluation::ReadData
1111
            },
723
            |_, _, doc| {
                found_docs.push(
987
                    deserialize_document(&doc)
987
                        .map(BorrowedDocument::into_owned)
987
                        .map_err(AbortError::Other)?,
                );
987
                Ok(())
987
            },
        )
723
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
723
        })?;

            
723
        Ok(found_docs)
723
    }

            
    fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
124
        self.check_permission(
124
            collection_resource_name(self.name(), collection),
124
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
124
        )?;
124
        let tree = self
124
            .data
124
            .context
124
            .roots
124
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
124
            .map_err(Error::from)?;
124
        let mut found_headers = Vec::new();
124
        let mut keys_read = 0;
124
        let ids = DocumentIdRange(ids);
124
        tree.scan(
124
            &ids.borrow_as_bytes(),
124
            match sort {
124
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
124
            |_, _| {
248
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

            
                    keys_read += 1;
248
                }
248
                ScanEvaluation::ReadData
248
            },
124
            |_, _, doc| {
                found_headers.push(
248
                    deserialize_document(&doc)
248
                        .map(|doc| doc.header)
248
                        .map_err(AbortError::Other)?,
                );
248
                Ok(())
248
            },
        )
124
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
124
        })?;

            
124
        Ok(found_headers)
124
    }

            
    fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, bonsaidb_core::Error> {
248
        self.check_permission(
248
            collection_resource_name(self.name(), collection),
248
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
248
        )?;
248
        let tree = self
248
            .data
248
            .context
248
            .roots
248
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
248
            .map_err(Error::from)?;
248
        let ids = DocumentIdRange(ids);
248
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

            
248
        Ok(stats.alive_keys)
248
    }

            
281848
    fn get_multiple_from_collection(
281848
        &self,
281848
        ids: &[DocumentId],
281848
        collection: &CollectionName,
281848
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
561732
        for id in ids {
279884
            self.check_permission(
279884
                document_resource_name(self.name(), collection, id),
279884
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
279884
            )?;
        }
281848
        let mut ids = ids.to_vec();
281848
        let collection = collection.clone();
281848
        let tree = self
281848
            .data
281848
            .context
281848
            .roots
281848
            .tree(
281848
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
281848
            .map_err(Error::from)?;
281848
        ids.sort();
281848
        let keys_and_values = tree
281864
            .get_multiple(ids.iter().map(|id| id.as_ref()))
281848
            .map_err(Error::from)?;

            
281848
        keys_and_values
281848
            .into_iter()
281864
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
281848
            .collect::<Result<Vec<_>, Error>>()
281848
            .map_err(bonsaidb_core::Error::from)
281848
    }

            
    /// Asks the storage's task manager to compact `collection`.
    ///
    /// # Errors
    ///
    /// Fails if the `DatabaseAction::Compact` permission check on the
    /// collection resource is rejected or if the compaction task reports an
    /// error.
    fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(
            collection_resource_name(self.name(), &collection),
            &action,
        )?;

        self.storage()
            .instance
            .tasks()
            .compact_collection(self.clone(), collection)?;
        Ok(())
    }

            
306688
    fn query_by_name(
306688
        &self,
306688
        view: &ViewName,
306688
        key: Option<QueryKey<Bytes>>,
306688
        order: Sort,
306688
        limit: Option<u32>,
306688
        access_policy: AccessPolicy,
306688
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
306688
        let view = self.schematic().view_by_name(view)?;
306688
        self.check_permission(
306688
            view_resource_name(self.name(), &view.view_name()),
306688
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
306688
        )?;
306688
        let mut results = Vec::new();
306688
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
600188
            for mapping in entry.mappings {
300763
                results.push(bonsaidb_core::schema::view::map::Serialized {
300763
                    source: mapping.source,
300763
                    key: entry.key.clone(),
300763
                    value: mapping.value,
300763
                });
300763
            }
299425
            Ok(())
306688
        })?;

            
306688
        Ok(results)
306688
    }

            
    /// Queries the view named `view` and also loads the source document of
    /// every returned mapping.
    ///
    /// The documents are fetched from the view's collection and returned
    /// keyed by document id.
    ///
    /// # Errors
    ///
    /// Fails if the query fails (see `query_by_name`), if the view cannot be
    /// found in the schematic, or if fetching the documents fails.
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let mappings = self.query_by_name(view, key, order, limit, access_policy)?;
        // `query_by_name` has already performed this lookup successfully, so
        // this is not expected to fail; propagate the error instead of
        // panicking should that ever change.
        let view = self.schematic().view_by_name(view)?;

        let source_ids = mappings
            .iter()
            .map(|mapping| mapping.source.id)
            .collect::<Vec<_>>();
        let documents = self
            .get_multiple_from_collection(&source_ids, &view.collection())?
            .into_iter()
            .map(|doc| (doc.header.id, doc))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings,
                documents,
            },
        )
    }

            
565790
    fn reduce_by_name(
565790
        &self,
565790
        view_name: &ViewName,
565790
        key: Option<QueryKey<Bytes>>,
565790
        access_policy: AccessPolicy,
565790
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
565790
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

            
565790
        let result = if mappings.len() == 1 {
414158
            mappings.pop().unwrap().value.into_vec()
        } else {
151632
            let view = self.data.schema.view_by_name(view_name)?;
151632
            view.reduce(
151632
                &mappings
151632
                    .iter()
151632
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
151632
                    .collect::<Vec<_>>(),
151632
                true,
151632
            )
151632
            .map_err(Error::from)?
        };

            
565636
        Ok(result)
565760
    }

            
566110
    fn reduce_grouped_by_name(
566110
        &self,
566110
        view_name: &ViewName,
566110
        key: Option<QueryKey<Bytes>>,
566110
        access_policy: AccessPolicy,
566110
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
566110
        let view = self.data.schema.view_by_name(view_name)?;
566110
        self.check_permission(
566110
            view_resource_name(self.name(), &view.view_name()),
566110
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
566110
        )?;
566110
        let mut mappings = Vec::new();
566126
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
419696
            mappings.push(MappedSerializedValue {
419696
                key: entry.key,
419696
                value: entry.reduced_value,
419696
            });
419696
            Ok(())
566126
        })?;

            
566110
        Ok(mappings)
566110
    }

            
124
    fn delete_docs_by_name(
124
        &self,
124
        view: &ViewName,
124
        key: Option<QueryKey<Bytes>>,
124
        access_policy: AccessPolicy,
124
    ) -> Result<u64, bonsaidb_core::Error> {
124
        let view = self.data.schema.view_by_name(view)?;
124
        let collection = view.collection();
124
        let mut transaction = Transaction::default();
124
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
372
            for mapping in entry.mappings {
248
                transaction.push(Operation::delete(collection.clone(), mapping.source));
248
            }

            
124
            Ok(())
124
        })?;

            
124
        let results = LowLevelConnection::apply_transaction(self, transaction)?;

            
124
        Ok(results.len() as u64)
124
    }
}

            
/// Boxed iterator over fallible pairs of byte buffers read from a view tree
/// (presumably key and value; only the second element is consumed by
/// `ViewEntryCollectionIterator`).
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapter that turns the raw byte pairs of a [`ViewIterator`] into
/// deserialized [`ViewEntry`] values.
struct ViewEntryCollectionIterator<'a> {
    /// Underlying iterator yielding the raw byte pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
2203531
/// Cheaply clonable handle to the shared [`ContextData`] of a database.
///
/// Cloning only clones the inner [`Arc`].
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// Shared, reference-counted context state.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    /// Gives access to the shared [`ContextData`] behind this handle.
    fn deref(&self) -> &Self::Target {
        &*self.data
    }
}

            
/// State shared by all clones of a [`Context`].
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The nebari roots that trees are opened from.
    pub(crate) roots: Roots<AnyFile>,
    /// Key-value store state, guarded by a mutex.
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
26876
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
26876
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
26876
        let mut background_worker_target_watcher = background_worker_target.watch();
26876
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
26876
            key_value_persistence,
26876
            roots.clone(),
26876
            background_worker_target,
26876
        )));
26876
        let background_worker_state = Arc::downgrade(&key_value_state);
26876
        let context = Self {
26876
            data: Arc::new(ContextData {
26876
                roots,
26876
                key_value_state,
26876
            }),
26876
        };
26876
        std::thread::Builder::new()
26876
            .name(String::from("keyvalue-worker"))
26877
            .spawn(move || {
26877
                keyvalue::background_worker(
26877
                    &background_worker_state,
26877
                    &mut background_worker_target_watcher,
26877
                );
26877
            })
26876
            .unwrap();
26876
        context
26876
    }

            
1253939
    pub(crate) fn perform_kv_operation(
1253939
        &self,
1253939
        op: KeyOperation,
1253939
    ) -> Result<Output, bonsaidb_core::Error> {
1253939
        let mut state = self.data.key_value_state.lock();
1253939
        state.perform_kv_operation(op, &self.data.key_value_state)
1253939
    }

            
14
    pub(crate) fn update_key_expiration<'key>(
14
        &self,
14
        tree_key: impl Into<Cow<'key, str>>,
14
        expiration: Option<Timestamp>,
14
    ) {
14
        let mut state = self.data.key_value_state.lock();
14
        state.update_key_expiration(tree_key, expiration);
14
    }

            
    #[cfg(test)]
1
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
1
        let state = self.data.key_value_state.lock();
1
        state.persistence_watcher()
1
    }
}

            
impl Drop for ContextData {
    fn drop(&mut self) {
24027
        if let Some(shutdown) = {
24027
            let mut state = self.key_value_state.lock();
24027
            state.shutdown(&self.key_value_state)
24027
        } {
301
            let _ = shutdown.recv();
23726
        }
24027
    }
}

            
2591094
pub fn document_tree_name(collection: &CollectionName) -> String {
2591094
    format!("collection.{:#}", collection)
2591094
}

            
/// Wrapper around a [`Range`] of [`DocumentId`]s that can be borrowed as a
/// byte range for tree scans (see its [`BorrowByteRange`] implementation).
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1095
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1095
        BorrowedRange {
1095
            start: match &self.0.start {
475
                connection::Bound::Unbounded => ops::Bound::Unbounded,
620
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
1095
            end: match &self.0.end {
475
                connection::Bound::Unbounded => ops::Bound::Unbounded,
496
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
124
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
1095
    }
}

            
/// Operations that can be performed on both [`Database`] and
/// [`AsyncDatabase`](crate::AsyncDatabase) without blocking.
pub trait DatabaseNonBlocking {
    /// Returns the name of the database.
    #[must_use]
    fn name(&self) -> &str;
}

            
impl DatabaseNonBlocking for Database {
    /// Returns the name stored in this database's shared data.
    fn name(&self) -> &str {
        let name: &str = self.data.name.as_ref();
        name
    }
}