1
use std::borrow::{Borrow, Cow};
2
use std::collections::{BTreeMap, HashMap, HashSet};
3
use std::convert::Infallible;
4
use std::ops::{self, Deref};
5
use std::sync::Arc;
6
use std::u8;
7

            
8
use bonsaidb_core::arc_bytes::serde::CowBytes;
9
use bonsaidb_core::arc_bytes::ArcBytes;
10
use bonsaidb_core::connection::{
11
    self, AccessPolicy, Connection, HasSchema, HasSession, LowLevelConnection, Range,
12
    SerializedQueryKey, Session, Sort, StorageConnection,
13
};
14
#[cfg(any(feature = "encryption", feature = "compression"))]
15
use bonsaidb_core::document::KeyId;
16
use bonsaidb_core::document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision};
17
use bonsaidb_core::keyvalue::{KeyOperation, Output, Timestamp};
18
use bonsaidb_core::limits::{
19
    LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS,
20
};
21
use bonsaidb_core::permissions::bonsai::{
22
    collection_resource_name, database_resource_name, document_resource_name, kv_resource_name,
23
    view_resource_name, BonsaiAction, DatabaseAction, DocumentAction, TransactionAction,
24
    ViewAction,
25
};
26
use bonsaidb_core::permissions::Permissions;
27
use bonsaidb_core::schema::view::map::MappedSerializedValue;
28
use bonsaidb_core::schema::view::{self};
29
use bonsaidb_core::schema::{self, CollectionName, Schema, Schematic, ViewName};
30
use bonsaidb_core::transaction::{
31
    self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
32
    Transaction,
33
};
34
use itertools::Itertools;
35
use nebari::io::any::AnyFile;
36
use nebari::tree::{
37
    AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
38
    Unversioned, Versioned,
39
};
40
use nebari::{AbortError, ExecutingTransaction, Roots, Tree};
41
use parking_lot::Mutex;
42
use serde::{Deserialize, Serialize};
43
use watchable::Watchable;
44

            
45
use crate::config::{Builder, KeyValuePersistence, StorageConfiguration};
46
use crate::database::keyvalue::BackgroundWorkerProcessTarget;
47
use crate::error::Error;
48
use crate::open_trees::OpenTrees;
49
use crate::storage::StorageLock;
50
#[cfg(feature = "encryption")]
51
use crate::storage::TreeVault;
52
use crate::views::{
53
    mapper, view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
54
    ViewEntry,
55
};
56
use crate::Storage;
57

            
58
pub mod keyvalue;
59

            
60
pub(crate) mod compat;
61
pub mod pubsub;
62

            
63
/// A database stored in BonsaiDb. This type blocks the current thread when
64
/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
65
///
66
/// ## Converting between Blocking and Async Types
67
///
68
/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
69
/// using:
70
///
71
/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
72
/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
73
/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
74
/// - [`Database::into_async()`]
75
/// - [`Database::to_async()`]
76
/// - [`Database::into_async_with_runtime()`]
77
/// - [`Database::to_async_with_runtime()`]
78
///
79
/// ## Using `Database` to create a single database
80
///
81
/// `Database` provides an easy mechanism to open and access a single database:
82
///
83
/// ```rust
84
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
85
/// use bonsaidb_core::schema::Collection;
86
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
87
/// use bonsaidb_local::{
88
///     config::{Builder, StorageConfiguration},
89
///     Database,
90
/// };
91
/// use serde::{Deserialize, Serialize};
92
///
93
/// #[derive(Debug, Serialize, Deserialize, Collection)]
94
/// #[collection(name = "blog-posts")]
95
/// # #[collection(core = bonsaidb_core)]
96
/// struct BlogPost {
97
///     pub title: String,
98
///     pub contents: String,
99
/// }
100
///
101
/// # fn test() -> Result<(), bonsaidb_local::Error> {
102
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
103
/// # Ok(())
104
/// # }
105
/// ```
106
///
107
/// Under the hood, this initializes a [`Storage`] instance pointing at
108
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
109
/// with the schema `BlogPost`.
110
///
111
/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
112
/// collections can be used as a [`Schema`].
113
4732378
#[derive(Debug, Clone)]
114
pub struct Database {
115
    pub(crate) data: Arc<Data>,
116
    pub(crate) storage: Storage,
117
}
118

            
119
#[derive(Debug)]
120
pub struct Data {
121
    pub name: Arc<Cow<'static, str>>,
122
    context: Context,
123
    pub(crate) schema: Arc<Schematic>,
124
}
125

            
126
impl Database {
127
    /// Opens a local file as a bonsaidb.
128
142362
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
129
142362
        name: S,
130
142362
        context: Context,
131
142362
        storage: &Storage,
132
142362
    ) -> Result<Self, Error> {
133
142362
        let name = name.into();
134
142362
        let schema = Arc::new(DB::schematic()?);
135
142362
        let db = Self {
136
142362
            storage: storage.clone(),
137
142362
            data: Arc::new(Data {
138
142362
                name: Arc::new(name),
139
142362
                context,
140
142362
                schema,
141
142362
            }),
142
142362
        };
143
142362

            
144
142362
        if storage.instance.check_view_integrity_on_database_open() {
145
18
            for view in db.data.schema.views() {
146
18
                storage.instance.tasks().spawn_integrity_check(view, &db);
147
18
            }
148
142358
        }
149

            
150
142362
        storage
151
142362
            .instance
152
142362
            .tasks()
153
142362
            .spawn_key_value_expiration_loader(&db);
154
142362

            
155
142362
        Ok(db)
156
142362
    }
157

            
158
    /// Restricts an unauthenticated instance to having `effective_permissions`.
159
    /// Returns `None` if a session has already been established.
160
    #[must_use]
161
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
162
        self.storage
163
            .with_effective_permissions(effective_permissions)
164
            .map(|storage| Self {
165
                storage,
166
                data: self.data.clone(),
167
            })
168
    }
169

            
170
    /// Creates a `Storage` with a single-database named "default" with its data
171
    /// stored at `path`. This requires exclusive access to the storage location
172
    /// configured. Attempting to open the same path multiple times concurrently
173
    /// will lead to errors.
174
    ///
175
    /// Using this method is perfect if only one database is being used.
176
    /// However, if multiple databases are needed, it is much better to store
177
    /// multiple databases in a single [`Storage`] instance rather than creating
178
    /// multiple independent databases using this method.
179
    ///
180
    /// When opening multiple databases using this function, each database will
181
    /// have its own thread pool, cache, task worker pool, and more. By using a
182
    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
183
    /// perform better.
184
48
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
185
48
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
186

            
187
48
        Ok(storage.create_database::<DB>("default", true)?)
188
48
    }
189

            
190
    /// Returns the [`Schematic`] for the schema for this database.
191
    #[must_use]
192
4976421
    pub fn schematic(&self) -> &'_ Schematic {
193
4976421
        &self.data.schema
194
4976421
    }
195

            
196
2742293
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
197
2742293
        &self.data.context.roots
198
2742293
    }
199

            
200
1079727
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
201
1079727
        &self,
202
1079727
        view: &dyn view::Serialized,
203
1079727
        key: Option<SerializedQueryKey>,
204
1079727
        order: Sort,
205
1079727
        limit: Option<u32>,
206
1079727
        access_policy: AccessPolicy,
207
1079727
        mut callback: F,
208
1079727
    ) -> Result<(), bonsaidb_core::Error> {
209
1079727
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
210
410589
            self.storage
211
410589
                .instance
212
410589
                .tasks()
213
410589
                .update_view_if_needed(view, self, true)?;
214
669138
        } else if let Some(integrity_check) = self
215
669138
            .storage
216
669138
            .instance
217
669138
            .tasks()
218
669138
            .spawn_integrity_check(view, self)
219
        {
220
1037
            integrity_check
221
1037
                .receive()
222
1037
                .map_err(Error::from)?
223
1037
                .map_err(Error::from)?;
224
668101
        }
225

            
226
1079727
        let view_entries = self
227
1079727
            .roots()
228
1079727
            .tree(self.collection_tree(
229
1079727
                &view.collection(),
230
1079727
                view_entries_tree_name(&view.view_name()),
231
1079727
            )?)
232
1079727
            .map_err(Error::from)?;
233

            
234
        {
235
1079727
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
236
915429
                callback(entry)?;
237
            }
238
        }
239

            
240
1079727
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
241
152
            let db = self.clone();
242
152
            let view_name = view.view_name();
243
152
            let view = db
244
152
                .data
245
152
                .schema
246
152
                .view_by_name(&view_name)
247
152
                .expect("query made with view that isn't registered with this database");
248
152
            db.storage
249
152
                .instance
250
152
                .tasks()
251
152
                .update_view_if_needed(view, &db, false)?;
252
1079575
        }
253

            
254
1079727
        Ok(())
255
1079727
    }
256

            
257
699846
    fn open_trees_for_transaction(&self, transaction: &Transaction) -> Result<OpenTrees, Error> {
258
699846
        let mut open_trees = OpenTrees::default();
259
1776264
        for op in &transaction.operations {
260
1076570
            if self
261
1076570
                .data
262
1076570
                .schema
263
1076570
                .collection_primary_key_description(&op.collection)
264
1076570
                .is_none()
265
            {
266
152
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267
1076418
            }
268

            
269
            #[cfg(any(feature = "encryption", feature = "compression"))]
270
1076418
            let vault = if let Some(encryption_key) =
271
1076418
                self.collection_encryption_key(&op.collection).cloned()
272
            {
273
                #[cfg(feature = "encryption")]
274
4987
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275
2877
                    vault.key = Some(encryption_key);
276
2877
                    Some(vault)
277
                } else {
278
2110
                    TreeVault::new_if_needed(
279
2110
                        Some(encryption_key),
280
2110
                        self.storage().vault(),
281
2110
                        #[cfg(feature = "compression")]
282
2110
                        None,
283
2110
                    )
284
                }
285

            
286
                #[cfg(not(feature = "encryption"))]
287
                {
288
                    drop(encryption_key);
289
                    return Err(Error::EncryptionDisabled);
290
                }
291
            } else {
292
1071431
                self.storage().tree_vault().cloned()
293
            };
294

            
295
1076418
            open_trees.open_trees_for_document_change(
296
1076418
                &op.collection,
297
1076418
                &self.data.schema,
298
1076418
                #[cfg(any(feature = "encryption", feature = "compression"))]
299
1076418
                vault,
300
1076418
            );
301
        }
302

            
303
699694
        Ok(open_trees)
304
699846
    }
305

            
306
699846
    fn apply_transaction_to_roots(
307
699846
        &self,
308
699846
        transaction: &Transaction,
309
699846
    ) -> Result<Vec<OperationResult>, Error> {
310
699846
        let open_trees = self.open_trees_for_transaction(transaction)?;
311

            
312
699694
        let mut roots_transaction = self
313
699694
            .data
314
699694
            .context
315
699694
            .roots
316
699694
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
317

            
318
699694
        let mut results = Vec::new();
319
699694
        let mut changed_documents = Vec::new();
320
699694
        let mut collection_indexes = HashMap::new();
321
699694
        let mut collections = Vec::new();
322
1755752
        for op in &transaction.operations {
323
1076114
            let result = self.execute_operation(
324
1076114
                op,
325
1076114
                &mut roots_transaction,
326
1076114
                &open_trees.trees_index_by_name,
327
1076114
            )?;
328

            
329
1056058
            if let Some((collection, id, deleted)) = match &result {
330
993077
                OperationResult::DocumentUpdated { header, collection } => {
331
993077
                    Some((collection, header.id.clone(), false))
332
                }
333
61789
                OperationResult::DocumentDeleted { id, collection } => {
334
61789
                    Some((collection, id.clone(), true))
335
                }
336
1192
                OperationResult::Success => None,
337
1054866
            } {
338
1054866
                let collection = match collection_indexes.get(collection) {
339
372972
                    Some(index) => *index,
340
                    None => {
341
681894
                        if let Ok(id) = u16::try_from(collections.len()) {
342
681894
                            collection_indexes.insert(collection.clone(), id);
343
681894
                            collections.push(collection.clone());
344
681894
                            id
345
                        } else {
346
                            return Err(Error::TransactionTooLarge);
347
                        }
348
                    }
349
                };
350
1054866
                changed_documents.push(ChangedDocument {
351
1054866
                    collection,
352
1054866
                    id,
353
1054866
                    deleted,
354
1054866
                });
355
1192
            }
356
1056058
            results.push(result);
357
        }
358

            
359
679638
        self.invalidate_changed_documents(
360
679638
            &mut roots_transaction,
361
679638
            &open_trees,
362
679638
            &collections,
363
679638
            &changed_documents,
364
679638
        )?;
365

            
366
679638
        roots_transaction
367
679638
            .entry_mut()
368
679638
            .set_data(compat::serialize_executed_transaction_changes(
369
679638
                &Changes::Documents(DocumentChanges {
370
679638
                    collections,
371
679638
                    documents: changed_documents,
372
679638
                }),
373
679638
            )?)?;
374

            
375
679638
        roots_transaction.commit()?;
376

            
377
679638
        Ok(results)
378
699846
    }
379

            
380
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
381
679638
    fn invalidate_changed_documents(
382
679638
        &self,
383
679638
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
384
679638
        open_trees: &OpenTrees,
385
679638
        collections: &[CollectionName],
386
679638
        changed_documents: &[ChangedDocument],
387
679638
    ) -> Result<(), Error> {
388
681895
        for (collection, changed_documents) in &changed_documents
389
679638
            .iter()
390
1054866
            .group_by(|doc| &collections[usize::from(doc.collection)])
391
        {
392
681894
            let mut views = self
393
681894
                .data
394
681894
                .schema
395
681894
                .views_in_collection(collection)
396
1941132
                .filter(|view| !view.update_policy().is_eager())
397
681894
                .peekable();
398
681894
            if views.peek().is_some() {
399
338857
                let changed_documents = changed_documents.collect::<Vec<_>>();
400
1910368
                for view in views {
401
1571511
                    let view_name = view.view_name();
402
1571511
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
403
3343178
                    for changed_document in &changed_documents {
404
1771667
                        let mut invalidated_docs = roots_transaction
405
1771667
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
406
1771667
                            .unwrap();
407
1771667
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
408
                    }
409
                }
410
343037
            }
411
        }
412
679638
        Ok(())
413
679638
    }
414

            
415
1076114
    fn execute_operation(
416
1076114
        &self,
417
1076114
        operation: &Operation,
418
1076114
        transaction: &mut ExecutingTransaction<AnyFile>,
419
1076114
        tree_index_map: &HashMap<String, usize>,
420
1076114
    ) -> Result<OperationResult, Error> {
421
1076114
        match &operation.command {
422
796283
            Command::Insert { id, contents } => {
423
796283
                self.execute_insert(operation, transaction, tree_index_map, id.clone(), contents)
424
            }
425
215938
            Command::Update { header, contents } => self.execute_update(
426
215938
                operation,
427
215938
                transaction,
428
215938
                tree_index_map,
429
215938
                &header.id,
430
215938
                Some(&header.revision),
431
215938
                contents,
432
215938
            ),
433
608
            Command::Overwrite { id, contents } => {
434
608
                self.execute_update(operation, transaction, tree_index_map, id, None, contents)
435
            }
436
61789
            Command::Delete { header } => {
437
61789
                self.execute_delete(operation, transaction, tree_index_map, header)
438
            }
439
1496
            Command::Check { id, revision } => Self::execute_check(
440
1496
                operation,
441
1496
                transaction,
442
1496
                tree_index_map,
443
1496
                id.clone(),
444
1496
                *revision,
445
1496
            ),
446
        }
447
1076114
    }
448

            
449
    #[cfg_attr(
450
        feature = "tracing",
451
        tracing::instrument(
452
            level = "trace",
453
            skip(self, operation, transaction, tree_index_map, contents),
454
            fields(
455
                database = self.name(),
456
                collection.name = operation.collection.name.as_ref(),
457
                collection.authority = operation.collection.authority.as_ref()
458
            )
459
        )
460
    )]
461
216546
    fn execute_update(
462
216546
        &self,
463
216546
        operation: &Operation,
464
216546
        transaction: &mut ExecutingTransaction<AnyFile>,
465
216546
        tree_index_map: &HashMap<String, usize>,
466
216546
        id: &DocumentId,
467
216546
        check_revision: Option<&Revision>,
468
216546
        contents: &[u8],
469
216546
    ) -> Result<OperationResult, crate::Error> {
470
216546
        let mut documents = transaction
471
216546
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
472
216546
            .unwrap();
473
216546
        let document_id = ArcBytes::from(id.to_vec());
474
216546
        let mut result = None;
475
216546
        let mut updated = false;
476
216546
        documents.modify(
477
216546
            vec![document_id.clone()],
478
216546
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
479
                                                                        value: Option<
480
                ArcBytes<'_>,
481
            >| {
482
216546
                if let Some(old) = value {
483
216242
                    let doc = match deserialize_document(&old) {
484
216242
                        Ok(doc) => doc,
485
                        Err(err) => {
486
                            result = Some(Err(err));
487
                            return nebari::tree::KeyOperation::Skip;
488
                        }
489
                    };
490
216242
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
491
216090
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
492
                        {
493
215786
                            let updated_header = Header {
494
215786
                                id: id.clone(),
495
215786
                                revision: updated_revision,
496
215786
                            };
497
215786
                            let serialized_doc = match serialize_document(&BorrowedDocument {
498
215786
                                header: updated_header.clone(),
499
215786
                                contents: CowBytes::from(contents),
500
215786
                            }) {
501
215786
                                Ok(bytes) => bytes,
502
                                Err(err) => {
503
                                    result = Some(Err(Error::from(err)));
504
                                    return nebari::tree::KeyOperation::Skip;
505
                                }
506
                            };
507
215786
                            result = Some(Ok(OperationResult::DocumentUpdated {
508
215786
                                collection: operation.collection.clone(),
509
215786
                                header: updated_header,
510
215786
                            }));
511
215786
                            updated = true;
512
215786
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
513
304
                        }
514
304

            
515
304
                        // If no new revision was made, it means an attempt to
516
304
                        // save a document with the same contents was made.
517
304
                        // We'll return a success but not actually give a new
518
304
                        // version
519
304
                        result = Some(Ok(OperationResult::DocumentUpdated {
520
304
                            collection: operation.collection.clone(),
521
304
                            header: doc.header,
522
304
                        }));
523
152
                    } else {
524
152
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
525
152
                            operation.collection.clone(),
526
152
                            Box::new(doc.header),
527
152
                        ))));
528
152
                    }
529
304
                } else if check_revision.is_none() {
530
152
                    let doc = BorrowedDocument::new(id.clone(), contents);
531
152
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
532
152
                        Ok((doc, serialized)) => {
533
152
                            result = Some(Ok(OperationResult::DocumentUpdated {
534
152
                                collection: operation.collection.clone(),
535
152
                                header: doc.header,
536
152
                            }));
537
152
                            updated = true;
538
152
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
539
                        }
540
                        Err(err) => {
541
                            result = Some(Err(Error::from(err)));
542
                        }
543
                    }
544
152
                } else {
545
152
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
546
152
                        operation.collection.clone(),
547
152
                        Box::new(id.clone()),
548
152
                    ))));
549
152
                }
550
608
                nebari::tree::KeyOperation::Skip
551
216546
            })),
552
216546
        )?;
553
216546
        drop(documents);
554
216546

            
555
216546
        if updated {
556
215938
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
557
608
        }
558

            
559
216242
        result.expect("nebari should invoke the callback even when the key isn't found")
560
216546
    }
561

            
562
    #[cfg_attr(
563
        feature = "tracing",
564
        tracing::instrument(
565
            level = "trace",
566
            skip(self, operation, transaction, tree_index_map, contents),
567
            fields(
568
                database = self.name(),
569
                collection.name = operation.collection.name.as_ref(),
570
                collection.authority = operation.collection.authority.as_ref()
571
            )
572
        )
573
    )]
574
796283
    fn execute_insert(
575
796283
        &self,
576
796283
        operation: &Operation,
577
796283
        transaction: &mut ExecutingTransaction<AnyFile>,
578
796283
        tree_index_map: &HashMap<String, usize>,
579
796283
        id: Option<DocumentId>,
580
796283
        contents: &[u8],
581
796283
    ) -> Result<OperationResult, Error> {
582
796283
        let mut documents = transaction
583
796283
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
584
796283
            .unwrap();
585
796283
        let id = if let Some(id) = id {
586
357450
            id
587
438833
        } else if let Some(last_key) = documents.last_key()? {
588
406626
            let id = DocumentId::try_from(last_key.as_slice())?;
589
406626
            self.data
590
406626
                .schema
591
406626
                .next_id_for_collection(&operation.collection, Some(id))?
592
        } else {
593
32207
            self.data
594
32207
                .schema
595
32207
                .next_id_for_collection(&operation.collection, None)?
596
        };
597

            
598
796246
        let doc = BorrowedDocument::new(id, contents);
599
796246
        let serialized: Vec<u8> = serialize_document(&doc)?;
600
796246
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
601
796246
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
602
18690
            let doc = deserialize_document(&document)?;
603
18690
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
604
18690
                operation.collection.clone(),
605
18690
                Box::new(doc.header),
606
18690
            )))
607
        } else {
608
777556
            drop(documents);
609
777556
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
610

            
611
777139
            Ok(OperationResult::DocumentUpdated {
612
777139
                collection: operation.collection.clone(),
613
777139
                header: doc.header,
614
777139
            })
615
        }
616
796283
    }
617

            
618
    #[cfg_attr(feature = "tracing", tracing::instrument(
619
        level = "trace",
620
        skip(self, operation, transaction, tree_index_map),
621
        fields(
622
            database = self.name(),
623
            collection.name = operation.collection.name.as_ref(),
624
            collection.authority = operation.collection.authority.as_ref()
625
        )
626
    ))]
627
61789
    fn execute_delete(
628
61789
        &self,
629
61789
        operation: &Operation,
630
61789
        transaction: &mut ExecutingTransaction<AnyFile>,
631
61789
        tree_index_map: &HashMap<String, usize>,
632
61789
        header: &Header,
633
61789
    ) -> Result<OperationResult, Error> {
634
61789
        let mut documents = transaction
635
61789
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
636
61789
            .unwrap();
637
61789
        if let Some(vec) = documents.remove(header.id.as_ref())? {
638
61789
            drop(documents);
639
61789
            let doc = deserialize_document(&vec)?;
640
61789
            if &doc.header == header {
641
61789
                self.update_eager_views(
642
61789
                    &ArcBytes::from(doc.header.id.to_vec()),
643
61789
                    operation,
644
61789
                    transaction,
645
61789
                    tree_index_map,
646
61789
                )?;
647

            
648
61789
                Ok(OperationResult::DocumentDeleted {
649
61789
                    collection: operation.collection.clone(),
650
61789
                    id: header.id.clone(),
651
61789
                })
652
            } else {
653
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
654
                    operation.collection.clone(),
655
                    Box::new(header.clone()),
656
                )))
657
            }
658
        } else {
659
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
660
                operation.collection.clone(),
661
                Box::new(header.id.clone()),
662
            )))
663
        }
664
61789
    }
665

            
666
    #[cfg_attr(feature = "tracing", tracing::instrument(
667
        level = "trace",
668
        skip(self, operation, transaction, tree_index_map),
669
        fields(
670
            database = self.name(),
671
            collection.name = operation.collection.name.as_ref(),
672
            collection.authority = operation.collection.authority.as_ref()
673
        )
674
    ))]
675
1055283
    fn update_eager_views(
676
1055283
        &self,
677
1055283
        document_id: &ArcBytes<'static>,
678
1055283
        operation: &Operation,
679
1055283
        transaction: &mut ExecutingTransaction<AnyFile>,
680
1055283
        tree_index_map: &HashMap<String, usize>,
681
1055283
    ) -> Result<(), Error> {
682
1055283
        let mut eager_views = self
683
1055283
            .data
684
1055283
            .schema
685
1055283
            .eager_views_in_collection(&operation.collection)
686
1055283
            .peekable();
687
1055283
        if eager_views.peek().is_some() {
688
438463
            let documents = transaction
689
438463
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
690
438463
                .unwrap();
691
876205
            for view in eager_views {
692
438463
                let name = view.view_name();
693
438463
                let document_map = transaction
694
438463
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
695
438463
                    .unwrap();
696
438463
                let view_entries = transaction
697
438463
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
698
438463
                    .unwrap();
699
438463
                mapper::DocumentRequest {
700
438463
                    database: self,
701
438463
                    document_ids: vec![document_id.clone()],
702
438463
                    map_request: &mapper::Map {
703
438463
                        database: self.data.name.clone(),
704
438463
                        collection: operation.collection.clone(),
705
438463
                        view_name: name.clone(),
706
438463
                    },
707
438463
                    document_map,
708
438463
                    documents,
709
438463
                    view_entries,
710
438463
                    view,
711
438463
                }
712
438463
                .map()?;
713
            }
714
616820
        }
715

            
716
1054562
        Ok(())
717
1055283
    }
718

            
719
    #[cfg_attr(feature = "tracing", tracing::instrument(
720
        level = "trace",
721
        skip(operation, transaction, tree_index_map),
722
        fields(
723
            collection.name = operation.collection.name.as_ref(),
724
            collection.authority = operation.collection.authority.as_ref(),
725
        ),
726
    ))]
727
1496
    fn execute_check(
728
1496
        operation: &Operation,
729
1496
        transaction: &mut ExecutingTransaction<AnyFile>,
730
1496
        tree_index_map: &HashMap<String, usize>,
731
1496
        id: DocumentId,
732
1496
        revision: Option<Revision>,
733
1496
    ) -> Result<OperationResult, Error> {
734
1496
        let mut documents = transaction
735
1496
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
736
1496
            .unwrap();
737
1496
        if let Some(vec) = documents.get(id.as_ref())? {
738
1344
            drop(documents);
739

            
740
1344
            if let Some(revision) = revision {
741
304
                let doc = deserialize_document(&vec)?;
742
304
                if doc.header.revision != revision {
743
152
                    return Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
744
152
                        operation.collection.clone(),
745
152
                        Box::new(Header { id, revision }),
746
152
                    )));
747
152
                }
748
1040
            }
749

            
750
1192
            Ok(OperationResult::Success)
751
        } else {
752
152
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
753
152
                operation.collection.clone(),
754
152
                Box::new(id),
755
152
            )))
756
        }
757
1496
    }
758

            
759
1079727
    fn create_view_iterator(
760
1079727
        view_entries: &Tree<Unversioned, AnyFile>,
761
1079727
        key: Option<SerializedQueryKey>,
762
1079727
        order: Sort,
763
1079727
        limit: Option<u32>,
764
1079727
    ) -> Result<Vec<ViewEntry>, Error> {
765
1079727
        let mut values = Vec::new();
766
1079727
        let forwards = match order {
767
1079575
            Sort::Ascending => true,
768
152
            Sort::Descending => false,
769
        };
770
1079727
        let mut values_read = 0;
771
1079727
        if let Some(key) = key {
772
1072595
            match key {
773
1451
                SerializedQueryKey::Range(range) => {
774
1451
                    view_entries.scan::<Infallible, _, _, _, _>(
775
2828
                        &range.map_ref(|bytes| &bytes[..]),
776
1451
                        forwards,
777
1451
                        |_, _, _| ScanEvaluation::ReadData,
778
1451
                        |_, _| {
779
3531
                            if let Some(limit) = limit {
780
304
                                if values_read >= limit {
781
152
                                    return ScanEvaluation::Stop;
782
152
                                }
783
152
                                values_read += 1;
784
3227
                            }
785
3379
                            ScanEvaluation::ReadData
786
3531
                        },
787
3379
                        |_key, _index, value| {
788
3379
                            values.push(value);
789
3379
                            Ok(())
790
3379
                        },
791
1451
                    )?;
792
                }
793
1070396
                SerializedQueryKey::Matches(key) => {
794
1070396
                    values.extend(view_entries.get(&key)?);
795
                }
796
748
                SerializedQueryKey::Multiple(mut list) => {
797
748
                    list.sort();
798
748

            
799
748
                    values.extend(
800
748
                        view_entries
801
1200
                            .get_multiple(list.iter().map(|bytes| bytes.as_slice()))?
802
748
                            .into_iter()
803
1200
                            .map(|(_, value)| value),
804
                    );
805
                }
806
            }
807
        } else {
808
7132
            view_entries.scan::<Infallible, _, _, _, _>(
809
7132
                &(..),
810
7132
                forwards,
811
7132
                |_, _, _| ScanEvaluation::ReadData,
812
7132
                |_, _| {
813
8034
                    if let Some(limit) = limit {
814
                        if values_read >= limit {
815
                            return ScanEvaluation::Stop;
816
                        }
817
                        values_read += 1;
818
8034
                    }
819
8034
                    ScanEvaluation::ReadData
820
8131
                },
821
8131
                |_, _, value| {
822
8034
                    values.push(value);
823
8034
                    Ok(())
824
8131
                },
825
7132
            )?;
826
        }
827

            
828
1079727
        values
829
1079727
            .into_iter()
830
1079727
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
831
1079727
            .collect::<Result<Vec<_>, Error>>()
832
1079727
    }
833

            
834
    /// Returns the encryption key id to use for `collection`: the key the
    /// schematic reports for that collection, or otherwise the storage's
    /// default encryption key. `None` means neither is configured.
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        match self.schematic().encryption_key_for_collection(collection) {
            Some(collection_key) => Some(collection_key),
            None => self.storage.default_encryption_key(),
        }
    }
840

            
841
    #[cfg_attr(
842
        not(feature = "encryption"),
843
        allow(
844
            unused_mut,
845
            unused_variables,
846
            clippy::unused_self,
847
            clippy::let_and_return
848
        )
849
    )]
850
    #[allow(clippy::unnecessary_wraps)]
851
2975472
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
852
2975472
        &self,
853
2975472
        collection: &CollectionName,
854
2975472
        name: S,
855
2975472
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
856
2975472
        let mut tree = R::tree(name);
857
2975472

            
858
2975472
        #[cfg(any(feature = "encryption", feature = "compression"))]
859
2975472
        match (
860
2975472
            self.collection_encryption_key(collection),
861
2975472
            self.storage().tree_vault().cloned(),
862
        ) {
863
43535
            (Some(override_key), Some(mut vault)) => {
864
43535
                #[cfg(feature = "encryption")]
865
43535
                {
866
43535
                    vault.key = Some(override_key.clone());
867
43535
                    tree = tree.with_vault(vault);
868
43535
                }
869

            
870
                #[cfg(not(feature = "encryption"))]
871
                {
872
                    return Err(Error::EncryptionDisabled);
873
                }
874
            }
875
717754
            (None, Some(vault)) => {
876
717754
                tree = tree.with_vault(vault);
877
717754
            }
878
2214183
            (key, None) => {
879
                #[cfg(feature = "encryption")]
880
2214183
                if let Some(vault) = TreeVault::new_if_needed(
881
2214183
                    key.cloned(),
882
2214183
                    self.storage().vault(),
883
2214183
                    #[cfg(feature = "compression")]
884
2214183
                    None,
885
2214183
                ) {
886
46660
                    tree = tree.with_vault(vault);
887
2167523
                }
888

            
889
                #[cfg(not(feature = "encryption"))]
890
                if key.is_some() {
891
                    return Err(Error::EncryptionDisabled);
892
                }
893
            }
894
        }
895

            
896
2975472
        Ok(tree)
897
2975472
    }
898

            
899
1
    pub(crate) fn update_key_expiration<'key>(
900
1
        &self,
901
1
        tree_key: impl Into<Cow<'key, str>>,
902
1
        expiration: Option<Timestamp>,
903
1
    ) {
904
1
        self.data
905
1
            .context
906
1
            .update_key_expiration(tree_key, expiration);
907
1
    }
908

            
909
    /// Converts this instance into its async version, [`crate::AsyncDatabase`],
    /// which is able to be used from async code. The returned instance uses the
    /// current Tokio runtime handle to spawn blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async(self) -> crate::AsyncDatabase {
        self.into_async_with_runtime(tokio::runtime::Handle::current())
    }
921

            
922
    /// Converts this instance into its async version, [`crate::AsyncDatabase`],
    /// which is able to be used from async code. The returned instance uses the
    /// provided runtime handle to spawn blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        crate::AsyncDatabase {
            database: self,
            runtime: Arc::new(runtime),
        }
    }
933

            
934
    /// Clones this instance and converts the clone into its async version,
    /// [`crate::AsyncDatabase`], which is able to be used from async code. The
    /// returned instance uses the current Tokio runtime handle to spawn
    /// blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async(&self) -> crate::AsyncDatabase {
        self.clone().into_async()
    }
946

            
947
    /// Clones this instance and converts the clone into its async version,
    /// [`crate::AsyncDatabase`], which is able to be used from async code. The
    /// returned instance uses the provided runtime handle to spawn blocking
    /// tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        self.clone().into_async_with_runtime(runtime)
    }
955
}
956
59
#[derive(Serialize, Deserialize)]
957
struct LegacyHeader {
958
    id: u64,
959
    revision: Revision,
960
}
961
59
#[derive(Serialize, Deserialize)]
962
struct LegacyDocument<'a> {
963
    header: LegacyHeader,
964
    #[serde(borrow)]
965
    contents: &'a [u8],
966
}
967

            
968
1717298
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
969
1717298
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
970
1717239
        Ok(document) => Ok(document),
971
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
972
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
973
59
                header: Header {
974
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
975
59
                    revision: legacy_doc.header.revision,
976
59
                },
977
59
                contents: CowBytes::from(legacy_doc.contents),
978
59
            }),
979
            Err(_) => Err(Error::from(err)),
980
        },
981
    }
982
1717298
}
983

            
984
1012184
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
985
1012184
    pot::to_vec(document)
986
1012184
        .map_err(Error::from)
987
1012184
        .map_err(bonsaidb_core::Error::from)
988
1012184
}
989

            
990
impl HasSession for Database {
991
5326983
    fn session(&self) -> Option<&Session> {
992
5326983
        self.storage.session()
993
5326983
    }
994
}
995

            
996
impl Connection for Database {
997
    type Storage = Storage;
998

            
999
6758786
    fn storage(&self) -> Self::Storage {
6758786
        self.storage.clone()
6758786
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<u32>,
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
39822
        self.check_permission(
39822
            database_resource_name(self.name()),
39822
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
39822
        )?;
39822
        let result_limit = usize::try_from(
39822
            result_limit
39822
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
39822
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
39822
        )
39822
        .unwrap();
39822
        if result_limit > 0 {
39670
            let range = if let Some(starting_id) = starting_id {
20361
                Range::from(starting_id..)
            } else {
19309
                Range::from(..)
            };

            
39670
            let mut entries = Vec::new();
39670
            self.roots()
39670
                .transactions()
377498
                .scan(range, |entry| {
377498
                    if entry.data().is_some() {
376425
                        entries.push(entry);
376425
                    }
377498
                    entries.len() < result_limit
377498
                })
39670
                .map_err(Error::from)?;

            
39670
            entries
39670
                .into_iter()
39670
                .map(|entry| {
376425
                    if let Some(data) = entry.data() {
376425
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
376425
                        Ok(Some(transaction::Executed {
376425
                            id: entry.id,
376425
                            changes,
376425
                        }))
                    } else {
                        Ok(None)
                    }
376425
                })
39670
                .filter_map(Result::transpose)
39670
                .collect::<Result<Vec<_>, Error>>()
39670
                .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
152
            Ok(Vec::default())
        }
39822
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
631795
        self.check_permission(
631795
            database_resource_name(self.name()),
631795
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
631795
        )?;
631795
        Ok(self.roots().transactions().current_transaction_id())
631795
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
152
        self.check_permission(
152
            database_resource_name(self.name()),
152
            &BonsaiAction::Database(DatabaseAction::Compact),
152
        )?;
152
        self.storage()
152
            .instance
152
            .tasks()
152
            .compact_database(self.clone())?;
152
        Ok(())
152
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
152
        self.check_permission(
152
            kv_resource_name(self.name()),
152
            &BonsaiAction::Database(DatabaseAction::Compact),
152
        )?;
152
        self.storage()
152
            .instance
152
            .tasks()
152
            .compact_key_value_store(self.clone())?;
152
        Ok(())
152
    }
}

            
impl LowLevelConnection for Database {
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self,  transaction),
        fields(
            database = self.name(),
        )
    ))]
699994
    fn apply_transaction(
699994
        &self,
699994
        transaction: Transaction,
699994
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1776564
        for op in &transaction.operations {
1076718
            let (resource, action) = match &op.command {
796509
                Command::Insert { .. } => (
796509
                    collection_resource_name(self.name(), &op.collection),
796509
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
796509
                ),
215938
                Command::Update { header, .. } => (
215938
                    document_resource_name(self.name(), &op.collection, &header.id),
215938
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
215938
                ),
608
                Command::Overwrite { id, .. } => (
608
                    document_resource_name(self.name(), &op.collection, id),
608
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
608
                ),
61863
                Command::Delete { header } => (
61863
                    document_resource_name(self.name(), &op.collection, &header.id),
61863
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
61863
                ),
1800
                Command::Check { id, .. } => (
1800
                    document_resource_name(self.name(), &op.collection, id),
1800
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
1800
                ),
            };
1076718
            self.check_permission(resource, &action)?;
        }

            
699846
        let mut eager_view_tasks = Vec::new();
702991
        for collection_name in transaction
699846
            .operations
699846
            .iter()
1076570
            .map(|op| &op.collection)
699846
            .collect::<HashSet<_>>()
        {
702990
            for view in self.data.schema.eager_views_in_collection(collection_name) {
390528
                if let Some(task) = self
390528
                    .storage
390528
                    .instance
390528
                    .tasks()
390528
                    .spawn_integrity_check(view, self)
6335
                {
6335
                    eager_view_tasks.push(task);
384193
                }
            }
        }

            
699846
        let mut eager_view_mapping_tasks = Vec::new();
706181
        for task in eager_view_tasks {
6335
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
6185
                eager_view_mapping_tasks.push(spawned_task);
6185
            }
        }

            
706031
        for task in eager_view_mapping_tasks {
6185
            let mut task = task.lock();
6185
            if let Some(task) = task.take() {
4834
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
1351
            }
        }

            
699846
        self.apply_transaction_to_roots(&transaction)
699846
            .map_err(bonsaidb_core::Error::from)
699994
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
595642
        self.check_permission(
595642
            document_resource_name(self.name(), collection, &id),
595642
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
595642
        )?;
595642
        let tree = self
595642
            .data
595642
            .context
595642
            .roots
595642
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
595642
            .map_err(Error::from)?;
595642
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
576574
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
19067
            Ok(None)
        }
595642
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
922
        self.check_permission(
922
            collection_resource_name(self.name(), collection),
922
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
922
        )?;
922
        let tree = self
922
            .data
922
            .context
922
            .roots
922
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
922
            .map_err(Error::from)?;
922
        let mut found_docs = Vec::new();
922
        let mut keys_read = 0;
922
        let ids = DocumentIdRange(ids);
922
        tree.scan(
922
            &ids.borrow_as_bytes(),
922
            match sort {
770
                Sort::Ascending => true,
152
                Sort::Descending => false,
            },
24
            |_, _, _| ScanEvaluation::ReadData,
922
            |_, _| {
1349
                if let Some(limit) = limit {
304
                    if keys_read >= limit {
152
                        return ScanEvaluation::Stop;
152
                    }
152

            
152
                    keys_read += 1;
1045
                }
1197
                ScanEvaluation::ReadData
1349
            },
1197
            |_, _, doc| {
1197
                found_docs.push(
1197
                    deserialize_document(&doc)
1197
                        .map(BorrowedDocument::into_owned)
1197
                        .map_err(AbortError::Other)?,
                );
1197
                Ok(())
1197
            },
        )
922
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
922
        })?;

            
922
        Ok(found_docs)
922
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
152
        self.check_permission(
152
            collection_resource_name(self.name(), collection),
152
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
152
        )?;
152
        let tree = self
152
            .data
152
            .context
152
            .roots
152
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
152
            .map_err(Error::from)?;
152
        let mut found_headers = Vec::new();
152
        let mut keys_read = 0;
152
        let ids = DocumentIdRange(ids);
152
        tree.scan(
152
            &ids.borrow_as_bytes(),
152
            match sort {
152
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
152
            |_, _| {
304
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

            
                    keys_read += 1;
304
                }
304
                ScanEvaluation::ReadData
304
            },
304
            |_, _, doc| {
304
                found_headers.push(
304
                    deserialize_document(&doc)
304
                        .map(|doc| doc.header)
304
                        .map_err(AbortError::Other)?,
                );
304
                Ok(())
304
            },
        )
152
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
152
        })?;

            
152
        Ok(found_headers)
152
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, bonsaidb_core::Error> {
304
        self.check_permission(
304
            collection_resource_name(self.name(), collection),
304
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
304
        )?;
304
        let tree = self
304
            .data
304
            .context
304
            .roots
304
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
304
            .map_err(Error::from)?;
304
        let ids = DocumentIdRange(ids);
304
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

            
304
        Ok(stats.alive_keys)
304
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
358152
    fn get_multiple_from_collection(
358152
        &self,
358152
        ids: &[DocumentId],
358152
        collection: &CollectionName,
358152
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
715128
        for id in ids {
356976
            self.check_permission(
356976
                document_resource_name(self.name(), collection, id),
356976
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
356976
            )?;
        }
358152
        let mut ids = ids.to_vec();
358152
        let collection = collection.clone();
358152
        let tree = self
358152
            .data
358152
            .context
358152
            .roots
358152
            .tree(
358152
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
358152
            .map_err(Error::from)?;
358152
        ids.sort();
358152
        let keys_and_values = tree
358160
            .get_multiple(ids.iter().map(|id| id.as_ref()))
358152
            .map_err(Error::from)?;

            
358152
        keys_and_values
358152
            .into_iter()
358160
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
358152
            .collect::<Result<Vec<_>, Error>>()
358152
            .map_err(bonsaidb_core::Error::from)
358152
    }

            
    /// Asks the storage's task manager to compact `collection` in this
    /// database.
    ///
    /// # Errors
    ///
    /// Returns an error if the `Compact` permission check on the collection
    /// resource fails or if `compact_collection` reports an error.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
    fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            collection_resource_name(self.name(), &collection),
            &BonsaiAction::Database(DatabaseAction::Compact),
        )?;

        let database = self.clone();
        self.storage()
            .instance
            .tasks()
            .compact_collection(database, collection)?;
        Ok(())
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
392731
    fn query_by_name(
392731
        &self,
392731
        view: &ViewName,
392731
        key: Option<SerializedQueryKey>,
392731
        order: Sort,
392731
        limit: Option<u32>,
392731
        access_policy: AccessPolicy,
392731
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
392731
        let view = self.schematic().view_by_name(view)?;
392731
        self.check_permission(
392731
            view_resource_name(self.name(), &view.view_name()),
392731
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
392731
        )?;
392731
        let mut results = Vec::new();
392731
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
772089
            for mapping in entry.mappings {
390122
                results.push(bonsaidb_core::schema::view::map::Serialized {
390122
                    source: mapping.source,
390122
                    key: entry.key.clone(),
390122
                    value: mapping.value,
390122
                });
390122
            }
381967
            Ok(())
392731
        })?;

            
392731
        Ok(results)
392731
    }

            
    /// Queries the view called `view` like `query_by_name`, then loads the
    /// source document of every returned mapping from the view's collection.
    ///
    /// The result holds the mappings together with the documents keyed by
    /// document id.
    ///
    /// # Errors
    ///
    /// Returns any error from `query_by_name` or from
    /// `get_multiple_from_collection`.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let mappings = self.query_by_name(view, key, order, limit, access_policy)?;
        let view = self.schematic().view_by_name(view).unwrap(); // query() will fail if it's not present

        let source_ids = mappings
            .iter()
            .map(|mapping| mapping.source.id.clone())
            .collect::<Vec<_>>();
        let loaded = self.get_multiple_from_collection(&source_ids, &view.collection())?;
        let documents = loaded
            .into_iter()
            .map(|doc| (doc.header.id.clone(), doc))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings,
                documents,
            },
        )
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
686080
    fn reduce_by_name(
686080
        &self,
686080
        view_name: &ViewName,
686080
        key: Option<SerializedQueryKey>,
686080
        access_policy: AccessPolicy,
686080
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
686080
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

            
686080
        let result = if mappings.len() == 1 {
525591
            mappings.pop().unwrap().value.into_vec()
        } else {
160489
            let view = self.data.schema.view_by_name(view_name)?;
160489
            view.reduce(
160489
                &mappings
160489
                    .iter()
160489
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
160489
                    .collect::<Vec<_>>(),
160489
                true,
160489
            )
160489
            .map_err(Error::from)?
        };

            
685928
        Ok(result)
686080
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
686470
    fn reduce_grouped_by_name(
686470
        &self,
686470
        view_name: &ViewName,
686470
        key: Option<SerializedQueryKey>,
686470
        access_policy: AccessPolicy,
686470
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
686470
        let view = self.data.schema.view_by_name(view_name)?;
686470
        self.check_permission(
686470
            view_resource_name(self.name(), &view.view_name()),
686470
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
686470
        )?;
686470
        let mut mappings = Vec::new();
686486
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
533010
            mappings.push(MappedSerializedValue {
533010
                key: entry.key,
533010
                value: entry.reduced_value,
533010
            });
533010
            Ok(())
686486
        })?;

            
686470
        Ok(mappings)
686470
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
526
    fn delete_docs_by_name(
526
        &self,
526
        view: &ViewName,
526
        key: Option<SerializedQueryKey>,
526
        access_policy: AccessPolicy,
526
    ) -> Result<u64, bonsaidb_core::Error> {
526
        let view = self.data.schema.view_by_name(view)?;
526
        let collection = view.collection();
526
        let mut transaction = Transaction::default();
526
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
3502
            for mapping in entry.mappings {
3050
                transaction.push(Operation::delete(collection.clone(), mapping.source));
3050
            }

            
452
            Ok(())
526
        })?;

            
526
        let results = LowLevelConnection::apply_transaction(self, transaction)?;

            
526
        Ok(results.len() as u64)
526
    }
}

            
impl HasSchema for Database {
27758
    fn schematic(&self) -> &Schematic {
27758
        &self.data.schema
27758
    }
}

            
/// A boxed iterator, valid for `'a`, that yields fallible pairs of
/// [`ArcBytes`].
///
/// NOTE(review): the pair is presumably a `(key, value)` entry read from a
/// view's tree -- confirm against the code that constructs this iterator.
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapter around a [`ViewIterator`] whose [`Iterator`] implementation
/// yields deserialized `ViewEntry` values instead of raw byte pairs.
struct ViewEntryCollectionIterator<'a> {
    /// The underlying iterator of raw byte pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
2724354
/// A handle to shared [`ContextData`].
///
/// The data is held behind an [`Arc`], so cloning a `Context` produces
/// another handle to the same `ContextData` rather than copying it. The
/// handle dereferences to [`ContextData`].
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// The shared state every clone of this context points at.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    // Gives direct access to the shared `ContextData` behind the `Arc`.
    fn deref(&self) -> &Self::Target {
        self.data.as_ref()
    }
}

            
/// The state shared by every clone of a [`Context`].
///
/// Dropping this value asks the key-value state to shut down (see the
/// `Drop` implementation in this file).
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The nebari roots; also reachable through `Context`'s
    /// `Borrow<Roots<AnyFile>>` implementation.
    pub(crate) roots: Roots<AnyFile>,
    /// The key-value state, guarded by a mutex. `Context::new` hands a weak
    /// reference to this same `Arc` to the `keyvalue-worker` thread.
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    // Lends out the roots stored in the shared `ContextData` (reached through
    // this type's `Deref` implementation).
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.roots
    }
}

            
impl Context {
35853
    pub(crate) fn new(
35853
        roots: Roots<AnyFile>,
35853
        key_value_persistence: KeyValuePersistence,
35853
        storage_lock: Option<StorageLock>,
35853
    ) -> Self {
35853
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
35853
        let mut background_worker_target_watcher = background_worker_target.watch();
35853
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
35853
            key_value_persistence,
35853
            roots.clone(),
35853
            background_worker_target,
35853
        )));
35853
        let background_worker_state = Arc::downgrade(&key_value_state);
35853
        let context = Self {
35853
            data: Arc::new(ContextData {
35853
                roots,
35853
                key_value_state,
35853
            }),
35853
        };
35853
        std::thread::Builder::new()
35853
            .name(String::from("keyvalue-worker"))
35853
            .spawn(move || {
35853
                keyvalue::background_worker(
35853
                    &background_worker_state,
35853
                    &mut background_worker_target_watcher,
35853
                    storage_lock,
35853
                );
35853
            })
35853
            .unwrap();
35853
        context
35853
    }

            
1537131
    pub(crate) fn perform_kv_operation(
1537131
        &self,
1537131
        op: KeyOperation,
1537131
    ) -> Result<Output, bonsaidb_core::Error> {
1537131
        let mut state = self.data.key_value_state.lock();
1537131
        state.perform_kv_operation(op, &self.data.key_value_state)
1537131
    }

            
14
    pub(crate) fn update_key_expiration<'key>(
14
        &self,
14
        tree_key: impl Into<Cow<'key, str>>,
14
        expiration: Option<Timestamp>,
14
    ) {
14
        let mut state = self.data.key_value_state.lock();
14
        state.update_key_expiration(tree_key, expiration);
14
    }

            
    #[cfg(test)]
5
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
5
        let state = self.data.key_value_state.lock();
5
        state.persistence_watcher()
5
    }
}

            
impl Drop for ContextData {
    fn drop(&mut self) {
31894
        if let Some(shutdown) = {
31894
            let mut state = self.key_value_state.lock();
31894
            state.shutdown(&self.key_value_state)
31894
        } {
372
            let _: Result<_, _> = shutdown.recv();
31522
        }
31894
    }
}

            
3786926
pub fn document_tree_name(collection: &CollectionName) -> String {
3786926
    format!("collection.{collection:#}")
3786926
}

            
/// Newtype around a [`Range`] of [`DocumentId`]s.
///
/// Its [`BorrowByteRange`] implementation exposes the range's bounds as
/// borrowed bytes.
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1378
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1378
        BorrowedRange {
1378
            start: match &self.0.start {
618
                connection::Bound::Unbounded => ops::Bound::Unbounded,
760
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
1378
            end: match &self.0.end {
618
                connection::Bound::Unbounded => ops::Bound::Unbounded,
608
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
152
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
1378
    }
}

            
/// Operations that can be performed on both [`Database`] and
/// [`AsyncDatabase`](crate::AsyncDatabase).
pub trait DatabaseNonBlocking {
    /// Returns the name of the database.
    ///
    /// The name is returned as a string slice borrowed from `self`.
    #[must_use]
    fn name(&self) -> &str;
}

            
impl DatabaseNonBlocking for Database {
    // Borrows the name stored in this database's shared data.
    fn name(&self) -> &str {
        self.data.name.as_ref()
    }
}