1
use std::borrow::{Borrow, Cow};
2
use std::collections::{BTreeMap, HashMap, HashSet};
3
use std::convert::Infallible;
4
use std::ops::{self, Deref};
5
use std::sync::Arc;
6
use std::u8;
7

            
8
use bonsaidb_core::arc_bytes::serde::CowBytes;
9
use bonsaidb_core::arc_bytes::ArcBytes;
10
use bonsaidb_core::connection::{
11
    self, AccessPolicy, Connection, HasSchema, HasSession, LowLevelConnection, Range,
12
    SerializedQueryKey, Session, Sort, StorageConnection,
13
};
14
#[cfg(any(feature = "encryption", feature = "compression"))]
15
use bonsaidb_core::document::KeyId;
16
use bonsaidb_core::document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision};
17
use bonsaidb_core::keyvalue::{KeyOperation, Output, Timestamp};
18
use bonsaidb_core::limits::{
19
    LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS,
20
};
21
use bonsaidb_core::permissions::bonsai::{
22
    collection_resource_name, database_resource_name, document_resource_name, kv_resource_name,
23
    view_resource_name, BonsaiAction, DatabaseAction, DocumentAction, TransactionAction,
24
    ViewAction,
25
};
26
use bonsaidb_core::permissions::Permissions;
27
use bonsaidb_core::schema::view::map::MappedSerializedValue;
28
use bonsaidb_core::schema::view::{self};
29
use bonsaidb_core::schema::{self, CollectionName, Schema, Schematic, ViewName};
30
use bonsaidb_core::transaction::{
31
    self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
32
    Transaction,
33
};
34
use itertools::Itertools;
35
use nebari::io::any::AnyFile;
36
use nebari::tree::{
37
    AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
38
    Unversioned, Versioned,
39
};
40
use nebari::{AbortError, ExecutingTransaction, Roots, Tree};
41
use parking_lot::Mutex;
42
use serde::{Deserialize, Serialize};
43
use watchable::Watchable;
44

            
45
use crate::config::{Builder, KeyValuePersistence, StorageConfiguration};
46
use crate::database::keyvalue::BackgroundWorkerProcessTarget;
47
use crate::error::Error;
48
use crate::open_trees::OpenTrees;
49
use crate::storage::StorageLock;
50
#[cfg(feature = "encryption")]
51
use crate::storage::TreeVault;
52
use crate::views::{
53
    mapper, view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
54
    ViewEntry,
55
};
56
use crate::Storage;
57

            
58
pub mod keyvalue;
59

            
60
pub(crate) mod compat;
61
pub mod pubsub;
62

            
63
/// A database stored in BonsaiDb. This type blocks the current thread when
64
/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
65
///
66
/// ## Converting between Blocking and Async Types
67
///
68
/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
69
/// using:
70
///
71
/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
72
/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
73
/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
74
/// - [`Database::into_async()`]
75
/// - [`Database::to_async()`]
76
/// - [`Database::into_async_with_runtime()`]
77
/// - [`Database::to_async_with_runtime()`]
78
///
79
/// ## Using `Database` to create a single database
80
///
81
/// `Database`provides an easy mechanism to open and access a single database:
82
///
83
/// ```rust
84
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
85
/// use bonsaidb_core::schema::Collection;
86
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
87
/// use bonsaidb_local::{
88
///     config::{Builder, StorageConfiguration},
89
///     Database,
90
/// };
91
/// use serde::{Deserialize, Serialize};
92
///
93
/// #[derive(Debug, Serialize, Deserialize, Collection)]
94
/// #[collection(name = "blog-posts")]
95
/// # #[collection(core = bonsaidb_core)]
96
/// struct BlogPost {
97
///     pub title: String,
98
///     pub contents: String,
99
/// }
100
///
101
/// # fn test() -> Result<(), bonsaidb_local::Error> {
102
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
103
/// # Ok(())
104
/// # }
105
/// ```
106
///
107
/// Under the hood, this initializes a [`Storage`] instance pointing at
108
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
109
/// with the schema `BlogPost`.
110
///
111
/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
112
/// collections can be used as a [`Schema`].
113
4641266
#[derive(Debug, Clone)]
114
pub struct Database {
115
    pub(crate) data: Arc<Data>,
116
    pub(crate) storage: Storage,
117
}
118

            
119
#[derive(Debug)]
120
pub struct Data {
121
    pub name: Arc<Cow<'static, str>>,
122
    context: Context,
123
    pub(crate) schema: Arc<Schematic>,
124
}
125

            
126
impl Database {
127
    /// Opens a local file as a bonsaidb.
128
141033
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
129
141033
        name: S,
130
141033
        context: Context,
131
141033
        storage: &Storage,
132
141033
    ) -> Result<Self, Error> {
133
141033
        let name = name.into();
134
141033
        let schema = Arc::new(DB::schematic()?);
135
141033
        let db = Self {
136
141033
            storage: storage.clone(),
137
141033
            data: Arc::new(Data {
138
141033
                name: Arc::new(name),
139
141033
                context,
140
141033
                schema,
141
141033
            }),
142
141033
        };
143
141033

            
144
141033
        if storage.instance.check_view_integrity_on_database_open() {
145
18
            for view in db.data.schema.views() {
146
18
                storage.instance.tasks().spawn_integrity_check(view, &db);
147
18
            }
148
141029
        }
149

            
150
141033
        storage
151
141033
            .instance
152
141033
            .tasks()
153
141033
            .spawn_key_value_expiration_loader(&db);
154
141033

            
155
141033
        Ok(db)
156
141033
    }
157

            
158
    /// Restricts an unauthenticated instance to having `effective_permissions`.
159
    /// Returns `None` if a session has already been established.
160
    #[must_use]
161
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
162
        self.storage
163
            .with_effective_permissions(effective_permissions)
164
            .map(|storage| Self {
165
                storage,
166
                data: self.data.clone(),
167
            })
168
    }
169

            
170
    /// Creates a `Storage` with a single-database named "default" with its data
171
    /// stored at `path`. This requires exclusive access to the storage location
172
    /// configured. Attempting to open the same path multiple times concurrently
173
    /// will lead to errors.
174
    ///
175
    /// Using this method is perfect if only one database is being used.
176
    /// However, if multiple databases are needed, it is much better to store
177
    /// multiple databases in a single [`Storage`] instance rather than creating
178
    /// multiple independent databases using this method.
179
    ///
180
    /// When opening multiple databases using this function, each database will
181
    /// have its own thread pool, cache, task worker pool, and more. By using a
182
    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
183
    /// perform better.
184
48
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
185
48
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
186

            
187
48
        Ok(storage.create_database::<DB>("default", true)?)
188
48
    }
189

            
190
    /// Returns the [`Schematic`] for the schema for this database.
191
    #[must_use]
192
4934309
    pub fn schematic(&self) -> &'_ Schematic {
193
4934309
        &self.data.schema
194
4934309
    }
195

            
196
2679855
    /// Returns the nebari roots held by this database's context.
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
        &self.data.context.roots
    }
199

            
200
1073884
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
201
1073884
        &self,
202
1073884
        view: &dyn view::Serialized,
203
1073884
        key: Option<SerializedQueryKey>,
204
1073884
        order: Sort,
205
1073884
        limit: Option<u32>,
206
1073884
        access_policy: AccessPolicy,
207
1073884
        mut callback: F,
208
1073884
    ) -> Result<(), bonsaidb_core::Error> {
209
1073884
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
210
401374
            self.storage
211
401374
                .instance
212
401374
                .tasks()
213
401374
                .update_view_if_needed(view, self, true)?;
214
672510
        } else if let Some(integrity_check) = self
215
672510
            .storage
216
672510
            .instance
217
672510
            .tasks()
218
672510
            .spawn_integrity_check(view, self)
219
        {
220
1009
            integrity_check
221
1009
                .receive()
222
1009
                .map_err(Error::from)?
223
1009
                .map_err(Error::from)?;
224
671501
        }
225

            
226
1073884
        let view_entries = self
227
1073884
            .roots()
228
1073884
            .tree(self.collection_tree(
229
1073884
                &view.collection(),
230
1073884
                view_entries_tree_name(&view.view_name()),
231
1073884
            )?)
232
1073884
            .map_err(Error::from)?;
233

            
234
        {
235
1073884
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
236
905347
                callback(entry)?;
237
            }
238
        }
239

            
240
1073884
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
241
148
            let db = self.clone();
242
148
            let view_name = view.view_name();
243
148
            let view = db
244
148
                .data
245
148
                .schema
246
148
                .view_by_name(&view_name)
247
148
                .expect("query made with view that isn't registered with this database");
248
148
            db.storage
249
148
                .instance
250
148
                .tasks()
251
148
                .update_view_if_needed(view, &db, false)?;
252
1073736
        }
253

            
254
1073884
        Ok(())
255
1073884
    }
256

            
257
677313
    fn open_trees_for_transaction(&self, transaction: &Transaction) -> Result<OpenTrees, Error> {
258
677313
        let mut open_trees = OpenTrees::default();
259
1768685
        for op in &transaction.operations {
260
1091520
            if self
261
1091520
                .data
262
1091520
                .schema
263
1091520
                .collection_primary_key_description(&op.collection)
264
1091520
                .is_none()
265
            {
266
148
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267
1091372
            }
268

            
269
            #[cfg(any(feature = "encryption", feature = "compression"))]
270
1091372
            let vault = if let Some(encryption_key) =
271
1091372
                self.collection_encryption_key(&op.collection).cloned()
272
            {
273
                #[cfg(feature = "encryption")]
274
4853
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275
2800
                    vault.key = Some(encryption_key);
276
2800
                    Some(vault)
277
                } else {
278
2053
                    TreeVault::new_if_needed(
279
2053
                        Some(encryption_key),
280
2053
                        self.storage().vault(),
281
2053
                        #[cfg(feature = "compression")]
282
2053
                        None,
283
2053
                    )
284
                }
285

            
286
                #[cfg(not(feature = "encryption"))]
287
                {
288
                    drop(encryption_key);
289
                    return Err(Error::EncryptionDisabled);
290
                }
291
            } else {
292
1086519
                self.storage().tree_vault().cloned()
293
            };
294

            
295
1091372
            open_trees.open_trees_for_document_change(
296
1091372
                &op.collection,
297
1091372
                &self.data.schema,
298
1091372
                #[cfg(any(feature = "encryption", feature = "compression"))]
299
1091372
                vault,
300
1091372
            );
301
        }
302

            
303
677165
        Ok(open_trees)
304
677313
    }
305

            
306
677313
    fn apply_transaction_to_roots(
307
677313
        &self,
308
677313
        transaction: &Transaction,
309
677313
    ) -> Result<Vec<OperationResult>, Error> {
310
677313
        let open_trees = self.open_trees_for_transaction(transaction)?;
311

            
312
677165
        let mut roots_transaction = self
313
677165
            .data
314
677165
            .context
315
677165
            .roots
316
677165
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
317

            
318
677165
        let mut results = Vec::new();
319
677165
        let mut changed_documents = Vec::new();
320
677165
        let mut collection_indexes = HashMap::new();
321
677165
        let mut collections = Vec::new();
322
1748726
        for op in &transaction.operations {
323
1091076
            let result = self.execute_operation(
324
1091076
                op,
325
1091076
                &mut roots_transaction,
326
1091076
                &open_trees.trees_index_by_name,
327
1091076
            )?;
328

            
329
1071561
            if let Some((collection, id, deleted)) = match &result {
330
1011865
                OperationResult::DocumentUpdated { header, collection } => {
331
1011865
                    Some((collection, header.id.clone(), false))
332
                }
333
58536
                OperationResult::DocumentDeleted { id, collection } => {
334
58536
                    Some((collection, id.clone(), true))
335
                }
336
1160
                OperationResult::Success => None,
337
1070401
            } {
338
1070401
                let collection = match collection_indexes.get(collection) {
339
410556
                    Some(index) => *index,
340
                    None => {
341
659845
                        if let Ok(id) = u16::try_from(collections.len()) {
342
659845
                            collection_indexes.insert(collection.clone(), id);
343
659845
                            collections.push(collection.clone());
344
659845
                            id
345
                        } else {
346
                            return Err(Error::TransactionTooLarge);
347
                        }
348
                    }
349
                };
350
1070401
                changed_documents.push(ChangedDocument {
351
1070401
                    collection,
352
1070401
                    id,
353
1070401
                    deleted,
354
1070401
                });
355
1160
            }
356
1071561
            results.push(result);
357
        }
358

            
359
657650
        self.invalidate_changed_documents(
360
657650
            &mut roots_transaction,
361
657650
            &open_trees,
362
657650
            &collections,
363
657650
            &changed_documents,
364
657650
        )?;
365

            
366
657650
        roots_transaction
367
657650
            .entry_mut()
368
657650
            .set_data(compat::serialize_executed_transaction_changes(
369
657650
                &Changes::Documents(DocumentChanges {
370
657650
                    collections,
371
657650
                    documents: changed_documents,
372
657650
                }),
373
657650
            )?)?;
374

            
375
657650
        roots_transaction.commit()?;
376

            
377
657650
        Ok(results)
378
677313
    }
379

            
380
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
381
657650
    fn invalidate_changed_documents(
382
657650
        &self,
383
657650
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
384
657650
        open_trees: &OpenTrees,
385
657650
        collections: &[CollectionName],
386
657650
        changed_documents: &[ChangedDocument],
387
657650
    ) -> Result<(), Error> {
388
659846
        for (collection, changed_documents) in &changed_documents
389
657650
            .iter()
390
1070401
            .group_by(|doc| &collections[usize::from(doc.collection)])
391
        {
392
659845
            let mut views = self
393
659845
                .data
394
659845
                .schema
395
659845
                .views_in_collection(collection)
396
1888354
                .filter(|view| !view.update_policy().is_eager())
397
659845
                .peekable();
398
659845
            if views.peek().is_some() {
399
328805
                let changed_documents = changed_documents.collect::<Vec<_>>();
400
1857404
                for view in views {
401
1528599
                    let view_name = view.view_name();
402
1528599
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
403
3272682
                    for changed_document in &changed_documents {
404
1744083
                        let mut invalidated_docs = roots_transaction
405
1744083
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
406
1744083
                            .unwrap();
407
1744083
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
408
                    }
409
                }
410
331040
            }
411
        }
412
657650
        Ok(())
413
657650
    }
414

            
415
1091076
    fn execute_operation(
416
1091076
        &self,
417
1091076
        operation: &Operation,
418
1091076
        transaction: &mut ExecutingTransaction<AnyFile>,
419
1091076
        tree_index_map: &HashMap<String, usize>,
420
1091076
    ) -> Result<OperationResult, Error> {
421
1091076
        match &operation.command {
422
819812
            Command::Insert { id, contents } => {
423
819812
                self.execute_insert(operation, transaction, tree_index_map, id.clone(), contents)
424
            }
425
210680
            Command::Update { header, contents } => self.execute_update(
426
210680
                operation,
427
210680
                transaction,
428
210680
                tree_index_map,
429
210680
                &header.id,
430
210680
                Some(&header.revision),
431
210680
                contents,
432
210680
            ),
433
592
            Command::Overwrite { id, contents } => {
434
592
                self.execute_update(operation, transaction, tree_index_map, id, None, contents)
435
            }
436
58536
            Command::Delete { header } => {
437
58536
                self.execute_delete(operation, transaction, tree_index_map, header)
438
            }
439
1456
            Command::Check { id, revision } => Self::execute_check(
440
1456
                operation,
441
1456
                transaction,
442
1456
                tree_index_map,
443
1456
                id.clone(),
444
1456
                *revision,
445
1456
            ),
446
        }
447
1091076
    }
448

            
449
    #[cfg_attr(
450
        feature = "tracing",
451
        tracing::instrument(
452
            level = "trace",
453
            skip(self, operation, transaction, tree_index_map, contents),
454
            fields(
455
                database = self.name(),
456
                collection.name = operation.collection.name.as_ref(),
457
                collection.authority = operation.collection.authority.as_ref()
458
            )
459
        )
460
    )]
461
211272
    fn execute_update(
462
211272
        &self,
463
211272
        operation: &Operation,
464
211272
        transaction: &mut ExecutingTransaction<AnyFile>,
465
211272
        tree_index_map: &HashMap<String, usize>,
466
211272
        id: &DocumentId,
467
211272
        check_revision: Option<&Revision>,
468
211272
        contents: &[u8],
469
211272
    ) -> Result<OperationResult, crate::Error> {
470
211272
        let mut documents = transaction
471
211272
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
472
211272
            .unwrap();
473
211272
        let document_id = ArcBytes::from(id.to_vec());
474
211272
        let mut result = None;
475
211272
        let mut updated = false;
476
211272
        documents.modify(
477
211272
            vec![document_id.clone()],
478
211272
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
479
                                                                        value: Option<
480
                ArcBytes<'_>,
481
            >| {
482
211272
                if let Some(old) = value {
483
210976
                    let doc = match deserialize_document(&old) {
484
210976
                        Ok(doc) => doc,
485
                        Err(err) => {
486
                            result = Some(Err(err));
487
                            return nebari::tree::KeyOperation::Skip;
488
                        }
489
                    };
490
210976
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
491
210828
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
492
                        {
493
210532
                            let updated_header = Header {
494
210532
                                id: id.clone(),
495
210532
                                revision: updated_revision,
496
210532
                            };
497
210532
                            let serialized_doc = match serialize_document(&BorrowedDocument {
498
210532
                                header: updated_header.clone(),
499
210532
                                contents: CowBytes::from(contents),
500
210532
                            }) {
501
210532
                                Ok(bytes) => bytes,
502
                                Err(err) => {
503
                                    result = Some(Err(Error::from(err)));
504
                                    return nebari::tree::KeyOperation::Skip;
505
                                }
506
                            };
507
210532
                            result = Some(Ok(OperationResult::DocumentUpdated {
508
210532
                                collection: operation.collection.clone(),
509
210532
                                header: updated_header,
510
210532
                            }));
511
210532
                            updated = true;
512
210532
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
513
296
                        }
514
296

            
515
296
                        // If no new revision was made, it means an attempt to
516
296
                        // save a document with the same contents was made.
517
296
                        // We'll return a success but not actually give a new
518
296
                        // version
519
296
                        result = Some(Ok(OperationResult::DocumentUpdated {
520
296
                            collection: operation.collection.clone(),
521
296
                            header: doc.header,
522
296
                        }));
523
148
                    } else {
524
148
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
525
148
                            operation.collection.clone(),
526
148
                            Box::new(doc.header),
527
148
                        ))));
528
148
                    }
529
296
                } else if check_revision.is_none() {
530
148
                    let doc = BorrowedDocument::new(id.clone(), contents);
531
148
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
532
148
                        Ok((doc, serialized)) => {
533
148
                            result = Some(Ok(OperationResult::DocumentUpdated {
534
148
                                collection: operation.collection.clone(),
535
148
                                header: doc.header,
536
148
                            }));
537
148
                            updated = true;
538
148
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
539
                        }
540
                        Err(err) => {
541
                            result = Some(Err(Error::from(err)));
542
                        }
543
                    }
544
148
                } else {
545
148
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
546
148
                        operation.collection.clone(),
547
148
                        Box::new(id.clone()),
548
148
                    ))));
549
148
                }
550
592
                nebari::tree::KeyOperation::Skip
551
211272
            })),
552
211272
        )?;
553
211272
        drop(documents);
554
211272

            
555
211272
        if updated {
556
210680
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
557
592
        }
558

            
559
210976
        result.expect("nebari should invoke the callback even when the key isn't found")
560
211272
    }
561

            
562
    #[cfg_attr(
563
        feature = "tracing",
564
        tracing::instrument(
565
            level = "trace",
566
            skip(self, operation, transaction, tree_index_map, contents),
567
            fields(
568
                database = self.name(),
569
                collection.name = operation.collection.name.as_ref(),
570
                collection.authority = operation.collection.authority.as_ref()
571
            )
572
        )
573
    )]
574
819812
    fn execute_insert(
575
819812
        &self,
576
819812
        operation: &Operation,
577
819812
        transaction: &mut ExecutingTransaction<AnyFile>,
578
819812
        tree_index_map: &HashMap<String, usize>,
579
819812
        id: Option<DocumentId>,
580
819812
        contents: &[u8],
581
819812
    ) -> Result<OperationResult, Error> {
582
819812
        let mut documents = transaction
583
819812
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
584
819812
            .unwrap();
585
819812
        let id = if let Some(id) = id {
586
379326
            id
587
440486
        } else if let Some(last_key) = documents.last_key()? {
588
409143
            let id = DocumentId::try_from(last_key.as_slice())?;
589
409143
            self.data
590
409143
                .schema
591
409143
                .next_id_for_collection(&operation.collection, Some(id))?
592
        } else {
593
31343
            self.data
594
31343
                .schema
595
31343
                .next_id_for_collection(&operation.collection, None)?
596
        };
597

            
598
819776
        let doc = BorrowedDocument::new(id, contents);
599
819776
        let serialized: Vec<u8> = serialize_document(&doc)?;
600
819776
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
601
819776
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
602
18185
            let doc = deserialize_document(&document)?;
603
18185
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
604
18185
                operation.collection.clone(),
605
18185
                Box::new(doc.header),
606
18185
            )))
607
        } else {
608
801591
            drop(documents);
609
801591
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
610

            
611
801185
            Ok(OperationResult::DocumentUpdated {
612
801185
                collection: operation.collection.clone(),
613
801185
                header: doc.header,
614
801185
            })
615
        }
616
819812
    }
617

            
618
    #[cfg_attr(feature = "tracing", tracing::instrument(
619
        level = "trace",
620
        skip(self, operation, transaction, tree_index_map),
621
        fields(
622
            database = self.name(),
623
            collection.name = operation.collection.name.as_ref(),
624
            collection.authority = operation.collection.authority.as_ref()
625
        )
626
    ))]
627
58536
    fn execute_delete(
628
58536
        &self,
629
58536
        operation: &Operation,
630
58536
        transaction: &mut ExecutingTransaction<AnyFile>,
631
58536
        tree_index_map: &HashMap<String, usize>,
632
58536
        header: &Header,
633
58536
    ) -> Result<OperationResult, Error> {
634
58536
        let mut documents = transaction
635
58536
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
636
58536
            .unwrap();
637
58536
        if let Some(vec) = documents.remove(header.id.as_ref())? {
638
58536
            drop(documents);
639
58536
            let doc = deserialize_document(&vec)?;
640
58536
            if &doc.header == header {
641
58536
                self.update_eager_views(
642
58536
                    &ArcBytes::from(doc.header.id.to_vec()),
643
58536
                    operation,
644
58536
                    transaction,
645
58536
                    tree_index_map,
646
58536
                )?;
647

            
648
58536
                Ok(OperationResult::DocumentDeleted {
649
58536
                    collection: operation.collection.clone(),
650
58536
                    id: header.id.clone(),
651
58536
                })
652
            } else {
653
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
654
                    operation.collection.clone(),
655
                    Box::new(header.clone()),
656
                )))
657
            }
658
        } else {
659
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
660
                operation.collection.clone(),
661
                Box::new(header.id.clone()),
662
            )))
663
        }
664
58536
    }
665

            
666
    #[cfg_attr(feature = "tracing", tracing::instrument(
667
        level = "trace",
668
        skip(self, operation, transaction, tree_index_map),
669
        fields(
670
            database = self.name(),
671
            collection.name = operation.collection.name.as_ref(),
672
            collection.authority = operation.collection.authority.as_ref()
673
        )
674
    ))]
675
1070807
    fn update_eager_views(
676
1070807
        &self,
677
1070807
        document_id: &ArcBytes<'static>,
678
1070807
        operation: &Operation,
679
1070807
        transaction: &mut ExecutingTransaction<AnyFile>,
680
1070807
        tree_index_map: &HashMap<String, usize>,
681
1070807
    ) -> Result<(), Error> {
682
1070807
        let mut eager_views = self
683
1070807
            .data
684
1070807
            .schema
685
1070807
            .eager_views_in_collection(&operation.collection)
686
1070807
            .peekable();
687
1070807
        if eager_views.peek().is_some() {
688
431345
            let documents = transaction
689
431345
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
690
431345
                .unwrap();
691
861988
            for view in eager_views {
692
431345
                let name = view.view_name();
693
431345
                let document_map = transaction
694
431345
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
695
431345
                    .unwrap();
696
431345
                let view_entries = transaction
697
431345
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
698
431345
                    .unwrap();
699
431345
                mapper::DocumentRequest {
700
431345
                    database: self,
701
431345
                    document_ids: vec![document_id.clone()],
702
431345
                    map_request: &mapper::Map {
703
431345
                        database: self.data.name.clone(),
704
431345
                        collection: operation.collection.clone(),
705
431345
                        view_name: name.clone(),
706
431345
                    },
707
431345
                    document_map,
708
431345
                    documents,
709
431345
                    view_entries,
710
431345
                    view,
711
431345
                }
712
431345
                .map()?;
713
            }
714
639462
        }
715

            
716
1070105
        Ok(())
717
1070807
    }
718

            
719
    #[cfg_attr(feature = "tracing", tracing::instrument(
720
        level = "trace",
721
        skip(operation, transaction, tree_index_map),
722
        fields(
723
            collection.name = operation.collection.name.as_ref(),
724
            collection.authority = operation.collection.authority.as_ref(),
725
        ),
726
    ))]
727
1456
    fn execute_check(
728
1456
        operation: &Operation,
729
1456
        transaction: &mut ExecutingTransaction<AnyFile>,
730
1456
        tree_index_map: &HashMap<String, usize>,
731
1456
        id: DocumentId,
732
1456
        revision: Option<Revision>,
733
1456
    ) -> Result<OperationResult, Error> {
734
1456
        let mut documents = transaction
735
1456
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
736
1456
            .unwrap();
737
1456
        if let Some(vec) = documents.get(id.as_ref())? {
738
1308
            drop(documents);
739

            
740
1308
            if let Some(revision) = revision {
741
296
                let doc = deserialize_document(&vec)?;
742
296
                if doc.header.revision != revision {
743
148
                    return Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
744
148
                        operation.collection.clone(),
745
148
                        Box::new(Header { id, revision }),
746
148
                    )));
747
148
                }
748
1012
            }
749

            
750
1160
            Ok(OperationResult::Success)
751
        } else {
752
148
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
753
148
                operation.collection.clone(),
754
148
                Box::new(id),
755
148
            )))
756
        }
757
1456
    }
758

            
759
1073884
    fn create_view_iterator(
760
1073884
        view_entries: &Tree<Unversioned, AnyFile>,
761
1073884
        key: Option<SerializedQueryKey>,
762
1073884
        order: Sort,
763
1073884
        limit: Option<u32>,
764
1073884
    ) -> Result<Vec<ViewEntry>, Error> {
765
1073884
        let mut values = Vec::new();
766
1073884
        let forwards = match order {
767
1073736
            Sort::Ascending => true,
768
148
            Sort::Descending => false,
769
        };
770
1073884
        let mut values_read = 0;
771
1073884
        if let Some(key) = key {
772
1066939
            match key {
773
1412
                SerializedQueryKey::Range(range) => {
774
1412
                    view_entries.scan::<Infallible, _, _, _, _>(
775
2752
                        &range.map_ref(|bytes| &bytes[..]),
776
1412
                        forwards,
777
1412
                        |_, _, _| ScanEvaluation::ReadData,
778
1412
                        |_, _| {
779
3436
                            if let Some(limit) = limit {
780
296
                                if values_read >= limit {
781
148
                                    return ScanEvaluation::Stop;
782
148
                                }
783
148
                                values_read += 1;
784
3140
                            }
785
3288
                            ScanEvaluation::ReadData
786
3436
                        },
787
3288
                        |_key, _index, value| {
788
3288
                            values.push(value);
789
3288
                            Ok(())
790
3288
                        },
791
1412
                    )?;
792
                }
793
1064799
                SerializedQueryKey::Matches(key) => {
794
1064799
                    values.extend(view_entries.get(&key)?);
795
                }
796
728
                SerializedQueryKey::Multiple(mut list) => {
797
728
                    list.sort();
798
728

            
799
728
                    values.extend(
800
728
                        view_entries
801
1168
                            .get_multiple(list.iter().map(|bytes| bytes.as_slice()))?
802
728
                            .into_iter()
803
1168
                            .map(|(_, value)| value),
804
                    );
805
                }
806
            }
807
        } else {
808
6945
            view_entries.scan::<Infallible, _, _, _, _>(
809
6945
                &(..),
810
6945
                forwards,
811
6945
                |_, _, _| ScanEvaluation::ReadData,
812
6945
                |_, _| {
813
7820
                    if let Some(limit) = limit {
814
                        if values_read >= limit {
815
                            return ScanEvaluation::Stop;
816
                        }
817
                        values_read += 1;
818
7820
                    }
819
7820
                    ScanEvaluation::ReadData
820
7917
                },
821
7917
                |_, _, value| {
822
7820
                    values.push(value);
823
7820
                    Ok(())
824
7917
                },
825
6945
            )?;
826
        }
827

            
828
1073884
        values
829
1073884
            .into_iter()
830
1073884
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
831
1073884
            .collect::<Result<Vec<_>, Error>>()
832
1073884
    }
833

            
834
    /// Returns the encryption key to use for `collection`: the key the schema
    /// assigns to the collection if there is one, otherwise the storage's
    /// default encryption key (which may also be absent).
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        match self.schematic().encryption_key_for_collection(collection) {
            Some(collection_key) => Some(collection_key),
            None => self.storage.default_encryption_key(),
        }
    }
840

            
841
    #[cfg_attr(
842
        not(feature = "encryption"),
843
        allow(
844
            unused_mut,
845
            unused_variables,
846
            clippy::unused_self,
847
            clippy::let_and_return
848
        )
849
    )]
850
    #[allow(clippy::unnecessary_wraps)]
851
2928837
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
852
2928837
        &self,
853
2928837
        collection: &CollectionName,
854
2928837
        name: S,
855
2928837
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
856
2928837
        let mut tree = R::tree(name);
857
2928837

            
858
2928837
        #[cfg(any(feature = "encryption", feature = "compression"))]
859
2928837
        match (
860
2928837
            self.collection_encryption_key(collection),
861
2928837
            self.storage().tree_vault().cloned(),
862
        ) {
863
42363
            (Some(override_key), Some(mut vault)) => {
864
42363
                #[cfg(feature = "encryption")]
865
42363
                {
866
42363
                    vault.key = Some(override_key.clone());
867
42363
                    tree = tree.with_vault(vault);
868
42363
                }
869

            
870
                #[cfg(not(feature = "encryption"))]
871
                {
872
                    return Err(Error::EncryptionDisabled);
873
                }
874
            }
875
710524
            (None, Some(vault)) => {
876
710524
                tree = tree.with_vault(vault);
877
710524
            }
878
2175950
            (key, None) => {
879
                #[cfg(feature = "encryption")]
880
2175950
                if let Some(vault) = TreeVault::new_if_needed(
881
2175950
                    key.cloned(),
882
2175950
                    self.storage().vault(),
883
2175950
                    #[cfg(feature = "compression")]
884
2175950
                    None,
885
2175950
                ) {
886
45399
                    tree = tree.with_vault(vault);
887
2130551
                }
888

            
889
                #[cfg(not(feature = "encryption"))]
890
                if key.is_some() {
891
                    return Err(Error::EncryptionDisabled);
892
                }
893
            }
894
        }
895

            
896
2928837
        Ok(tree)
897
2928837
    }
898

            
899
1
    pub(crate) fn update_key_expiration<'key>(
900
1
        &self,
901
1
        tree_key: impl Into<Cow<'key, str>>,
902
1
        expiration: Option<Timestamp>,
903
1
    ) {
904
1
        self.data
905
1
            .context
906
1
            .update_key_expiration(tree_key, expiration);
907
1
    }
908

            
909
    /// Converts this instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance uses the
    /// current Tokio runtime handle to spawn blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async(self) -> crate::AsyncDatabase {
        // `Handle::current()` is what panics when no runtime is active.
        let runtime = tokio::runtime::Handle::current();
        self.into_async_with_runtime(runtime)
    }
921

            
922
    /// Converts this instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance uses the
    /// provided runtime handle to spawn blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        crate::AsyncDatabase {
            database: self,
            runtime: Arc::new(runtime),
        }
    }
933

            
934
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone so that
    /// `self` remains usable. The returned instance uses the current Tokio
    /// runtime handle to spawn blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async(&self) -> crate::AsyncDatabase {
        self.clone().into_async()
    }
946

            
947
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone so that
    /// `self` remains usable. The returned instance uses the provided runtime
    /// handle to spawn blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        self.clone().into_async_with_runtime(runtime)
    }
955
}
956
59
#[derive(Serialize, Deserialize)]
957
struct LegacyHeader {
958
    id: u64,
959
    revision: Revision,
960
}
961
59
#[derive(Serialize, Deserialize)]
962
struct LegacyDocument<'a> {
963
    header: LegacyHeader,
964
    #[serde(borrow)]
965
    contents: &'a [u8],
966
}
967

            
968
1710192
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
969
1710192
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
970
1710133
        Ok(document) => Ok(document),
971
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
972
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
973
59
                header: Header {
974
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
975
59
                    revision: legacy_doc.header.revision,
976
59
                },
977
59
                contents: CowBytes::from(legacy_doc.contents),
978
59
            }),
979
            Err(_) => Err(Error::from(err)),
980
        },
981
    }
982
1710192
}
983

            
984
1030456
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
985
1030456
    pot::to_vec(document)
986
1030456
        .map_err(Error::from)
987
1030456
        .map_err(bonsaidb_core::Error::from)
988
1030456
}
989

            
990
impl HasSession for Database {
991
5271324
    fn session(&self) -> Option<&Session> {
992
5271324
        self.storage.session()
993
5271324
    }
994
}
995

            
996
impl Connection for Database {
997
    type Storage = Storage;
998

            
999
6679374
    /// Returns a clone of the storage handle this database was opened from.
    fn storage(&self) -> Self::Storage {
        Clone::clone(&self.storage)
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<u32>,
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
38748
        self.check_permission(
38748
            database_resource_name(self.name()),
38748
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
38748
        )?;
38748
        let result_limit = usize::try_from(
38748
            result_limit
38748
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
38748
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
38748
        )
38748
        .unwrap();
38748
        if result_limit > 0 {
38600
            let range = if let Some(starting_id) = starting_id {
19812
                Range::from(starting_id..)
            } else {
18788
                Range::from(..)
            };

            
38600
            let mut entries = Vec::new();
38600
            self.roots()
38600
                .transactions()
367492
                .scan(range, |entry| {
367492
                    if entry.data().is_some() {
366446
                        entries.push(entry);
366446
                    }
367492
                    entries.len() < result_limit
367492
                })
38600
                .map_err(Error::from)?;

            
38600
            entries
38600
                .into_iter()
38600
                .map(|entry| {
366446
                    if let Some(data) = entry.data() {
366446
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
366446
                        Ok(Some(transaction::Executed {
366446
                            id: entry.id,
366446
                            changes,
366446
                        }))
                    } else {
                        Ok(None)
                    }
366446
                })
38600
                .filter_map(Result::transpose)
38600
                .collect::<Result<Vec<_>, Error>>()
38600
                .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
148
            Ok(Vec::default())
        }
38748
    }

            
    /// Returns the transaction log's current transaction id, if it has one.
    ///
    /// # Errors
    ///
    /// Returns a permission error if `TransactionAction::GetLastId` is not
    /// allowed on this database.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
        let action =
            BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId));
        self.check_permission(database_resource_name(self.name()), &action)?;

        let last_id = self.roots().transactions().current_transaction_id();
        Ok(last_id)
    }

            
    /// Asks the storage's task manager to compact this database.
    ///
    /// # Errors
    ///
    /// Returns a permission error if `DatabaseAction::Compact` is not allowed
    /// on this database, and any error reported by the compaction task.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(database_resource_name(self.name()), &action)?;

        let storage = self.storage();
        storage.instance.tasks().compact_database(self.clone())?;
        Ok(())
    }

            
    /// Asks the storage's task manager to compact this database's key-value
    /// store.
    ///
    /// # Errors
    ///
    /// Returns a permission error if `DatabaseAction::Compact` is not allowed
    /// on the key-value resource, and any error reported by the compaction
    /// task.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(kv_resource_name(self.name()), &action)?;

        let storage = self.storage();
        storage
            .instance
            .tasks()
            .compact_key_value_store(self.clone())?;
        Ok(())
    }
}

            
impl LowLevelConnection for Database {
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self,  transaction),
        fields(
            database = self.name(),
        )
    ))]
677457
    fn apply_transaction(
677457
        &self,
677457
        transaction: Transaction,
677457
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1768977
        for op in &transaction.operations {
1091664
            let (resource, action) = match &op.command {
820032
                Command::Insert { .. } => (
820032
                    collection_resource_name(self.name(), &op.collection),
820032
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
820032
                ),
210680
                Command::Update { header, .. } => (
210680
                    document_resource_name(self.name(), &op.collection, &header.id),
210680
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
210680
                ),
592
                Command::Overwrite { id, .. } => (
592
                    document_resource_name(self.name(), &op.collection, id),
592
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
592
                ),
58608
                Command::Delete { header } => (
58608
                    document_resource_name(self.name(), &op.collection, &header.id),
58608
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
58608
                ),
1752
                Command::Check { id, .. } => (
1752
                    document_resource_name(self.name(), &op.collection, id),
1752
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
1752
                ),
            };
1091664
            self.check_permission(resource, &action)?;
        }

            
677313
        let mut eager_view_tasks = Vec::new();
680373
        for collection_name in transaction
677313
            .operations
677313
            .iter()
1091520
            .map(|op| &op.collection)
677313
            .collect::<HashSet<_>>()
        {
680372
            for view in self.data.schema.eager_views_in_collection(collection_name) {
380098
                if let Some(task) = self
380098
                    .storage
380098
                    .instance
380098
                    .tasks()
380098
                    .spawn_integrity_check(view, self)
6498
                {
6498
                    eager_view_tasks.push(task);
373600
                }
            }
        }

            
677313
        let mut eager_view_mapping_tasks = Vec::new();
683811
        for task in eager_view_tasks {
6498
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
6280
                eager_view_mapping_tasks.push(spawned_task);
6280
            }
        }

            
683593
        for task in eager_view_mapping_tasks {
6280
            let mut task = task.lock();
6280
            if let Some(task) = task.take() {
4706
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
1574
            }
        }

            
677313
        self.apply_transaction_to_roots(&transaction)
677313
            .map_err(bonsaidb_core::Error::from)
677457
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
597115
        self.check_permission(
597115
            document_resource_name(self.name(), collection, &id),
597115
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
597115
        )?;
597115
        let tree = self
597115
            .data
597115
            .context
597115
            .roots
597115
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
597115
            .map_err(Error::from)?;
597115
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
578562
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
18552
            Ok(None)
        }
597115
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
898
        self.check_permission(
898
            collection_resource_name(self.name(), collection),
898
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
898
        )?;
898
        let tree = self
898
            .data
898
            .context
898
            .roots
898
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
898
            .map_err(Error::from)?;
898
        let mut found_docs = Vec::new();
898
        let mut keys_read = 0;
898
        let ids = DocumentIdRange(ids);
898
        tree.scan(
898
            &ids.borrow_as_bytes(),
898
            match sort {
750
                Sort::Ascending => true,
148
                Sort::Descending => false,
            },
24
            |_, _, _| ScanEvaluation::ReadData,
898
            |_, _| {
1315
                if let Some(limit) = limit {
296
                    if keys_read >= limit {
148
                        return ScanEvaluation::Stop;
148
                    }
148

            
148
                    keys_read += 1;
1019
                }
1167
                ScanEvaluation::ReadData
1315
            },
898
            |_, _, doc| {
                found_docs.push(
1167
                    deserialize_document(&doc)
1167
                        .map(BorrowedDocument::into_owned)
1167
                        .map_err(AbortError::Other)?,
                );
1167
                Ok(())
1167
            },
        )
898
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
898
        })?;

            
898
        Ok(found_docs)
898
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
148
        self.check_permission(
148
            collection_resource_name(self.name(), collection),
148
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
148
        )?;
148
        let tree = self
148
            .data
148
            .context
148
            .roots
148
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
148
            .map_err(Error::from)?;
148
        let mut found_headers = Vec::new();
148
        let mut keys_read = 0;
148
        let ids = DocumentIdRange(ids);
148
        tree.scan(
148
            &ids.borrow_as_bytes(),
148
            match sort {
148
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
148
            |_, _| {
296
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

            
                    keys_read += 1;
296
                }
296
                ScanEvaluation::ReadData
296
            },
148
            |_, _, doc| {
                found_headers.push(
296
                    deserialize_document(&doc)
296
                        .map(|doc| doc.header)
296
                        .map_err(AbortError::Other)?,
                );
296
                Ok(())
296
            },
        )
148
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
148
        })?;

            
148
        Ok(found_headers)
148
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, bonsaidb_core::Error> {
296
        self.check_permission(
296
            collection_resource_name(self.name(), collection),
296
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
296
        )?;
296
        let tree = self
296
            .data
296
            .context
296
            .roots
296
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
296
            .map_err(Error::from)?;
296
        let ids = DocumentIdRange(ids);
296
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

            
296
        Ok(stats.alive_keys)
296
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
351354
    fn get_multiple_from_collection(
351354
        &self,
351354
        ids: &[DocumentId],
351354
        collection: &CollectionName,
351354
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
701564
        for id in ids {
350210
            self.check_permission(
350210
                document_resource_name(self.name(), collection, id),
350210
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
350210
            )?;
        }
351354
        let mut ids = ids.to_vec();
351354
        let collection = collection.clone();
351354
        let tree = self
351354
            .data
351354
            .context
351354
            .roots
351354
            .tree(
351354
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
351354
            .map_err(Error::from)?;
351354
        ids.sort();
351354
        let keys_and_values = tree
351362
            .get_multiple(ids.iter().map(|id| id.as_ref()))
351354
            .map_err(Error::from)?;

            
351354
        keys_and_values
351354
            .into_iter()
351362
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
351354
            .collect::<Result<Vec<_>, Error>>()
351354
            .map_err(bonsaidb_core::Error::from)
351354
    }

            
    /// Asks the storage's task manager to compact `collection`.
    ///
    /// # Errors
    ///
    /// Returns a permission error if `DatabaseAction::Compact` is not allowed
    /// on the collection, and any error reported by the compaction task.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(collection_resource_name(self.name(), &collection), &action)?;

        let storage = self.storage();
        storage
            .instance
            .tasks()
            .compact_collection(self.clone(), collection)?;
        Ok(())
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
385006
    fn query_by_name(
385006
        &self,
385006
        view: &ViewName,
385006
        key: Option<SerializedQueryKey>,
385006
        order: Sort,
385006
        limit: Option<u32>,
385006
        access_policy: AccessPolicy,
385006
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
385006
        let view = self.schematic().view_by_name(view)?;
385006
        self.check_permission(
385006
            view_resource_name(self.name(), &view.view_name()),
385006
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
385006
        )?;
385006
        let mut results = Vec::new();
385006
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
756994
            for mapping in entry.mappings {
382465
                results.push(bonsaidb_core::schema::view::map::Serialized {
382465
                    source: mapping.source,
382465
                    key: entry.key.clone(),
382465
                    value: mapping.value,
382465
                });
382465
            }
374529
            Ok(())
385006
        })?;

            
385006
        Ok(results)
385006
    }

            
    /// Queries the view named `view` like `query_by_name`, then loads the
    /// source document of every returned mapping from the view's collection.
    ///
    /// The documents are returned in a map keyed by document id alongside the
    /// mappings.
    ///
    /// # Errors
    ///
    /// Returns any error from `query_by_name` or from
    /// `get_multiple_from_collection`.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let mappings = self.query_by_name(view, key, order, limit, access_policy)?;
        // query_by_name() already failed above if the view is not present.
        let view = self.schematic().view_by_name(view).unwrap();

        let source_ids = mappings
            .iter()
            .map(|mapping| mapping.source.id.clone())
            .collect::<Vec<_>>();
        let mut documents = BTreeMap::new();
        for doc in self.get_multiple_from_collection(&source_ids, &view.collection())? {
            documents.insert(doc.header.id.clone(), doc);
        }

        Ok(schema::view::map::MappedSerializedDocuments {
            mappings,
            documents,
        })
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
687986
    fn reduce_by_name(
687986
        &self,
687986
        view_name: &ViewName,
687986
        key: Option<SerializedQueryKey>,
687986
        access_policy: AccessPolicy,
687986
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
687986
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

            
687986
        let result = if mappings.len() == 1 {
523158
            mappings.pop().unwrap().value.into_vec()
        } else {
164828
            let view = self.data.schema.view_by_name(view_name)?;
164828
            view.reduce(
164828
                &mappings
164828
                    .iter()
164828
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
164828
                    .collect::<Vec<_>>(),
164828
                true,
164828
            )
164828
            .map_err(Error::from)?
        };

            
687838
        Ok(result)
687986
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
688366
    fn reduce_grouped_by_name(
688366
        &self,
688366
        view_name: &ViewName,
688366
        key: Option<SerializedQueryKey>,
688366
        access_policy: AccessPolicy,
688366
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
688366
        let view = self.data.schema.view_by_name(view_name)?;
688366
        self.check_permission(
688366
            view_resource_name(self.name(), &view.view_name()),
688366
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
688366
        )?;
688366
        let mut mappings = Vec::new();
688382
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
530378
            mappings.push(MappedSerializedValue {
530378
                key: entry.key,
530378
                value: entry.reduced_value,
530378
            });
530378
            Ok(())
688382
        })?;

            
688366
        Ok(mappings)
688366
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
512
    fn delete_docs_by_name(
512
        &self,
512
        view: &ViewName,
512
        key: Option<SerializedQueryKey>,
512
        access_policy: AccessPolicy,
512
    ) -> Result<u64, bonsaidb_core::Error> {
512
        let view = self.data.schema.view_by_name(view)?;
512
        let collection = view.collection();
512
        let mut transaction = Transaction::default();
512
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
3408
            for mapping in entry.mappings {
2968
                transaction.push(Operation::delete(collection.clone(), mapping.source));
2968
            }

            
440
            Ok(())
512
        })?;

            
512
        let results = LowLevelConnection::apply_transaction(self, transaction)?;

            
512
        Ok(results.len() as u64)
512
    }
}

            
impl HasSchema for Database {
27016
    fn schematic(&self) -> &Schematic {
27016
        &self.data.schema
27016
    }
}

            
/// A boxed iterator over fallible pairs of byte buffers, valid for `'a`.
///
/// `ViewEntryCollectionIterator` ignores the first element of each pair and
/// deserializes the second one.
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapts a [`ViewIterator`] into an iterator of deserialized `ViewEntry`
/// values.
struct ViewEntryCollectionIterator<'a> {
    /// The underlying iterator producing the raw byte pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
2670763
/// A handle to a shared [`ContextData`].
///
/// The data is held in an [`Arc`], so cloning a `Context` yields another
/// handle to the same `ContextData` rather than a copy of it.
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// The shared state all clones of this context point to.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    /// Gives direct access to the shared [`ContextData`].
    fn deref(&self) -> &Self::Target {
        self.data.as_ref()
    }
}

            
/// The state shared by every clone of a [`Context`].
///
/// Dropping this value asks the key-value state to shut down (see the `Drop`
/// implementation).
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The roots handed to `Context::new`.
    pub(crate) roots: Roots<AnyFile>,
    /// The key-value state, guarded by a mutex and shared through an `Arc`.
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    /// Borrows the roots stored in the shared context data.
    fn borrow(&self) -> &Roots<AnyFile> {
        // Field access goes through `Context`'s `Deref` to `ContextData`.
        &self.roots
    }
}

            
impl Context {
34893
    pub(crate) fn new(
34893
        roots: Roots<AnyFile>,
34893
        key_value_persistence: KeyValuePersistence,
34893
        storage_lock: Option<StorageLock>,
34893
    ) -> Self {
34893
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
34893
        let mut background_worker_target_watcher = background_worker_target.watch();
34893
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
34893
            key_value_persistence,
34893
            roots.clone(),
34893
            background_worker_target,
34893
        )));
34893
        let background_worker_state = Arc::downgrade(&key_value_state);
34893
        let context = Self {
34893
            data: Arc::new(ContextData {
34893
                roots,
34893
                key_value_state,
34893
            }),
34893
        };
34893
        std::thread::Builder::new()
34893
            .name(String::from("keyvalue-worker"))
34893
            .spawn(move || {
34893
                keyvalue::background_worker(
34893
                    &background_worker_state,
34893
                    &mut background_worker_target_watcher,
34893
                    storage_lock,
34893
                );
34893
            })
34893
            .unwrap();
34893
        context
34893
    }

            
1496680
    pub(crate) fn perform_kv_operation(
1496680
        &self,
1496680
        op: KeyOperation,
1496680
    ) -> Result<Output, bonsaidb_core::Error> {
1496680
        let mut state = self.data.key_value_state.lock();
1496680
        state.perform_kv_operation(op, &self.data.key_value_state)
1496680
    }

            
14
    pub(crate) fn update_key_expiration<'key>(
14
        &self,
14
        tree_key: impl Into<Cow<'key, str>>,
14
        expiration: Option<Timestamp>,
14
    ) {
14
        let mut state = self.data.key_value_state.lock();
14
        state.update_key_expiration(tree_key, expiration);
14
    }

            
    #[cfg(test)]
5
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
5
        let state = self.data.key_value_state.lock();
5
        state.persistence_watcher()
5
    }
}

            
impl Drop for ContextData {
    fn drop(&mut self) {
31041
        if let Some(shutdown) = {
31041
            let mut state = self.key_value_state.lock();
31041
            state.shutdown(&self.key_value_state)
31041
        } {
361
            let _: Result<_, _> = shutdown.recv();
30680
        }
31041
    }
}

            
3795392
pub fn document_tree_name(collection: &CollectionName) -> String {
3795392
    format!("collection.{collection:#}")
3795392
}

            
/// A newtype around a [`Range`] of [`DocumentId`]s that can be borrowed as a
/// byte range through its [`BorrowByteRange`] implementation.
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1342
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1342
        BorrowedRange {
1342
            start: match &self.0.start {
602
                connection::Bound::Unbounded => ops::Bound::Unbounded,
740
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
1342
            end: match &self.0.end {
602
                connection::Bound::Unbounded => ops::Bound::Unbounded,
592
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
148
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
1342
    }
}

            
/// Operations that can be performed on both [`Database`] and
/// [`AsyncDatabase`](crate::AsyncDatabase).
pub trait DatabaseNonBlocking {
    /// Returns the name of the database.
    ///
    /// The returned string is borrowed from `self`.
    #[must_use]
    fn name(&self) -> &str;
}

            
impl DatabaseNonBlocking for Database {
    /// Returns the name stored in this database's shared data.
    fn name(&self) -> &str {
        let data = &self.data;
        data.name.as_ref()
    }
}