1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::{self, Deref},
6
    sync::Arc,
7
    u8,
8
};
9

            
10
#[cfg(any(feature = "encryption", feature = "compression"))]
11
use bonsaidb_core::document::KeyId;
12
use bonsaidb_core::{
13
    arc_bytes::{
14
        serde::{Bytes, CowBytes},
15
        ArcBytes,
16
    },
17
    connection::{
18
        self, AccessPolicy, Connection, HasSession, LowLevelConnection, QueryKey, Range, Session,
19
        Sort, StorageConnection,
20
    },
21
    document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision},
22
    key::Key,
23
    keyvalue::{KeyOperation, Output, Timestamp},
24
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
25
    permissions::{
26
        bonsai::{
27
            collection_resource_name, database_resource_name, document_resource_name,
28
            kv_resource_name, view_resource_name, BonsaiAction, DatabaseAction, DocumentAction,
29
            TransactionAction, ViewAction,
30
        },
31
        Permissions,
32
    },
33
    schema::{
34
        self,
35
        view::{self, map::MappedSerializedValue},
36
        CollectionName, Schema, Schematic, ViewName,
37
    },
38
    transaction::{
39
        self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
40
        Transaction,
41
    },
42
};
43
use itertools::Itertools;
44
use nebari::{
45
    io::any::AnyFile,
46
    tree::{
47
        AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
48
        Unversioned, Versioned,
49
    },
50
    AbortError, ExecutingTransaction, Roots, Tree,
51
};
52
use parking_lot::Mutex;
53
use serde::{Deserialize, Serialize};
54
use watchable::Watchable;
55

            
56
#[cfg(feature = "encryption")]
57
use crate::storage::TreeVault;
58
use crate::{
59
    config::{Builder, KeyValuePersistence, StorageConfiguration},
60
    database::keyvalue::BackgroundWorkerProcessTarget,
61
    error::Error,
62
    open_trees::OpenTrees,
63
    views::{
64
        mapper, view_document_map_tree_name, view_entries_tree_name,
65
        view_invalidated_docs_tree_name, ViewEntry,
66
    },
67
    Storage,
68
};
69

            
70
pub mod keyvalue;
71

            
72
pub(crate) mod compat;
73
pub mod pubsub;
74

            
75
/// A database stored in BonsaiDb. This type blocks the current thread when
76
/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
77
///
78
/// ## Converting between Blocking and Async Types
79
///
80
/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
81
/// using:
82
///
83
/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
84
/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
85
/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
86
/// - [`Database::into_async()`]
87
/// - [`Database::to_async()`]
88
/// - [`Database::into_async_with_runtime()`]
89
/// - [`Database::to_async_with_runtime()`]
90
///
91
/// ## Using `Database` to create a single database
92
///
93
/// `Database`provides an easy mechanism to open and access a single database:
94
///
95
/// ```rust
96
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
97
/// use bonsaidb_core::schema::Collection;
98
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
99
/// use bonsaidb_local::{
100
///     config::{Builder, StorageConfiguration},
101
///     Database,
102
/// };
103
/// use serde::{Deserialize, Serialize};
104
///
105
/// #[derive(Debug, Serialize, Deserialize, Collection)]
106
/// #[collection(name = "blog-posts")]
107
/// # #[collection(core = bonsaidb_core)]
108
/// struct BlogPost {
109
///     pub title: String,
110
///     pub contents: String,
111
/// }
112
///
113
/// # fn test() -> Result<(), bonsaidb_local::Error> {
114
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
115
/// # Ok(())
116
/// # }
117
/// ```
118
///
119
/// Under the hood, this initializes a [`Storage`] instance pointing at
120
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
121
/// with the schema `BlogPost`.
122
///
123
/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
124
/// collections can be used as a [`Schema`].
125
3684215
#[derive(Debug, Clone)]
126
pub struct Database {
127
    pub(crate) data: Arc<Data>,
128
    pub(crate) storage: Storage,
129
}
130

            
131
#[derive(Debug)]
132
pub struct Data {
133
    pub name: Arc<Cow<'static, str>>,
134
    context: Context,
135
    pub(crate) schema: Arc<Schematic>,
136
}
137

            
138
impl Database {
139
    /// Opens a local file as a bonsaidb.
140
122050
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
141
122050
        name: S,
142
122050
        context: Context,
143
122050
        storage: &Storage,
144
122050
    ) -> Result<Self, Error> {
145
122050
        let name = name.into();
146
122050
        let schema = Arc::new(DB::schematic()?);
147
122050
        let db = Self {
148
122050
            storage: storage.clone(),
149
122050
            data: Arc::new(Data {
150
122050
                name: Arc::new(name),
151
122050
                context,
152
122050
                schema,
153
122050
            }),
154
122050
        };
155
122050

            
156
122050
        if storage.instance.check_view_integrity_on_database_open() {
157
16
            for view in db.data.schema.views() {
158
16
                storage.instance.tasks().spawn_integrity_check(view, &db);
159
16
            }
160
122046
        }
161

            
162
122050
        storage
163
122050
            .instance
164
122050
            .tasks()
165
122050
            .spawn_key_value_expiration_loader(&db);
166
122050

            
167
122050
        Ok(db)
168
122050
    }
169

            
170
    /// Restricts an unauthenticated instance to having `effective_permissions`.
171
    /// Returns `None` if a session has already been established.
172
    #[must_use]
173
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
174
        self.storage
175
            .with_effective_permissions(effective_permissions)
176
            .map(|storage| Self {
177
                storage,
178
                data: self.data.clone(),
179
            })
180
    }
181

            
182
    /// Creates a `Storage` with a single-database named "default" with its data
183
    /// stored at `path`. This requires exclusive access to the storage location
184
    /// configured. Attempting to open the same path multiple times concurrently
185
    /// will lead to errors.
186
    ///
187
    /// Using this method is perfect if only one database is being used.
188
    /// However, if multiple databases are needed, it is much better to store
189
    /// multiple databases in a single [`Storage`] instance rather than creating
190
    /// multiple independent databases using this method.
191
    ///
192
    /// When opening multiple databases using this function, each database will
193
    /// have its own thread pool, cache, task worker pool, and more. By using a
194
    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
195
    /// perform better.
196
29
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
197
29
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
198

            
199
29
        Ok(storage.create_database::<DB>("default", true)?)
200
29
    }
201

            
202
    /// Returns the [`Schematic`] for the schema for this database.
203
    #[must_use]
204
3654455
    pub fn schematic(&self) -> &'_ Schematic {
205
3654455
        &self.data.schema
206
3654455
    }
207

            
208
2024051
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
209
2024051
        &self.data.context.roots
210
2024051
    }
211

            
212
823130
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
213
823130
        &self,
214
823130
        view: &dyn view::Serialized,
215
823130
        key: Option<QueryKey<Bytes>>,
216
823130
        order: Sort,
217
823130
        limit: Option<u32>,
218
823130
        access_policy: AccessPolicy,
219
823130
        mut callback: F,
220
823130
    ) -> Result<(), bonsaidb_core::Error> {
221
823130
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
222
306516
            self.storage
223
306516
                .instance
224
306516
                .tasks()
225
306516
                .update_view_if_needed(view, self)?;
226
516614
        }
227

            
228
823130
        let view_entries = self
229
823130
            .roots()
230
823130
            .tree(self.collection_tree(
231
823130
                &view.collection(),
232
823130
                view_entries_tree_name(&view.view_name()),
233
823130
            )?)
234
823130
            .map_err(Error::from)?;
235

            
236
        {
237
823130
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
238
508113
                callback(entry)?;
239
            }
240
        }
241

            
242
823130
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
243
124
            let db = self.clone();
244
124
            let view_name = view.view_name();
245
124
            let view = db
246
124
                .data
247
124
                .schema
248
124
                .view_by_name(&view_name)
249
124
                .expect("query made with view that isn't registered with this database");
250
124
            db.storage
251
124
                .instance
252
124
                .tasks()
253
124
                .update_view_if_needed(view, &db)?;
254
822976
        }
255

            
256
823100
        Ok(())
257
823100
    }
258

            
259
545500
    fn apply_transaction_to_roots(
260
545500
        &self,
261
545500
        transaction: &Transaction,
262
545500
    ) -> Result<Vec<OperationResult>, Error> {
263
545500
        let mut open_trees = OpenTrees::default();
264
1271958
        for op in &transaction.operations {
265
726582
            if !self.data.schema.contains_collection_name(&op.collection) {
266
124
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267
726458
            }
268

            
269
            #[cfg(any(feature = "encryption", feature = "compression"))]
270
726458
            let vault = if let Some(encryption_key) =
271
726458
                self.collection_encryption_key(&op.collection).cloned()
272
            {
273
                #[cfg(feature = "encryption")]
274
3535
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275
2094
                    vault.key = Some(encryption_key);
276
2094
                    Some(vault)
277
                } else {
278
1441
                    TreeVault::new_if_needed(
279
1441
                        Some(encryption_key),
280
1441
                        self.storage().vault(),
281
1441
                        #[cfg(feature = "compression")]
282
1441
                        None,
283
1441
                    )
284
                }
285

            
286
                #[cfg(not(feature = "encryption"))]
287
                {
288
                    drop(encryption_key);
289
                    return Err(Error::EncryptionDisabled);
290
                }
291
            } else {
292
722923
                self.storage().tree_vault().cloned()
293
            };
294

            
295
726458
            open_trees.open_trees_for_document_change(
296
726458
                &op.collection,
297
726458
                &self.data.schema,
298
726458
                #[cfg(any(feature = "encryption", feature = "compression"))]
299
726458
                vault,
300
726458
            );
301
        }
302

            
303
545376
        let mut roots_transaction = self
304
545376
            .data
305
545376
            .context
306
545376
            .roots
307
545376
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
308

            
309
545376
        let mut results = Vec::new();
310
545376
        let mut changed_documents = Vec::new();
311
545376
        let mut collection_indexes = HashMap::new();
312
545376
        let mut collections = Vec::new();
313
1255843
        for op in &transaction.operations {
314
726428
            let result = self.execute_operation(
315
726428
                op,
316
726428
                &mut roots_transaction,
317
726428
                &open_trees.trees_index_by_name,
318
726428
            )?;
319

            
320
710497
            if let Some((collection, id, deleted)) = match &result {
321
665709
                OperationResult::DocumentUpdated { header, collection } => {
322
665709
                    Some((collection, header.id, false))
323
                }
324
44758
                OperationResult::DocumentDeleted { id, collection } => {
325
44758
                    Some((collection, *id, true))
326
                }
327
                OperationResult::Success => None,
328
710467
            } {
329
710497
                let collection = match collection_indexes.get(collection) {
330
179194
                    Some(index) => *index,
331
                    None => {
332
531303
                        if let Ok(id) = u16::try_from(collections.len()) {
333
531273
                            collection_indexes.insert(collection.clone(), id);
334
531273
                            collections.push(collection.clone());
335
531273
                            id
336
                        } else {
337
                            return Err(Error::TransactionTooLarge);
338
                        }
339
                    }
340
                };
341
710467
                changed_documents.push(ChangedDocument {
342
710467
                    collection,
343
710467
                    id,
344
710467
                    deleted,
345
710467
                });
346
            }
347
710467
            results.push(result);
348
        }
349

            
350
529415
        self.invalidate_changed_documents(
351
529415
            &mut roots_transaction,
352
529415
            &open_trees,
353
529415
            &collections,
354
529415
            &changed_documents,
355
529415
        )?;
356

            
357
529415
        roots_transaction
358
529415
            .entry_mut()
359
529415
            .set_data(compat::serialize_executed_transaction_changes(
360
529415
                &Changes::Documents(DocumentChanges {
361
529415
                    collections,
362
529415
                    documents: changed_documents,
363
529415
                }),
364
529415
            )?)?;
365

            
366
529415
        roots_transaction.commit()?;
367

            
368
529415
        Ok(results)
369
545500
    }
370

            
371
529415
    fn invalidate_changed_documents(
372
529415
        &self,
373
529415
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
374
529415
        open_trees: &OpenTrees,
375
529415
        collections: &[CollectionName],
376
529415
        changed_documents: &[ChangedDocument],
377
529415
    ) -> Result<(), Error> {
378
531305
        for (collection, changed_documents) in &changed_documents
379
529415
            .iter()
380
710497
            .group_by(|doc| &collections[usize::from(doc.collection)])
381
        {
382
531303
            if let Some(views) = self.data.schema.views_in_collection(collection) {
383
313652
                let changed_documents = changed_documents.collect::<Vec<_>>();
384
1063456
                for view in views.into_iter().filter(|view| !view.unique()) {
385
1017379
                    let view_name = view.view_name();
386
1017379
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
387
2115474
                    for changed_document in &changed_documents {
388
1098095
                        let mut invalidated_docs = roots_transaction
389
1098095
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
390
1098095
                            .unwrap();
391
1098095
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
392
                    }
393
                }
394
217651
            }
395
        }
396
529415
        Ok(())
397
529415
    }
398

            
399
726458
    fn execute_operation(
400
726458
        &self,
401
726458
        operation: &Operation,
402
726458
        transaction: &mut ExecutingTransaction<AnyFile>,
403
726458
        tree_index_map: &HashMap<String, usize>,
404
726458
    ) -> Result<OperationResult, Error> {
405
726458
        match &operation.command {
406
500754
            Command::Insert { id, contents } => {
407
500754
                self.execute_insert(operation, transaction, tree_index_map, *id, contents)
408
            }
409
165668
            Command::Update { header, contents } => self.execute_update(
410
165668
                operation,
411
165668
                transaction,
412
165668
                tree_index_map,
413
165668
                header.id,
414
165668
                Some(&header.revision),
415
165668
                contents,
416
165668
            ),
417
15278
            Command::Overwrite { id, contents } => {
418
15278
                self.execute_update(operation, transaction, tree_index_map, *id, None, contents)
419
            }
420
44758
            Command::Delete { header } => {
421
44758
                self.execute_delete(operation, transaction, tree_index_map, header)
422
            }
423
        }
424
726458
    }
425

            
426
180946
    fn execute_update(
427
180946
        &self,
428
180946
        operation: &Operation,
429
180946
        transaction: &mut ExecutingTransaction<AnyFile>,
430
180946
        tree_index_map: &HashMap<String, usize>,
431
180946
        id: DocumentId,
432
180946
        check_revision: Option<&Revision>,
433
180946
        contents: &[u8],
434
180946
    ) -> Result<OperationResult, crate::Error> {
435
180946
        let mut documents = transaction
436
180946
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
437
180946
            .unwrap();
438
180946
        let document_id = ArcBytes::from(id.to_vec());
439
180946
        let mut result = None;
440
180946
        let mut updated = false;
441
180946
        documents.modify(
442
180946
            vec![document_id.clone()],
443
180946
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
444
                                                                        value: Option<
445
                ArcBytes<'_>,
446
            >| {
447
180946
                if let Some(old) = value {
448
180822
                    let doc = match deserialize_document(&old) {
449
180822
                        Ok(doc) => doc,
450
                        Err(err) => {
451
                            result = Some(Err(err));
452
                            return nebari::tree::KeyOperation::Skip;
453
                        }
454
                    };
455
180822
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
456
180698
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
457
                        {
458
180450
                            let updated_header = Header {
459
180450
                                id,
460
180450
                                revision: updated_revision,
461
180450
                            };
462
180450
                            let serialized_doc = match serialize_document(&BorrowedDocument {
463
180450
                                header: updated_header.clone(),
464
180450
                                contents: CowBytes::from(contents),
465
180450
                            }) {
466
180450
                                Ok(bytes) => bytes,
467
                                Err(err) => {
468
                                    result = Some(Err(Error::from(err)));
469
                                    return nebari::tree::KeyOperation::Skip;
470
                                }
471
                            };
472
180450
                            result = Some(Ok(OperationResult::DocumentUpdated {
473
180450
                                collection: operation.collection.clone(),
474
180450
                                header: updated_header,
475
180450
                            }));
476
180450
                            updated = true;
477
180450
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
478
248
                        }
479
248

            
480
248
                        // If no new revision was made, it means an attempt to
481
248
                        // save a document with the same contents was made.
482
248
                        // We'll return a success but not actually give a new
483
248
                        // version
484
248
                        result = Some(Ok(OperationResult::DocumentUpdated {
485
248
                            collection: operation.collection.clone(),
486
248
                            header: doc.header,
487
248
                        }));
488
124
                    } else {
489
124
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
490
124
                            operation.collection.clone(),
491
124
                            Box::new(doc.header),
492
124
                        ))));
493
124
                    }
494
124
                } else if check_revision.is_none() {
495
                    let doc = BorrowedDocument::new(id, contents);
496
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
497
                        Ok((doc, serialized)) => {
498
                            result = Some(Ok(OperationResult::DocumentUpdated {
499
                                collection: operation.collection.clone(),
500
                                header: doc.header,
501
                            }));
502
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
503
                        }
504
                        Err(err) => {
505
                            result = Some(Err(Error::from(err)));
506
                        }
507
                    }
508
124
                } else {
509
124
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
510
124
                        operation.collection.clone(),
511
124
                        Box::new(id),
512
124
                    ))));
513
124
                }
514
496
                nebari::tree::KeyOperation::Skip
515
180946
            })),
516
180946
        )?;
517
180946
        drop(documents);
518
180946

            
519
180946
        if updated {
520
180450
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
521
496
        }
522

            
523
180698
        result.expect("nebari should invoke the callback even when the key isn't found")
524
180946
    }
525

            
526
500754
    fn execute_insert(
527
500754
        &self,
528
500754
        operation: &Operation,
529
500754
        transaction: &mut ExecutingTransaction<AnyFile>,
530
500754
        tree_index_map: &HashMap<String, usize>,
531
500754
        id: Option<DocumentId>,
532
500754
        contents: &[u8],
533
500754
    ) -> Result<OperationResult, Error> {
534
500754
        let mut documents = transaction
535
500754
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
536
500754
            .unwrap();
537
500754
        let id = if let Some(id) = id {
538
200414
            id
539
300340
        } else if let Some(last_key) = documents.last_key()? {
540
276617
            let id = DocumentId::try_from(last_key.as_slice())?;
541
276617
            self.data
542
276617
                .schema
543
276617
                .next_id_for_collection(&operation.collection, Some(id))?
544
        } else {
545
23723
            self.data
546
23723
                .schema
547
23723
                .next_id_for_collection(&operation.collection, None)?
548
        };
549

            
550
500724
        let doc = BorrowedDocument::new(id, contents);
551
500724
        let serialized: Vec<u8> = serialize_document(&doc)?;
552
500724
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
553
500724
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
554
15155
            let doc = deserialize_document(&document)?;
555
15155
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
556
15155
                operation.collection.clone(),
557
15155
                Box::new(doc.header),
558
15155
            )))
559
        } else {
560
485569
            drop(documents);
561
485569
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
562

            
563
485289
            Ok(OperationResult::DocumentUpdated {
564
485289
                collection: operation.collection.clone(),
565
485289
                header: doc.header,
566
485289
            })
567
        }
568
500754
    }
569

            
570
44758
    fn execute_delete(
571
44758
        &self,
572
44758
        operation: &Operation,
573
44758
        transaction: &mut ExecutingTransaction<AnyFile>,
574
44758
        tree_index_map: &HashMap<String, usize>,
575
44758
        header: &Header,
576
44758
    ) -> Result<OperationResult, Error> {
577
44758
        let mut documents = transaction
578
44758
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
579
44758
            .unwrap();
580
44758
        if let Some(vec) = documents.remove(header.id.as_ref())? {
581
44758
            drop(documents);
582
44758
            let doc = deserialize_document(&vec)?;
583
44758
            if &doc.header == header {
584
44758
                self.update_unique_views(
585
44758
                    &ArcBytes::from(doc.header.id.to_vec()),
586
44758
                    operation,
587
44758
                    transaction,
588
44758
                    tree_index_map,
589
44758
                )?;
590

            
591
44758
                Ok(OperationResult::DocumentDeleted {
592
44758
                    collection: operation.collection.clone(),
593
44758
                    id: header.id,
594
44758
                })
595
            } else {
596
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
597
                    operation.collection.clone(),
598
                    Box::new(header.clone()),
599
                )))
600
            }
601
        } else {
602
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
603
                operation.collection.clone(),
604
                Box::new(header.id),
605
            )))
606
        }
607
44758
    }
608

            
609
    fn update_unique_views(
610
        &self,
611
        document_id: &ArcBytes<'static>,
612
        operation: &Operation,
613
        transaction: &mut ExecutingTransaction<AnyFile>,
614
        tree_index_map: &HashMap<String, usize>,
615
    ) -> Result<(), Error> {
616
710777
        if let Some(unique_views) = self
617
710777
            .data
618
710777
            .schema
619
710777
            .unique_views_in_collection(&operation.collection)
620
        {
621
100605
            let documents = transaction
622
100605
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
623
100605
                .unwrap();
624
200682
            for view in unique_views {
625
100605
                let name = view.view_name();
626
100605
                let document_map = transaction
627
100605
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
628
100605
                    .unwrap();
629
100605
                let view_entries = transaction
630
100605
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
631
100605
                    .unwrap();
632
100605
                mapper::DocumentRequest {
633
100605
                    database: self,
634
100605
                    document_ids: vec![document_id.clone()],
635
100605
                    map_request: &mapper::Map {
636
100605
                        database: self.data.name.clone(),
637
100605
                        collection: operation.collection.clone(),
638
100605
                        view_name: name.clone(),
639
100605
                    },
640
100605
                    document_map,
641
100605
                    documents,
642
100605
                    view_entries,
643
100605
                    view,
644
100605
                }
645
100605
                .map()?;
646
            }
647
610172
        }
648

            
649
710249
        Ok(())
650
710777
    }
651

            
652
823130
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
653
823130
        view_entries: &'a Tree<Unversioned, AnyFile>,
654
823130
        key: Option<QueryKey<K>>,
655
823130
        order: Sort,
656
823130
        limit: Option<u32>,
657
823130
    ) -> Result<Vec<ViewEntry>, Error> {
658
823130
        let mut values = Vec::new();
659
823130
        let forwards = match order {
660
823006
            Sort::Ascending => true,
661
124
            Sort::Descending => false,
662
        };
663
823130
        let mut values_read = 0;
664
823130
        if let Some(key) = key {
665
818383
            match key {
666
398
                QueryKey::Range(range) => {
667
398
                    let range = range
668
398
                        .as_ord_bytes()
669
398
                        .map_err(view::Error::key_serialization)?;
670
398
                    view_entries.scan::<Infallible, _, _, _, _>(
671
796
                        &range.map_ref(|bytes| &bytes[..]),
672
398
                        forwards,
673
668
                        |_, _, _| ScanEvaluation::ReadData,
674
398
                        |_, _| {
675
2026
                            if let Some(limit) = limit {
676
248
                                if values_read >= limit {
677
124
                                    return ScanEvaluation::Stop;
678
124
                                }
679
124
                                values_read += 1;
680
1778
                            }
681
1902
                            ScanEvaluation::ReadData
682
2026
                        },
683
1902
                        |_key, _index, value| {
684
1902
                            values.push(value);
685
1902
                            Ok(())
686
1902
                        },
687
398
                    )?;
688
                }
689
817677
                QueryKey::Matches(key) => {
690
817677
                    let key = key
691
817677
                        .as_ord_bytes()
692
817677
                        .map_err(view::Error::key_serialization)?
693
817677
                        .to_vec();
694
817677

            
695
817677
                    values.extend(view_entries.get(&key)?);
696
                }
697
308
                QueryKey::Multiple(list) => {
698
308
                    let mut list = list
699
308
                        .into_iter()
700
616
                        .map(|key| {
701
616
                            key.as_ord_bytes()
702
616
                                .map(|bytes| bytes.to_vec())
703
616
                                .map_err(view::Error::key_serialization)
704
616
                        })
705
308
                        .collect::<Result<Vec<_>, _>>()?;
706

            
707
308
                    list.sort();
708
308

            
709
308
                    values.extend(
710
308
                        view_entries
711
308
                            .get_multiple(list.iter().map(Vec::as_slice))?
712
308
                            .into_iter()
713
616
                            .map(|(_, value)| value),
714
                    );
715
                }
716
            }
717
        } else {
718
4747
            view_entries.scan::<Infallible, _, _, _, _>(
719
4747
                &(..),
720
4747
                forwards,
721
4747
                |_, _, _| ScanEvaluation::ReadData,
722
4747
                |_, _| {
723
6108
                    if let Some(limit) = limit {
724
                        if values_read >= limit {
725
                            return ScanEvaluation::Stop;
726
                        }
727
                        values_read += 1;
728
6108
                    }
729
6108
                    ScanEvaluation::ReadData
730
6187
                },
731
6187
                |_, _, value| {
732
6108
                    values.push(value);
733
6108
                    Ok(())
734
6187
                },
735
4747
            )?;
736
        }
737

            
738
823130
        values
739
823130
            .into_iter()
740
823130
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
741
823130
            .collect::<Result<Vec<_>, Error>>()
742
823130
    }
743

            
744
    #[cfg(any(feature = "encryption", feature = "compression"))]
745
2954275
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
746
2954275
        self.schematic()
747
2954275
            .encryption_key_for_collection(collection)
748
2954275
            .or_else(|| self.storage.default_encryption_key())
749
2954275
    }
750

            
751
    #[cfg_attr(
752
        not(feature = "encryption"),
753
        allow(
754
            unused_mut,
755
            unused_variables,
756
            clippy::unused_self,
757
            clippy::let_and_return
758
        )
759
    )]
760
    #[allow(clippy::unnecessary_wraps)]
761
2227817
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
762
2227817
        &self,
763
2227817
        collection: &CollectionName,
764
2227817
        name: S,
765
2227817
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
766
2227817
        let mut tree = R::tree(name);
767
2227817

            
768
2227817
        #[cfg(any(feature = "encryption", feature = "compression"))]
769
2227817
        match (
770
2227817
            self.collection_encryption_key(collection),
771
2227817
            self.storage().tree_vault().cloned(),
772
        ) {
773
31229
            (Some(override_key), Some(mut vault)) => {
774
31229
                #[cfg(feature = "encryption")]
775
31229
                {
776
31229
                    vault.key = Some(override_key.clone());
777
31229
                    tree = tree.with_vault(vault);
778
31229
                }
779

            
780
                #[cfg(not(feature = "encryption"))]
781
                {
782
                    return Err(Error::EncryptionDisabled);
783
                }
784
            }
785
530996
            (None, Some(vault)) => {
786
530996
                tree = tree.with_vault(vault);
787
530996
            }
788
1665622
            (key, None) => {
789
                #[cfg(feature = "encryption")]
790
1665622
                if let Some(vault) = TreeVault::new_if_needed(
791
1665622
                    key.cloned(),
792
1665622
                    self.storage().vault(),
793
1665622
                    #[cfg(feature = "compression")]
794
1665622
                    None,
795
1665622
                ) {
796
33093
                    tree = tree.with_vault(vault);
797
1632529
                }
798

            
799
                #[cfg(not(feature = "encryption"))]
800
                if key.is_some() {
801
                    return Err(Error::EncryptionDisabled);
802
                }
803
            }
804
        }
805

            
806
2227847
        Ok(tree)
807
2227847
    }
808

            
809
1
    pub(crate) fn update_key_expiration<'key>(
810
1
        &self,
811
1
        tree_key: impl Into<Cow<'key, str>>,
812
1
        expiration: Option<Timestamp>,
813
1
    ) {
814
1
        self.data
815
1
            .context
816
1
            .update_key_expiration(tree_key, expiration);
817
1
    }
818

            
819
    /// Converts this instance into its blocking version, which is able to be
820
    /// used without async. The returned instance uses the current Tokio runtime
821
    /// handle to spawn blocking tasks.
822
    ///
823
    /// # Panics
824
    ///
825
    /// Panics if called outside the context of a Tokio runtime.
826
    #[cfg(feature = "async")]
827
    #[must_use]
828
2098208
    pub fn into_async(self) -> crate::AsyncDatabase {
829
2098208
        self.into_async_with_runtime(tokio::runtime::Handle::current())
830
2098208
    }
831

            
832
    /// Converts this instance into its blocking version, which is able to be
833
    /// used without async. The returned instance uses the provided runtime
834
    /// handle to spawn blocking tasks.
835
    #[cfg(feature = "async")]
836
    #[must_use]
837
2098208
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
838
2098208
        crate::AsyncDatabase {
839
2098208
            database: self,
840
2098208
            runtime: Arc::new(runtime),
841
2098208
        }
842
2098208
    }
843

            
844
    /// Converts this instance into its blocking version, which is able to be
845
    /// used without async. The returned instance uses the current Tokio runtime
846
    /// handle to spawn blocking tasks.
847
    ///
848
    /// # Panics
849
    ///
850
    /// Panics if called outside the context of a Tokio runtime.
851
    #[cfg(feature = "async")]
852
    #[must_use]
853
    pub fn to_async(&self) -> crate::AsyncDatabase {
854
        self.clone().into_async()
855
    }
856

            
857
    /// Converts this instance into its blocking version, which is able to be
858
    /// used without async. The returned instance uses the provided runtime
859
    /// handle to spawn blocking tasks.
860
    #[cfg(feature = "async")]
861
    #[must_use]
862
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
863
        self.clone().into_async_with_runtime(runtime)
864
    }
865
}
866
59
#[derive(Serialize, Deserialize)]
867
struct LegacyHeader {
868
    id: u64,
869
    revision: Revision,
870
}
871
59
#[derive(Serialize, Deserialize)]
872
struct LegacyDocument<'a> {
873
    header: LegacyHeader,
874
    #[serde(borrow)]
875
    contents: &'a [u8],
876
}
877

            
878
1092998
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
879
1092998
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
880
1092939
        Ok(document) => Ok(document),
881
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
882
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
883
59
                header: Header {
884
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
885
59
                    revision: legacy_doc.header.revision,
886
59
                },
887
59
                contents: CowBytes::from(legacy_doc.contents),
888
59
            }),
889
            Err(_) => Err(Error::from(err)),
890
        },
891
    }
892
1092998
}
893

            
894
681174
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
895
681174
    pot::to_vec(document)
896
681174
        .map_err(Error::from)
897
681174
        .map_err(bonsaidb_core::Error::from)
898
681174
}
899

            
900
impl HasSession for Database {
901
4041343
    fn session(&self) -> Option<&Session> {
902
4041343
        self.storage.session()
903
4041343
    }
904
}
905

            
906
impl Connection for Database {
907
    type Storage = Storage;
908

            
909
4756367
    fn storage(&self) -> Self::Storage {
910
4756367
        self.storage.clone()
911
4756367
    }
912

            
913
    fn list_executed_transactions(
914
        &self,
915
        starting_id: Option<u64>,
916
        result_limit: Option<u32>,
917
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
918
32304
        self.check_permission(
919
32304
            database_resource_name(self.name()),
920
32304
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
921
32304
        )?;
922
32304
        let result_limit = usize::try_from(
923
32304
            result_limit
924
32304
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
925
32304
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
926
32304
        )
927
32304
        .unwrap();
928
32304
        if result_limit > 0 {
929
32180
            let range = if let Some(starting_id) = starting_id {
930
16518
                Range::from(starting_id..)
931
            } else {
932
15662
                Range::from(..)
933
            };
934

            
935
32180
            let mut entries = Vec::new();
936
32180
            self.roots()
937
32180
                .transactions()
938
306913
                .scan(range, |entry| {
939
306913
                    entries.push(entry);
940
306913
                    entries.len() < result_limit
941
306913
                })
942
32180
                .map_err(Error::from)?;
943

            
944
32180
            entries
945
32180
                .into_iter()
946
32180
                .map(|entry| {
947
306913
                    if let Some(data) = entry.data() {
948
306855
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
949
306855
                        Ok(Some(transaction::Executed {
950
306855
                            id: entry.id,
951
306855
                            changes,
952
306855
                        }))
953
                    } else {
954
58
                        Ok(None)
955
                    }
956
306913
                })
957
32180
                .filter_map(Result::transpose)
958
32180
                .collect::<Result<Vec<_>, Error>>()
959
32180
                .map_err(bonsaidb_core::Error::from)
960
        } else {
961
            // A request was made to return an empty result? This should probably be
962
            // an error, but technically this is a correct response.
963
124
            Ok(Vec::default())
964
        }
965
32304
    }
966

            
967
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
968
466142
        self.check_permission(
969
466142
            database_resource_name(self.name()),
970
466142
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
971
466142
        )?;
972
466142
        Ok(self.roots().transactions().current_transaction_id())
973
466142
    }
974

            
975
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
976
124
        self.check_permission(
977
124
            database_resource_name(self.name()),
978
124
            &BonsaiAction::Database(DatabaseAction::Compact),
979
124
        )?;
980
124
        self.storage()
981
124
            .instance
982
124
            .tasks()
983
124
            .compact_database(self.clone())?;
984
124
        Ok(())
985
124
    }
986

            
987
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
988
124
        self.check_permission(
989
124
            kv_resource_name(self.name()),
990
124
            &BonsaiAction::Database(DatabaseAction::Compact),
991
124
        )?;
992
124
        self.storage()
993
124
            .instance
994
124
            .tasks()
995
124
            .compact_key_value_store(self.clone())?;
996
124
        Ok(())
997
124
    }
998
}
999

            
impl LowLevelConnection for Database {
19768
    fn schematic(&self) -> &Schematic {
19768
        &self.data.schema
19768
    }

            
545590
    fn apply_transaction(
545590
        &self,
545590
        transaction: Transaction,
545590
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1272172
        for op in &transaction.operations {
726672
            let (resource, action) = match &op.command {
500908
                Command::Insert { .. } => (
500908
                    collection_resource_name(self.name(), &op.collection),
500908
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
500908
                ),
165668
                Command::Update { header, .. } => (
165668
                    document_resource_name(self.name(), &op.collection, &header.id),
165668
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
165668
                ),
15278
                Command::Overwrite { id, .. } => (
15278
                    document_resource_name(self.name(), &op.collection, id),
15278
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
15278
                ),
44818
                Command::Delete { header } => (
44818
                    document_resource_name(self.name(), &op.collection, &header.id),
44818
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
44818
                ),
            };
726672
            self.check_permission(&resource, &action)?;
        }

            
545500
        let mut unique_view_tasks = Vec::new();
547390
        for collection_name in transaction
545500
            .operations
545500
            .iter()
726582
            .map(|op| &op.collection)
545500
            .collect::<HashSet<_>>()
        {
547388
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
1455115
                for view in views {
1125533
                    if view.unique() {
46606
                        if let Some(task) = self
46606
                            .storage
46606
                            .instance
46606
                            .tasks()
46606
                            .spawn_integrity_check(view, self)
1198
                        {
1198
                            unique_view_tasks.push(task);
45407
                        }
1078927
                    }
                }
217805
            }
        }

            
545499
        let mut unique_view_mapping_tasks = Vec::new();
546697
        for task in unique_view_tasks {
1198
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
1106
                unique_view_mapping_tasks.push(spawned_task);
1106
            }
        }

            
546605
        for task in unique_view_mapping_tasks {
1106
            let mut task = task.lock();
1106
            if let Some(task) = task.take() {
1106
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
            }
        }

            
545499
        self.apply_transaction_to_roots(&transaction)
545499
            .map_err(bonsaidb_core::Error::from)
545619
    }

            
    fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
463737
        self.check_permission(
463737
            document_resource_name(self.name(), collection, &id),
463737
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
463737
        )?;
463737
        let tree = self
463737
            .data
463737
            .context
463737
            .roots
463737
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
463737
            .map_err(Error::from)?;
463737
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
448334
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
15402
            Ok(None)
        }
463707
    }

            
    fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
723
        self.check_permission(
723
            collection_resource_name(self.name(), collection),
723
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
723
        )?;
723
        let tree = self
723
            .data
723
            .context
723
            .roots
723
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
723
            .map_err(Error::from)?;
723
        let mut found_docs = Vec::new();
723
        let mut keys_read = 0;
723
        let ids = DocumentIdRange(ids);
723
        tree.scan(
723
            &ids.borrow_as_bytes(),
723
            match sort {
599
                Sort::Ascending => true,
124
                Sort::Descending => false,
            },
24
            |_, _, _| ScanEvaluation::ReadData,
723
            |_, _| {
1111
                if let Some(limit) = limit {
248
                    if keys_read >= limit {
124
                        return ScanEvaluation::Stop;
124
                    }
124

            
124
                    keys_read += 1;
863
                }
987
                ScanEvaluation::ReadData
1111
            },
723
            |_, _, doc| {
                found_docs.push(
987
                    deserialize_document(&doc)
987
                        .map(BorrowedDocument::into_owned)
987
                        .map_err(AbortError::Other)?,
                );
987
                Ok(())
987
            },
        )
723
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
723
        })?;

            
723
        Ok(found_docs)
723
    }

            
    fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
124
        self.check_permission(
124
            collection_resource_name(self.name(), collection),
124
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
124
        )?;
124
        let tree = self
124
            .data
124
            .context
124
            .roots
124
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
124
            .map_err(Error::from)?;
124
        let mut found_headers = Vec::new();
124
        let mut keys_read = 0;
124
        let ids = DocumentIdRange(ids);
124
        tree.scan(
124
            &ids.borrow_as_bytes(),
124
            match sort {
124
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
124
            |_, _| {
248
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

            
                    keys_read += 1;
248
                }
248
                ScanEvaluation::ReadData
248
            },
124
            |_, _, doc| {
                found_headers.push(
248
                    deserialize_document(&doc)
248
                        .map(|doc| doc.header)
248
                        .map_err(AbortError::Other)?,
                );
248
                Ok(())
248
            },
        )
124
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
124
        })?;

            
124
        Ok(found_headers)
124
    }

            
    fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, bonsaidb_core::Error> {
248
        self.check_permission(
248
            collection_resource_name(self.name(), collection),
248
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
248
        )?;
248
        let tree = self
248
            .data
248
            .context
248
            .roots
248
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
248
            .map_err(Error::from)?;
248
        let ids = DocumentIdRange(ids);
248
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

            
248
        Ok(stats.alive_keys)
248
    }

            
269848
    fn get_multiple_from_collection(
269848
        &self,
269848
        ids: &[DocumentId],
269848
        collection: &CollectionName,
269848
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
537732
        for id in ids {
267884
            self.check_permission(
267884
                document_resource_name(self.name(), collection, id),
267884
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
267884
            )?;
        }
269848
        let mut ids = ids.to_vec();
269848
        let collection = collection.clone();
269848
        let tree = self
269848
            .data
269848
            .context
269848
            .roots
269848
            .tree(
269848
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
269848
            .map_err(Error::from)?;
269848
        ids.sort();
269848
        let keys_and_values = tree
269864
            .get_multiple(ids.iter().map(|id| id.as_ref()))
269848
            .map_err(Error::from)?;

            
269848
        keys_and_values
269848
            .into_iter()
269864
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
269848
            .collect::<Result<Vec<_>, Error>>()
269848
            .map_err(bonsaidb_core::Error::from)
269848
    }

            
    fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
124
        self.check_permission(
124
            collection_resource_name(self.name(), &collection),
124
            &BonsaiAction::Database(DatabaseAction::Compact),
124
        )?;
124
        self.storage()
124
            .instance
124
            .tasks()
124
            .compact_collection(self.clone(), collection)?;
124
        Ok(())
124
    }

            
294812
    fn query_by_name(
294812
        &self,
294812
        view: &ViewName,
294812
        key: Option<QueryKey<Bytes>>,
294812
        order: Sort,
294812
        limit: Option<u32>,
294812
        access_policy: AccessPolicy,
294812
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
294812
        let view = self.schematic().view_by_name(view)?;
294812
        self.check_permission(
294812
            view_resource_name(self.name(), &view.view_name()),
294812
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
294812
        )?;
294812
        let mut results = Vec::new();
294812
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
576556
            for mapping in entry.mappings {
289007
                results.push(bonsaidb_core::schema::view::map::Serialized {
289007
                    source: mapping.source,
289007
                    key: entry.key.clone(),
289007
                    value: mapping.value,
289007
                });
289007
            }
287549
            Ok(())
294812
        })?;

            
294812
        Ok(results)
294812
    }

            
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let results = self.query_by_name(view, key, order, limit, access_policy)?;
        let view = self.schematic().view_by_name(view).unwrap(); // query() will fail if it's not present

            
        let documents = self
            .get_multiple_from_collection(
                &results.iter().map(|m| m.source.id).collect::<Vec<_>>(),
                &view.collection(),
            )?
            .into_iter()
            .map(|doc| (doc.header.id, doc))
            .collect::<BTreeMap<_, _>>();

            
        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings: results,
                documents,
            },
        )
    }

            
527750
    fn reduce_by_name(
527750
        &self,
527750
        view_name: &ViewName,
527750
        key: Option<QueryKey<Bytes>>,
527750
        access_policy: AccessPolicy,
527750
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
527750
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

            
527750
        let result = if mappings.len() == 1 {
214778
            mappings.pop().unwrap().value.into_vec()
        } else {
312972
            let view = self.data.schema.view_by_name(view_name)?;
312972
            view.reduce(
312972
                &mappings
312972
                    .iter()
312972
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
312972
                    .collect::<Vec<_>>(),
312972
                true,
312972
            )
312972
            .map_err(Error::from)?
        };

            
527626
        Ok(result)
527750
    }

            
528070
    fn reduce_grouped_by_name(
528070
        &self,
528070
        view_name: &ViewName,
528070
        key: Option<QueryKey<Bytes>>,
528070
        access_policy: AccessPolicy,
528070
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
528070
        let view = self.data.schema.view_by_name(view_name)?;
528070
        self.check_permission(
528070
            view_resource_name(self.name(), &view.view_name()),
528070
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
528070
        )?;
528070
        let mut mappings = Vec::new();
528086
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
220316
            mappings.push(MappedSerializedValue {
220316
                key: entry.key,
220316
                value: entry.reduced_value,
220316
            });
220316
            Ok(())
528086
        })?;

            
528070
        Ok(mappings)
528070
    }

            
248
    fn delete_docs_by_name(
248
        &self,
248
        view: &ViewName,
248
        key: Option<QueryKey<Bytes>>,
248
        access_policy: AccessPolicy,
248
    ) -> Result<u64, bonsaidb_core::Error> {
248
        let view = self.data.schema.view_by_name(view)?;
248
        let collection = view.collection();
248
        let mut transaction = Transaction::default();
248
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
620
            for mapping in entry.mappings {
372
                transaction.push(Operation::delete(collection.clone(), mapping.source));
372
            }

            
248
            Ok(())
248
        })?;

            
248
        let results = LowLevelConnection::apply_transaction(self, transaction)?;

            
248
        Ok(results.len() as u64)
248
    }
}

            
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
struct ViewEntryCollectionIterator<'a> {
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
2150223
#[derive(Debug, Clone)]
pub(crate) struct Context {
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

            
3316492
    fn deref(&self) -> &Self::Target {
3316492
        &self.data
3316492
    }
}

            
#[derive(Debug)]
pub(crate) struct ContextData {
    pub(crate) roots: Roots<AnyFile>,
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
26877
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
26877
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
26877
        let mut background_worker_target_watcher = background_worker_target.watch();
26877
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
26877
            key_value_persistence,
26877
            roots.clone(),
26877
            background_worker_target,
26877
        )));
26877
        let background_worker_state = Arc::downgrade(&key_value_state);
26877
        let context = Self {
26877
            data: Arc::new(ContextData {
26877
                roots,
26877
                key_value_state,
26877
            }),
26877
        };
26877
        std::thread::Builder::new()
26877
            .name(String::from("keyvalue-worker"))
26877
            .spawn(move || {
26847
                keyvalue::background_worker(
26847
                    &background_worker_state,
26847
                    &mut background_worker_target_watcher,
26847
                );
26877
            })
26877
            .unwrap();
26877
        context
26877
    }

            
1253819
    pub(crate) fn perform_kv_operation(
1253819
        &self,
1253819
        op: KeyOperation,
1253819
    ) -> Result<Output, bonsaidb_core::Error> {
1253819
        let mut state = self.data.key_value_state.lock();
1253819
        state.perform_kv_operation(op, &self.data.key_value_state)
1253819
    }

            
14
    pub(crate) fn update_key_expiration<'key>(
14
        &self,
14
        tree_key: impl Into<Cow<'key, str>>,
14
        expiration: Option<Timestamp>,
14
    ) {
14
        let mut state = self.data.key_value_state.lock();
14
        state.update_key_expiration(tree_key, expiration);
14
    }

            
    #[cfg(test)]
1
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
1
        let state = self.data.key_value_state.lock();
1
        state.persistence_watcher()
1
    }
}

            
impl Drop for ContextData {
    fn drop(&mut self) {
24027
        if let Some(shutdown) = {
24027
            let mut state = self.key_value_state.lock();
24027
            state.shutdown(&self.key_value_state)
24027
        } {
331
            let _ = shutdown.recv();
23696
        }
24027
    }
}

            
2458876
pub fn document_tree_name(collection: &CollectionName) -> String {
2458876
    format!("collection.{:#}", collection)
2458876
}

            
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1095
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1095
        BorrowedRange {
1095
            start: match &self.0.start {
475
                connection::Bound::Unbounded => ops::Bound::Unbounded,
620
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
1095
            end: match &self.0.end {
475
                connection::Bound::Unbounded => ops::Bound::Unbounded,
496
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
124
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
1095
    }
}

            
/// Operations that can be performed on both [`Database`] and
/// [`AsyncDatabase`](crate::AsyncDatabase).
pub trait DatabaseNonBlocking {
    /// Returns the name of the database.
    #[must_use]
    fn name(&self) -> &str;
}

            
impl DatabaseNonBlocking for Database {
4048515
    fn name(&self) -> &str {
4048515
        self.data.name.as_ref()
4048515
    }
}