1
use std::borrow::{Borrow, Cow};
2
use std::collections::{BTreeMap, HashMap, HashSet};
3
use std::convert::Infallible;
4
use std::ops::{self, Deref};
5
use std::sync::Arc;
6
use std::u8;
7

            
8
use bonsaidb_core::arc_bytes::serde::CowBytes;
9
use bonsaidb_core::arc_bytes::ArcBytes;
10
use bonsaidb_core::connection::{
11
    self, AccessPolicy, Connection, HasSchema, HasSession, LowLevelConnection, Range,
12
    SerializedQueryKey, Session, Sort, StorageConnection,
13
};
14
#[cfg(any(feature = "encryption", feature = "compression"))]
15
use bonsaidb_core::document::KeyId;
16
use bonsaidb_core::document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision};
17
use bonsaidb_core::keyvalue::{KeyOperation, Output, Timestamp};
18
use bonsaidb_core::limits::{
19
    LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS,
20
};
21
use bonsaidb_core::permissions::bonsai::{
22
    collection_resource_name, database_resource_name, document_resource_name, kv_resource_name,
23
    view_resource_name, BonsaiAction, DatabaseAction, DocumentAction, TransactionAction,
24
    ViewAction,
25
};
26
use bonsaidb_core::permissions::Permissions;
27
use bonsaidb_core::schema::view::map::MappedSerializedValue;
28
use bonsaidb_core::schema::view::{self};
29
use bonsaidb_core::schema::{self, CollectionName, Schema, Schematic, ViewName};
30
use bonsaidb_core::transaction::{
31
    self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
32
    Transaction,
33
};
34
use itertools::Itertools;
35
use nebari::io::any::AnyFile;
36
use nebari::tree::{
37
    AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
38
    Unversioned, Versioned,
39
};
40
use nebari::{AbortError, ExecutingTransaction, Roots, Tree};
41
use parking_lot::Mutex;
42
use serde::{Deserialize, Serialize};
43
use watchable::Watchable;
44

            
45
use crate::config::{Builder, KeyValuePersistence, StorageConfiguration};
46
use crate::database::keyvalue::BackgroundWorkerProcessTarget;
47
use crate::error::Error;
48
use crate::open_trees::OpenTrees;
49
use crate::storage::StorageLock;
50
#[cfg(feature = "encryption")]
51
use crate::storage::TreeVault;
52
use crate::views::{
53
    mapper, view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
54
    ViewEntry,
55
};
56
use crate::Storage;
57

            
58
pub mod keyvalue;
59

            
60
pub(crate) mod compat;
61
pub mod pubsub;
62

            
63
/// A database stored in BonsaiDb. This type blocks the current thread when
64
/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
65
///
66
/// ## Converting between Blocking and Async Types
67
///
68
/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
69
/// using:
70
///
71
/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
72
/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
73
/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
74
/// - [`Database::into_async()`]
75
/// - [`Database::to_async()`]
76
/// - [`Database::into_async_with_runtime()`]
77
/// - [`Database::to_async_with_runtime()`]
78
///
79
/// ## Using `Database` to create a single database
80
///
81
/// `Database` provides an easy mechanism to open and access a single database:
82
///
83
/// ```rust
84
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
85
/// use bonsaidb_core::schema::Collection;
86
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
87
/// use bonsaidb_local::{
88
///     config::{Builder, StorageConfiguration},
89
///     Database,
90
/// };
91
/// use serde::{Deserialize, Serialize};
92
///
93
/// #[derive(Debug, Serialize, Deserialize, Collection)]
94
/// #[collection(name = "blog-posts")]
95
/// # #[collection(core = bonsaidb_core)]
96
/// struct BlogPost {
97
///     pub title: String,
98
///     pub contents: String,
99
/// }
100
///
101
/// # fn test() -> Result<(), bonsaidb_local::Error> {
102
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
103
/// # Ok(())
104
/// # }
105
/// ```
106
///
107
/// Under the hood, this initializes a [`Storage`] instance pointing at
108
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
109
/// with the schema `BlogPost`.
110
///
111
/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
112
/// collections can be used as a [`Schema`].
113
4578256
#[derive(Debug, Clone)]
114
pub struct Database {
115
    pub(crate) data: Arc<Data>,
116
    pub(crate) storage: Storage,
117
}
118

            
119
#[derive(Debug)]
120
pub struct Data {
121
    pub name: Arc<Cow<'static, str>>,
122
    context: Context,
123
    pub(crate) schema: Arc<Schematic>,
124
}
125

            
126
impl Database {
127
    /// Opens a local file as a bonsaidb.
128
140158
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
129
140158
        name: S,
130
140158
        context: Context,
131
140158
        storage: &Storage,
132
140158
    ) -> Result<Self, Error> {
133
140158
        let name = name.into();
134
140158
        let schema = Arc::new(DB::schematic()?);
135
140158
        let db = Self {
136
140158
            storage: storage.clone(),
137
140158
            data: Arc::new(Data {
138
140158
                name: Arc::new(name),
139
140158
                context,
140
140158
                schema,
141
140158
            }),
142
140158
        };
143
140158

            
144
140158
        if storage.instance.check_view_integrity_on_database_open() {
145
18
            for view in db.data.schema.views() {
146
18
                storage.instance.tasks().spawn_integrity_check(view, &db);
147
18
            }
148
140154
        }
149

            
150
140158
        storage
151
140158
            .instance
152
140158
            .tasks()
153
140158
            .spawn_key_value_expiration_loader(&db);
154
140158

            
155
140158
        Ok(db)
156
140158
    }
157

            
158
    /// Restricts an unauthenticated instance to having `effective_permissions`.
159
    /// Returns `None` if a session has already been established.
160
    #[must_use]
161
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
162
        self.storage
163
            .with_effective_permissions(effective_permissions)
164
            .map(|storage| Self {
165
                storage,
166
                data: self.data.clone(),
167
            })
168
    }
169

            
170
    /// Creates a `Storage` with a single-database named "default" with its data
171
    /// stored at `path`. This requires exclusive access to the storage location
172
    /// configured. Attempting to open the same path multiple times concurrently
173
    /// will lead to errors.
174
    ///
175
    /// Using this method is perfect if only one database is being used.
176
    /// However, if multiple databases are needed, it is much better to store
177
    /// multiple databases in a single [`Storage`] instance rather than creating
178
    /// multiple independent databases using this method.
179
    ///
180
    /// When opening multiple databases using this function, each database will
181
    /// have its own thread pool, cache, task worker pool, and more. By using a
182
    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
183
    /// perform better.
184
50
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
185
50
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
186

            
187
50
        Ok(storage.create_database::<DB>("default", true)?)
188
50
    }
189

            
190
    /// Returns the [`Schematic`] for the schema for this database.
191
    #[must_use]
192
4798034
    pub fn schematic(&self) -> &'_ Schematic {
193
4798034
        &self.data.schema
194
4798034
    }
195

            
196
2628579
    /// Returns the nebari roots stored in this database's context.
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
        let context = &self.data.context;
        &context.roots
    }
199

            
200
1041270
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
201
1041270
        &self,
202
1041270
        view: &dyn view::Serialized,
203
1041270
        key: Option<SerializedQueryKey>,
204
1041270
        order: Sort,
205
1041270
        limit: Option<u32>,
206
1041270
        access_policy: AccessPolicy,
207
1041270
        mut callback: F,
208
1041270
    ) -> Result<(), bonsaidb_core::Error> {
209
1041270
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
210
391944
            self.storage
211
391944
                .instance
212
391944
                .tasks()
213
391944
                .update_view_if_needed(view, self, true)?;
214
649326
        } else if let Some(integrity_check) = self
215
649326
            .storage
216
649326
            .instance
217
649326
            .tasks()
218
649326
            .spawn_integrity_check(view, self)
219
        {
220
1621
            integrity_check
221
1621
                .receive()
222
1621
                .map_err(Error::from)?
223
1621
                .map_err(Error::from)?;
224
647705
        }
225

            
226
1041270
        let view_entries = self
227
1041270
            .roots()
228
1041270
            .tree(self.collection_tree(
229
1041270
                &view.collection(),
230
1041270
                view_entries_tree_name(&view.view_name()),
231
1041270
            )?)
232
1041270
            .map_err(Error::from)?;
233

            
234
        {
235
1041270
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
236
848361
                callback(entry)?;
237
            }
238
        }
239

            
240
1041270
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
241
148
            let db = self.clone();
242
148
            let view_name = view.view_name();
243
148
            let view = db
244
148
                .data
245
148
                .schema
246
148
                .view_by_name(&view_name)
247
148
                .expect("query made with view that isn't registered with this database");
248
148
            db.storage
249
148
                .instance
250
148
                .tasks()
251
148
                .update_view_if_needed(view, &db, false)?;
252
1041122
        }
253

            
254
1041270
        Ok(())
255
1041270
    }
256

            
257
672852
    fn open_trees_for_transaction(&self, transaction: &Transaction) -> Result<OpenTrees, Error> {
258
672852
        let mut open_trees = OpenTrees::default();
259
1719730
        for op in &transaction.operations {
260
1047026
            if self
261
1047026
                .data
262
1047026
                .schema
263
1047026
                .collection_primary_key_description(&op.collection)
264
1047026
                .is_none()
265
            {
266
148
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267
1046878
            }
268

            
269
            #[cfg(any(feature = "encryption", feature = "compression"))]
270
1046878
            let vault = if let Some(encryption_key) =
271
1046878
                self.collection_encryption_key(&op.collection).cloned()
272
            {
273
                #[cfg(feature = "encryption")]
274
4853
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275
2800
                    vault.key = Some(encryption_key);
276
2800
                    Some(vault)
277
                } else {
278
2053
                    TreeVault::new_if_needed(
279
2053
                        Some(encryption_key),
280
2053
                        self.storage().vault(),
281
2053
                        #[cfg(feature = "compression")]
282
2053
                        None,
283
2053
                    )
284
                }
285

            
286
                #[cfg(not(feature = "encryption"))]
287
                {
288
                    drop(encryption_key);
289
                    return Err(Error::EncryptionDisabled);
290
                }
291
            } else {
292
1042025
                self.storage().tree_vault().cloned()
293
            };
294

            
295
1046878
            open_trees.open_trees_for_document_change(
296
1046878
                &op.collection,
297
1046878
                &self.data.schema,
298
1046878
                #[cfg(any(feature = "encryption", feature = "compression"))]
299
1046878
                vault,
300
1046878
            );
301
        }
302

            
303
672704
        Ok(open_trees)
304
672852
    }
305

            
306
672852
    fn apply_transaction_to_roots(
307
672852
        &self,
308
672852
        transaction: &Transaction,
309
672852
    ) -> Result<Vec<OperationResult>, Error> {
310
672852
        let open_trees = self.open_trees_for_transaction(transaction)?;
311

            
312
672704
        let mut roots_transaction = self
313
672704
            .data
314
672704
            .context
315
672704
            .roots
316
672704
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
317

            
318
672704
        let mut results = Vec::new();
319
672704
        let mut changed_documents = Vec::new();
320
672704
        let mut collection_indexes = HashMap::new();
321
672704
        let mut collections = Vec::new();
322
1699771
        for op in &transaction.operations {
323
1046582
            let result = self.execute_operation(
324
1046582
                op,
325
1046582
                &mut roots_transaction,
326
1046582
                &open_trees.trees_index_by_name,
327
1046582
            )?;
328

            
329
1027067
            if let Some((collection, id, deleted)) = match &result {
330
966219
                OperationResult::DocumentUpdated { header, collection } => {
331
966219
                    Some((collection, header.id.clone(), false))
332
                }
333
59688
                OperationResult::DocumentDeleted { id, collection } => {
334
59688
                    Some((collection, id.clone(), true))
335
                }
336
1160
                OperationResult::Success => None,
337
1025907
            } {
338
1025907
                let collection = match collection_indexes.get(collection) {
339
370524
                    Some(index) => *index,
340
                    None => {
341
655383
                        if let Ok(id) = u16::try_from(collections.len()) {
342
655383
                            collection_indexes.insert(collection.clone(), id);
343
655383
                            collections.push(collection.clone());
344
655383
                            id
345
                        } else {
346
                            return Err(Error::TransactionTooLarge);
347
                        }
348
                    }
349
                };
350
1025907
                changed_documents.push(ChangedDocument {
351
1025907
                    collection,
352
1025907
                    id,
353
1025907
                    deleted,
354
1025907
                });
355
1160
            }
356
1027067
            results.push(result);
357
        }
358

            
359
653189
        self.invalidate_changed_documents(
360
653189
            &mut roots_transaction,
361
653189
            &open_trees,
362
653189
            &collections,
363
653189
            &changed_documents,
364
653189
        )?;
365

            
366
653189
        roots_transaction
367
653189
            .entry_mut()
368
653189
            .set_data(compat::serialize_executed_transaction_changes(
369
653189
                &Changes::Documents(DocumentChanges {
370
653189
                    collections,
371
653189
                    documents: changed_documents,
372
653189
                }),
373
653189
            )?)?;
374

            
375
653189
        roots_transaction.commit()?;
376

            
377
653189
        Ok(results)
378
672852
    }
379

            
380
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
381
653189
    fn invalidate_changed_documents(
382
653189
        &self,
383
653189
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
384
653189
        open_trees: &OpenTrees,
385
653189
        collections: &[CollectionName],
386
653189
        changed_documents: &[ChangedDocument],
387
653189
    ) -> Result<(), Error> {
388
655385
        for (collection, changed_documents) in &changed_documents
389
653189
            .iter()
390
1025907
            .group_by(|doc| &collections[usize::from(doc.collection)])
391
        {
392
655383
            let mut views = self
393
655383
                .data
394
655383
                .schema
395
655383
                .views_in_collection(collection)
396
1887492
                .filter(|view| !view.update_policy().is_eager())
397
655383
                .peekable();
398
655383
            if views.peek().is_some() {
399
327941
                let changed_documents = changed_documents.collect::<Vec<_>>();
400
1855676
                for view in views {
401
1527735
                    let view_name = view.view_name();
402
1527735
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
403
3286506
                    for changed_document in &changed_documents {
404
1758771
                        let mut invalidated_docs = roots_transaction
405
1758771
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
406
1758771
                            .unwrap();
407
1758771
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
408
                    }
409
                }
410
327442
            }
411
        }
412
653189
        Ok(())
413
653189
    }
414

            
415
1046582
    fn execute_operation(
416
1046582
        &self,
417
1046582
        operation: &Operation,
418
1046582
        transaction: &mut ExecutingTransaction<AnyFile>,
419
1046582
        tree_index_map: &HashMap<String, usize>,
420
1046582
    ) -> Result<OperationResult, Error> {
421
1046582
        match &operation.command {
422
779926
            Command::Insert { id, contents } => {
423
779926
                self.execute_insert(operation, transaction, tree_index_map, id.clone(), contents)
424
            }
425
204920
            Command::Update { header, contents } => self.execute_update(
426
204920
                operation,
427
204920
                transaction,
428
204920
                tree_index_map,
429
204920
                &header.id,
430
204920
                Some(&header.revision),
431
204920
                contents,
432
204920
            ),
433
592
            Command::Overwrite { id, contents } => {
434
592
                self.execute_update(operation, transaction, tree_index_map, id, None, contents)
435
            }
436
59688
            Command::Delete { header } => {
437
59688
                self.execute_delete(operation, transaction, tree_index_map, header)
438
            }
439
1456
            Command::Check { id, revision } => Self::execute_check(
440
1456
                operation,
441
1456
                transaction,
442
1456
                tree_index_map,
443
1456
                id.clone(),
444
1456
                *revision,
445
1456
            ),
446
        }
447
1046582
    }
448

            
449
    #[cfg_attr(
450
        feature = "tracing",
451
        tracing::instrument(
452
            level = "trace",
453
            skip(self, operation, transaction, tree_index_map, contents),
454
            fields(
455
                database = self.name(),
456
                collection.name = operation.collection.name.as_ref(),
457
                collection.authority = operation.collection.authority.as_ref()
458
            )
459
        )
460
    )]
461
205512
    fn execute_update(
462
205512
        &self,
463
205512
        operation: &Operation,
464
205512
        transaction: &mut ExecutingTransaction<AnyFile>,
465
205512
        tree_index_map: &HashMap<String, usize>,
466
205512
        id: &DocumentId,
467
205512
        check_revision: Option<&Revision>,
468
205512
        contents: &[u8],
469
205512
    ) -> Result<OperationResult, crate::Error> {
470
205512
        let mut documents = transaction
471
205512
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
472
205512
            .unwrap();
473
205512
        let document_id = ArcBytes::from(id.to_vec());
474
205512
        let mut result = None;
475
205512
        let mut updated = false;
476
205512
        documents.modify(
477
205512
            vec![document_id.clone()],
478
205512
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
479
                                                                        value: Option<
480
                ArcBytes<'_>,
481
205512
            >| {
482
205512
                if let Some(old) = value {
483
205216
                    let doc = match deserialize_document(&old) {
484
205216
                        Ok(doc) => doc,
485
                        Err(err) => {
486
                            result = Some(Err(err));
487
                            return nebari::tree::KeyOperation::Skip;
488
                        }
489
                    };
490
205216
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
491
205068
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
492
                        {
493
204772
                            let updated_header = Header {
494
204772
                                id: id.clone(),
495
204772
                                revision: updated_revision,
496
204772
                            };
497
204772
                            let serialized_doc = match serialize_document(&BorrowedDocument {
498
204772
                                header: updated_header.clone(),
499
204772
                                contents: CowBytes::from(contents),
500
204772
                            }) {
501
204772
                                Ok(bytes) => bytes,
502
                                Err(err) => {
503
                                    result = Some(Err(Error::from(err)));
504
                                    return nebari::tree::KeyOperation::Skip;
505
                                }
506
                            };
507
204772
                            result = Some(Ok(OperationResult::DocumentUpdated {
508
204772
                                collection: operation.collection.clone(),
509
204772
                                header: updated_header,
510
204772
                            }));
511
204772
                            updated = true;
512
204772
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
513
296
                        }
514
296

            
515
296
                        // If no new revision was made, it means an attempt to
516
296
                        // save a document with the same contents was made.
517
296
                        // We'll return a success but not actually give a new
518
296
                        // version
519
296
                        result = Some(Ok(OperationResult::DocumentUpdated {
520
296
                            collection: operation.collection.clone(),
521
296
                            header: doc.header,
522
296
                        }));
523
148
                    } else {
524
148
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
525
148
                            operation.collection.clone(),
526
148
                            Box::new(doc.header),
527
148
                        ))));
528
148
                    }
529
296
                } else if check_revision.is_none() {
530
148
                    let doc = BorrowedDocument::new(id.clone(), contents);
531
148
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
532
148
                        Ok((doc, serialized)) => {
533
148
                            result = Some(Ok(OperationResult::DocumentUpdated {
534
148
                                collection: operation.collection.clone(),
535
148
                                header: doc.header,
536
148
                            }));
537
148
                            updated = true;
538
148
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
539
                        }
540
                        Err(err) => {
541
                            result = Some(Err(Error::from(err)));
542
                        }
543
                    }
544
148
                } else {
545
148
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
546
148
                        operation.collection.clone(),
547
148
                        Box::new(id.clone()),
548
148
                    ))));
549
148
                }
550
592
                nebari::tree::KeyOperation::Skip
551
205512
            })),
552
205512
        )?;
553
205512
        drop(documents);
554
205512

            
555
205512
        if updated {
556
204920
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
557
592
        }
558

            
559
205216
        result.expect("nebari should invoke the callback even when the key isn't found")
560
205512
    }
561

            
562
    #[cfg_attr(
563
        feature = "tracing",
564
        tracing::instrument(
565
            level = "trace",
566
            skip(self, operation, transaction, tree_index_map, contents),
567
            fields(
568
                database = self.name(),
569
                collection.name = operation.collection.name.as_ref(),
570
                collection.authority = operation.collection.authority.as_ref()
571
            )
572
        )
573
    )]
574
779926
    fn execute_insert(
575
779926
        &self,
576
779926
        operation: &Operation,
577
779926
        transaction: &mut ExecutingTransaction<AnyFile>,
578
779926
        tree_index_map: &HashMap<String, usize>,
579
779926
        id: Option<DocumentId>,
580
779926
        contents: &[u8],
581
779926
    ) -> Result<OperationResult, Error> {
582
779926
        let mut documents = transaction
583
779926
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
584
779926
            .unwrap();
585
779926
        let id = if let Some(id) = id {
586
333390
            id
587
446536
        } else if let Some(last_key) = documents.last_key()? {
588
415192
            let id = DocumentId::try_from(last_key.as_slice())?;
589
415192
            self.data
590
415192
                .schema
591
415192
                .next_id_for_collection(&operation.collection, Some(id))?
592
        } else {
593
31344
            self.data
594
31344
                .schema
595
31344
                .next_id_for_collection(&operation.collection, None)?
596
        };
597

            
598
779890
        let doc = BorrowedDocument::new(id, contents);
599
779890
        let serialized: Vec<u8> = serialize_document(&doc)?;
600
779890
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
601
779890
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
602
18185
            let doc = deserialize_document(&document)?;
603
18185
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
604
18185
                operation.collection.clone(),
605
18185
                Box::new(doc.header),
606
18185
            )))
607
        } else {
608
761705
            drop(documents);
609
761705
            self.update_eager_views(&document_id, operation, transaction, tree_index_map)?;
610

            
611
761299
            Ok(OperationResult::DocumentUpdated {
612
761299
                collection: operation.collection.clone(),
613
761299
                header: doc.header,
614
761299
            })
615
        }
616
779926
    }
617

            
618
    #[cfg_attr(feature = "tracing", tracing::instrument(
619
        level = "trace",
620
        skip(self, operation, transaction, tree_index_map),
621
        fields(
622
            database = self.name(),
623
            collection.name = operation.collection.name.as_ref(),
624
            collection.authority = operation.collection.authority.as_ref()
625
        )
626
    ))]
627
59688
    fn execute_delete(
628
59688
        &self,
629
59688
        operation: &Operation,
630
59688
        transaction: &mut ExecutingTransaction<AnyFile>,
631
59688
        tree_index_map: &HashMap<String, usize>,
632
59688
        header: &Header,
633
59688
    ) -> Result<OperationResult, Error> {
634
59688
        let mut documents = transaction
635
59688
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
636
59688
            .unwrap();
637
59688
        if let Some(vec) = documents.remove(header.id.as_ref())? {
638
59688
            drop(documents);
639
59688
            let doc = deserialize_document(&vec)?;
640
59688
            if &doc.header == header {
641
59688
                self.update_eager_views(
642
59688
                    &ArcBytes::from(doc.header.id.to_vec()),
643
59688
                    operation,
644
59688
                    transaction,
645
59688
                    tree_index_map,
646
59688
                )?;
647

            
648
59688
                Ok(OperationResult::DocumentDeleted {
649
59688
                    collection: operation.collection.clone(),
650
59688
                    id: header.id.clone(),
651
59688
                })
652
            } else {
653
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
654
                    operation.collection.clone(),
655
                    Box::new(header.clone()),
656
                )))
657
            }
658
        } else {
659
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
660
                operation.collection.clone(),
661
                Box::new(header.id.clone()),
662
            )))
663
        }
664
59688
    }
665

            
666
    #[cfg_attr(feature = "tracing", tracing::instrument(
667
        level = "trace",
668
        skip(self, operation, transaction, tree_index_map),
669
        fields(
670
            database = self.name(),
671
            collection.name = operation.collection.name.as_ref(),
672
            collection.authority = operation.collection.authority.as_ref()
673
        )
674
    ))]
675
1026313
    fn update_eager_views(
676
1026313
        &self,
677
1026313
        document_id: &ArcBytes<'static>,
678
1026313
        operation: &Operation,
679
1026313
        transaction: &mut ExecutingTransaction<AnyFile>,
680
1026313
        tree_index_map: &HashMap<String, usize>,
681
1026313
    ) -> Result<(), Error> {
682
1026313
        let mut eager_views = self
683
1026313
            .data
684
1026313
            .schema
685
1026313
            .eager_views_in_collection(&operation.collection)
686
1026313
            .peekable();
687
1026313
        if eager_views.peek().is_some() {
688
440995
            let documents = transaction
689
440995
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
690
440995
                .unwrap();
691
881288
            for view in eager_views {
692
440995
                let name = view.view_name();
693
440995
                let document_map = transaction
694
440995
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
695
440995
                    .unwrap();
696
440995
                let view_entries = transaction
697
440995
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
698
440995
                    .unwrap();
699
440995
                mapper::DocumentRequest {
700
440995
                    database: self,
701
440995
                    document_ids: vec![document_id.clone()],
702
440995
                    map_request: &mapper::Map {
703
440995
                        database: self.data.name.clone(),
704
440995
                        collection: operation.collection.clone(),
705
440995
                        view_name: name.clone(),
706
440995
                    },
707
440995
                    document_map,
708
440995
                    documents,
709
440995
                    view_entries,
710
440995
                    view,
711
440995
                }
712
440995
                .map()?;
713
            }
714
585318
        }
715

            
716
1025611
        Ok(())
717
1026313
    }
718

            
719
    #[cfg_attr(feature = "tracing", tracing::instrument(
720
        level = "trace",
721
        skip(operation, transaction, tree_index_map),
722
        fields(
723
            collection.name = operation.collection.name.as_ref(),
724
            collection.authority = operation.collection.authority.as_ref(),
725
        ),
726
    ))]
727
1456
    fn execute_check(
728
1456
        operation: &Operation,
729
1456
        transaction: &mut ExecutingTransaction<AnyFile>,
730
1456
        tree_index_map: &HashMap<String, usize>,
731
1456
        id: DocumentId,
732
1456
        revision: Option<Revision>,
733
1456
    ) -> Result<OperationResult, Error> {
734
1456
        let mut documents = transaction
735
1456
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
736
1456
            .unwrap();
737
1456
        if let Some(vec) = documents.get(id.as_ref())? {
738
1308
            drop(documents);
739

            
740
1308
            if let Some(revision) = revision {
741
296
                let doc = deserialize_document(&vec)?;
742
296
                if doc.header.revision != revision {
743
148
                    return Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
744
148
                        operation.collection.clone(),
745
148
                        Box::new(Header { id, revision }),
746
148
                    )));
747
148
                }
748
1012
            }
749

            
750
1160
            Ok(OperationResult::Success)
751
        } else {
752
148
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
753
148
                operation.collection.clone(),
754
148
                Box::new(id),
755
148
            )))
756
        }
757
1456
    }
758

            
759
1041270
    fn create_view_iterator(
760
1041270
        view_entries: &Tree<Unversioned, AnyFile>,
761
1041270
        key: Option<SerializedQueryKey>,
762
1041270
        order: Sort,
763
1041270
        limit: Option<u32>,
764
1041270
    ) -> Result<Vec<ViewEntry>, Error> {
765
1041270
        let mut values = Vec::new();
766
1041270
        let forwards = match order {
767
1041122
            Sort::Ascending => true,
768
148
            Sort::Descending => false,
769
        };
770
1041270
        let mut values_read = 0;
771
1041270
        if let Some(key) = key {
772
1034323
            match key {
773
1412
                SerializedQueryKey::Range(range) => {
774
1412
                    view_entries.scan::<Infallible, _, _, _, _>(
775
2752
                        &range.map_ref(|bytes| &bytes[..]),
776
1412
                        forwards,
777
1412
                        |_, _, _| ScanEvaluation::ReadData,
778
3436
                        |_, _| {
779
3436
                            if let Some(limit) = limit {
780
296
                                if values_read >= limit {
781
148
                                    return ScanEvaluation::Stop;
782
148
                                }
783
148
                                values_read += 1;
784
3140
                            }
785
3288
                            ScanEvaluation::ReadData
786
3436
                        },
787
3288
                        |_key, _index, value| {
788
3288
                            values.push(value);
789
3288
                            Ok(())
790
3288
                        },
791
1412
                    )?;
792
                }
793
1032183
                SerializedQueryKey::Matches(key) => {
794
1032183
                    values.extend(view_entries.get(&key)?);
795
                }
796
728
                SerializedQueryKey::Multiple(mut list) => {
797
728
                    list.sort();
798
728

            
799
728
                    values.extend(
800
728
                        view_entries
801
1168
                            .get_multiple(list.iter().map(|bytes| bytes.as_slice()))?
802
728
                            .into_iter()
803
1168
                            .map(|(_, value)| value),
804
                    );
805
                }
806
            }
807
        } else {
808
6947
            view_entries.scan::<Infallible, _, _, _, _>(
809
6947
                &(..),
810
6947
                forwards,
811
6947
                |_, _, _| ScanEvaluation::ReadData,
812
7919
                |_, _| {
813
7822
                    if let Some(limit) = limit {
814
                        if values_read >= limit {
815
                            return ScanEvaluation::Stop;
816
                        }
817
                        values_read += 1;
818
7822
                    }
819
7822
                    ScanEvaluation::ReadData
820
7919
                },
821
7919
                |_, _, value| {
822
7822
                    values.push(value);
823
7822
                    Ok(())
824
7919
                },
825
6947
            )?;
826
        }
827

            
828
1041270
        values
829
1041270
            .into_iter()
830
1041270
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
831
1041270
            .collect::<Result<Vec<_>, Error>>()
832
1041270
    }
833

            
834
    /// Returns the encryption key to use for `collection`: the key the
    /// schematic reports for the collection, falling back to the storage's
    /// default encryption key when the schematic reports none.
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        let collection_key = self.schematic().encryption_key_for_collection(collection);
        collection_key.or_else(|| self.storage.default_encryption_key())
    }
840

            
841
    #[cfg_attr(
842
        not(feature = "encryption"),
843
        allow(
844
            unused_mut,
845
            unused_variables,
846
            clippy::unused_self,
847
            clippy::let_and_return
848
        )
849
    )]
850
    #[allow(clippy::unnecessary_wraps)]
851
2861820
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
852
2861820
        &self,
853
2861820
        collection: &CollectionName,
854
2861820
        name: S,
855
2861820
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
856
2861820
        let mut tree = R::tree(name);
857
2861820

            
858
2861820
        #[cfg(any(feature = "encryption", feature = "compression"))]
859
2861820
        match (
860
2861820
            self.collection_encryption_key(collection),
861
2861820
            self.storage().tree_vault().cloned(),
862
        ) {
863
42363
            (Some(override_key), Some(mut vault)) => {
864
42363
                #[cfg(feature = "encryption")]
865
42363
                {
866
42363
                    vault.key = Some(override_key.clone());
867
42363
                    tree = tree.with_vault(vault);
868
42363
                }
869

            
870
                #[cfg(not(feature = "encryption"))]
871
                {
872
                    return Err(Error::EncryptionDisabled);
873
                }
874
            }
875
692344
            (None, Some(vault)) => {
876
692344
                tree = tree.with_vault(vault);
877
692344
            }
878
2127113
            (key, None) => {
879
                #[cfg(feature = "encryption")]
880
2127113
                if let Some(vault) = TreeVault::new_if_needed(
881
2127113
                    key.cloned(),
882
2127113
                    self.storage().vault(),
883
2127113
                    #[cfg(feature = "compression")]
884
2127113
                    None,
885
2127113
                ) {
886
45615
                    tree = tree.with_vault(vault);
887
2081498
                }
888

            
889
                #[cfg(not(feature = "encryption"))]
890
                if key.is_some() {
891
                    return Err(Error::EncryptionDisabled);
892
                }
893
            }
894
        }
895

            
896
2861820
        Ok(tree)
897
2861820
    }
898

            
899
1
    pub(crate) fn update_key_expiration<'key>(
900
1
        &self,
901
1
        tree_key: impl Into<Cow<'key, str>>,
902
1
        expiration: Option<Timestamp>,
903
1
    ) {
904
1
        self.data
905
1
            .context
906
1
            .update_key_expiration(tree_key, expiration);
907
1
    }
908

            
909
    /// Converts this instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance holds
    /// the current Tokio runtime handle.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async(self) -> crate::AsyncDatabase {
        let runtime = tokio::runtime::Handle::current();
        self.into_async_with_runtime(runtime)
    }
921

            
922
    /// Converts this instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance holds
    /// the provided `runtime` handle.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        let runtime = Arc::new(runtime);
        crate::AsyncDatabase {
            database: self,
            runtime,
        }
    }
933

            
934
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone of `self`.
    /// The returned instance holds the current Tokio runtime handle.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async(&self) -> crate::AsyncDatabase {
        let database = self.clone();
        database.into_async()
    }
946

            
947
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone of `self`.
    /// The returned instance holds the provided `runtime` handle.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        let database = self.clone();
        database.into_async_with_runtime(runtime)
    }
955
}
956
59
#[derive(Serialize, Deserialize)]
957
struct LegacyHeader {
958
    id: u64,
959
    revision: Revision,
960
}
961
59
#[derive(Serialize, Deserialize)]
962
struct LegacyDocument<'a> {
963
    header: LegacyHeader,
964
    #[serde(borrow)]
965
    contents: &'a [u8],
966
}
967

            
968
1692446
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
969
1692446
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
970
1692387
        Ok(document) => Ok(document),
971
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
972
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
973
59
                header: Header {
974
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
975
59
                    revision: legacy_doc.header.revision,
976
59
                },
977
59
                contents: CowBytes::from(legacy_doc.contents),
978
59
            }),
979
            Err(_) => Err(Error::from(err)),
980
        },
981
    }
982
1692446
}
983

            
984
984810
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
985
984810
    pot::to_vec(document)
986
984810
        .map_err(Error::from)
987
984810
        .map_err(bonsaidb_core::Error::from)
988
984810
}
989

            
990
impl HasSession for Database {
991
5155234
    fn session(&self) -> Option<&Session> {
992
5155234
        self.storage.session()
993
5155234
    }
994
}
995

            
996
impl Connection for Database {
997
    type Storage = Storage;
998

            
999
6527745
    fn storage(&self) -> Self::Storage {
6527745
        self.storage.clone()
6527745
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
38748
    fn list_executed_transactions(
38748
        &self,
38748
        starting_id: Option<u64>,
38748
        result_limit: Option<u32>,
38748
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
38748
        self.check_permission(
38748
            database_resource_name(self.name()),
38748
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
38748
        )?;
38748
        let result_limit = usize::try_from(
38748
            result_limit
38748
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
38748
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
38748
        )
38748
        .unwrap();
38748
        if result_limit > 0 {
38600
            let range = if let Some(starting_id) = starting_id {
19812
                Range::from(starting_id..)
            } else {
18788
                Range::from(..)
            };

            
38600
            let mut entries = Vec::new();
38600
            self.roots()
38600
                .transactions()
367637
                .scan(range, |entry| {
367637
                    if entry.data().is_some() {
366591
                        entries.push(entry);
366591
                    }
367637
                    entries.len() < result_limit
367637
                })
38600
                .map_err(Error::from)?;

            
38600
            entries
38600
                .into_iter()
366591
                .map(|entry| {
366591
                    if let Some(data) = entry.data() {
366591
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
366591
                        Ok(Some(transaction::Executed {
366591
                            id: entry.id,
366591
                            changes,
366591
                        }))
                    } else {
                        Ok(None)
                    }
366591
                })
38600
                .filter_map(Result::transpose)
38600
                .collect::<Result<Vec<_>, Error>>()
38600
                .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
148
            Ok(Vec::default())
        }
38748
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
603009
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
603009
        self.check_permission(
603009
            database_resource_name(self.name()),
603009
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId)),
603009
        )?;
603009
        Ok(self.roots().transactions().current_transaction_id())
603009
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
148
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
148
        self.check_permission(
148
            database_resource_name(self.name()),
148
            &BonsaiAction::Database(DatabaseAction::Compact),
148
        )?;
148
        self.storage()
148
            .instance
148
            .tasks()
148
            .compact_database(self.clone())?;
148
        Ok(())
148
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self),
        fields(
            database = self.name(),
        )
    ))]
148
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
148
        self.check_permission(
148
            kv_resource_name(self.name()),
148
            &BonsaiAction::Database(DatabaseAction::Compact),
148
        )?;
148
        self.storage()
148
            .instance
148
            .tasks()
148
            .compact_key_value_store(self.clone())?;
148
        Ok(())
148
    }
}

            
impl LowLevelConnection for Database {
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self,  transaction),
        fields(
            database = self.name(),
        )
    ))]
672996
    fn apply_transaction(
672996
        &self,
672996
        transaction: Transaction,
672996
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1720022
        for op in &transaction.operations {
1047170
            let (resource, action) = match &op.command {
780146
                Command::Insert { .. } => (
780146
                    collection_resource_name(self.name(), &op.collection),
780146
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
780146
                ),
204920
                Command::Update { header, .. } => (
204920
                    document_resource_name(self.name(), &op.collection, &header.id),
204920
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
204920
                ),
592
                Command::Overwrite { id, .. } => (
592
                    document_resource_name(self.name(), &op.collection, id),
592
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
592
                ),
59760
                Command::Delete { header } => (
59760
                    document_resource_name(self.name(), &op.collection, &header.id),
59760
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
59760
                ),
1752
                Command::Check { id, .. } => (
1752
                    document_resource_name(self.name(), &op.collection, id),
1752
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
1752
                ),
            };
1047170
            self.check_permission(resource, &action)?;
        }

            
672852
        let mut eager_view_tasks = Vec::new();
675912
        for collection_name in transaction
672852
            .operations
672852
            .iter()
1047026
            .map(|op| &op.collection)
672852
            .collect::<HashSet<_>>()
        {
675910
            for view in self.data.schema.eager_views_in_collection(collection_name) {
380100
                if let Some(task) = self
380100
                    .storage
380100
                    .instance
380100
                    .tasks()
380100
                    .spawn_integrity_check(view, self)
5917
                {
5917
                    eager_view_tasks.push(task);
374183
                }
            }
        }

            
672852
        let mut eager_view_mapping_tasks = Vec::new();
678769
        for task in eager_view_tasks {
5917
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
5771
                eager_view_mapping_tasks.push(spawned_task);
5771
            }
        }

            
678623
        for task in eager_view_mapping_tasks {
5771
            let mut task = task.lock();
5771
            if let Some(task) = task.take() {
4706
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
1065
            }
        }

            
672852
        self.apply_transaction_to_roots(&transaction)
672852
            .map_err(bonsaidb_core::Error::from)
672996
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
578107
    fn get_from_collection(
578107
        &self,
578107
        id: DocumentId,
578107
        collection: &CollectionName,
578107
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
578107
        self.check_permission(
578107
            document_resource_name(self.name(), collection, &id),
578107
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
578107
        )?;
578107
        let tree = self
578107
            .data
578107
            .context
578107
            .roots
578107
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
578107
            .map_err(Error::from)?;
578107
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
559554
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
18552
            Ok(None)
        }
578107
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
898
    fn list_from_collection(
898
        &self,
898
        ids: Range<DocumentId>,
898
        sort: Sort,
898
        limit: Option<u32>,
898
        collection: &CollectionName,
898
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
898
        self.check_permission(
898
            collection_resource_name(self.name(), collection),
898
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
898
        )?;
898
        let tree = self
898
            .data
898
            .context
898
            .roots
898
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
898
            .map_err(Error::from)?;
898
        let mut found_docs = Vec::new();
898
        let mut keys_read = 0;
898
        let ids = DocumentIdRange(ids);
898
        tree.scan(
898
            &ids.borrow_as_bytes(),
898
            match sort {
750
                Sort::Ascending => true,
148
                Sort::Descending => false,
            },
24
            |_, _, _| ScanEvaluation::ReadData,
1315
            |_, _| {
1315
                if let Some(limit) = limit {
296
                    if keys_read >= limit {
148
                        return ScanEvaluation::Stop;
148
                    }
148

            
148
                    keys_read += 1;
1019
                }
1167
                ScanEvaluation::ReadData
1315
            },
1167
            |_, _, doc| {
1167
                found_docs.push(
1167
                    deserialize_document(&doc)
1167
                        .map(BorrowedDocument::into_owned)
1167
                        .map_err(AbortError::Other)?,
                );
1167
                Ok(())
1167
            },
        )
898
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
898
        })?;

            
898
        Ok(found_docs)
898
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
148
    fn list_headers_from_collection(
148
        &self,
148
        ids: Range<DocumentId>,
148
        sort: Sort,
148
        limit: Option<u32>,
148
        collection: &CollectionName,
148
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
148
        self.check_permission(
148
            collection_resource_name(self.name(), collection),
148
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
148
        )?;
148
        let tree = self
148
            .data
148
            .context
148
            .roots
148
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
148
            .map_err(Error::from)?;
148
        let mut found_headers = Vec::new();
148
        let mut keys_read = 0;
148
        let ids = DocumentIdRange(ids);
148
        tree.scan(
148
            &ids.borrow_as_bytes(),
148
            match sort {
148
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
296
            |_, _| {
296
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

            
                    keys_read += 1;
296
                }
296
                ScanEvaluation::ReadData
296
            },
296
            |_, _, doc| {
296
                found_headers.push(
296
                    deserialize_document(&doc)
296
                        .map(|doc| doc.header)
296
                        .map_err(AbortError::Other)?,
                );
296
                Ok(())
296
            },
        )
148
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
148
        })?;

            
148
        Ok(found_headers)
148
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
296
    fn count_from_collection(
296
        &self,
296
        ids: Range<DocumentId>,
296
        collection: &CollectionName,
296
    ) -> Result<u64, bonsaidb_core::Error> {
296
        self.check_permission(
296
            collection_resource_name(self.name(), collection),
296
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
296
        )?;
296
        let tree = self
296
            .data
296
            .context
296
            .roots
296
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
296
            .map_err(Error::from)?;
296
        let ids = DocumentIdRange(ids);
296
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

            
296
        Ok(stats.alive_keys)
296
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
342570
    fn get_multiple_from_collection(
342570
        &self,
342570
        ids: &[DocumentId],
342570
        collection: &CollectionName,
342570
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
683996
        for id in ids {
341426
            self.check_permission(
341426
                document_resource_name(self.name(), collection, id),
341426
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
341426
            )?;
        }
342570
        let mut ids = ids.to_vec();
342570
        let collection = collection.clone();
342570
        let tree = self
342570
            .data
342570
            .context
342570
            .roots
342570
            .tree(
342570
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
342570
            .map_err(Error::from)?;
342570
        ids.sort();
342570
        let keys_and_values = tree
342578
            .get_multiple(ids.iter().map(|id| id.as_ref()))
342570
            .map_err(Error::from)?;

            
342570
        keys_and_values
342570
            .into_iter()
342578
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
342570
            .collect::<Result<Vec<_>, Error>>()
342570
            .map_err(bonsaidb_core::Error::from)
342570
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, collection),
        fields(
            database = self.name(),
            collection.name = collection.name.as_ref(),
            collection.authority = collection.authority.as_ref(),
        )
    ))]
148
    fn compact_collection_by_name(
148
        &self,
148
        collection: CollectionName,
148
    ) -> Result<(), bonsaidb_core::Error> {
148
        self.check_permission(
148
            collection_resource_name(self.name(), &collection),
148
            &BonsaiAction::Database(DatabaseAction::Compact),
148
        )?;
148
        self.storage()
148
            .instance
148
            .tasks()
148
            .compact_collection(self.clone(), collection)?;
148
        Ok(())
148
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
376440
    fn query_by_name(
376440
        &self,
376440
        view: &ViewName,
376440
        key: Option<SerializedQueryKey>,
376440
        order: Sort,
376440
        limit: Option<u32>,
376440
        access_policy: AccessPolicy,
376440
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
376440
        let view = self.schematic().view_by_name(view)?;
376440
        self.check_permission(
376440
            view_resource_name(self.name(), &view.view_name()),
376440
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
376440
        )?;
376440
        let mut results = Vec::new();
376440
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
739862
            for mapping in entry.mappings {
373899
                results.push(bonsaidb_core::schema::view::map::Serialized {
373899
                    source: mapping.source,
373899
                    key: entry.key.clone(),
373899
                    value: mapping.value,
373899
                });
373899
            }
365963
            Ok(())
376440
        })?;

            
376440
        Ok(results)
376440
    }

            
    /// Queries the view named `view` like `query_by_name`, and additionally
    /// loads the source document of every returned mapping.
    ///
    /// The returned documents are keyed by document id in a [`BTreeMap`], so a
    /// document referenced by several mappings appears only once.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails, if the view cannot be resolved, or
    /// if fetching the documents from the view's collection fails.
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let results = self.query_by_name(view, key, order, limit, access_policy)?;
        // `query_by_name` already resolved this view, so the lookup is expected
        // to succeed; propagate the error instead of panicking if it ever does
        // not.
        let view = self.schematic().view_by_name(view)?;

        let source_ids = results
            .iter()
            .map(|mapping| mapping.source.id.clone())
            .collect::<Vec<_>>();
        let documents = self
            .get_multiple_from_collection(&source_ids, &view.collection())?
            .into_iter()
            .map(|doc| (doc.header.id.clone(), doc))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings: results,
                documents,
            },
        )
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
663938
    fn reduce_by_name(
663938
        &self,
663938
        view_name: &ViewName,
663938
        key: Option<SerializedQueryKey>,
663938
        access_policy: AccessPolicy,
663938
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
663938
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

            
663938
        let result = if mappings.len() == 1 {
474738
            mappings.pop().unwrap().value.into_vec()
        } else {
189200
            let view = self.data.schema.view_by_name(view_name)?;
189200
            view.reduce(
189200
                &mappings
189200
                    .iter()
189200
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
189200
                    .collect::<Vec<_>>(),
189200
                true,
189200
            )
189200
            .map_err(Error::from)?
        };

            
663790
        Ok(result)
663938
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view_name),
        fields(
            database = self.name(),
            view.collection.name = view_name.collection.name.as_ref(),
            view.collection.authority = view_name.collection.authority.as_ref(),
            view.name = view_name.name.as_ref(),
        )
    ))]
664318
    fn reduce_grouped_by_name(
664318
        &self,
664318
        view_name: &ViewName,
664318
        key: Option<SerializedQueryKey>,
664318
        access_policy: AccessPolicy,
664318
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
664318
        let view = self.data.schema.view_by_name(view_name)?;
664318
        self.check_permission(
664318
            view_resource_name(self.name(), &view.view_name()),
664318
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
664318
        )?;
664318
        let mut mappings = Vec::new();
664334
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
481958
            mappings.push(MappedSerializedValue {
481958
                key: entry.key,
481958
                value: entry.reduced_value,
481958
            });
481958
            Ok(())
664334
        })?;

            
664318
        Ok(mappings)
664318
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, view),
        fields(
            database = self.name(),
            view.collection.name = view.collection.name.as_ref(),
            view.collection.authority = view.collection.authority.as_ref(),
            view.name = view.name.as_ref(),
        )
    ))]
512
    fn delete_docs_by_name(
512
        &self,
512
        view: &ViewName,
512
        key: Option<SerializedQueryKey>,
512
        access_policy: AccessPolicy,
512
    ) -> Result<u64, bonsaidb_core::Error> {
512
        let view = self.data.schema.view_by_name(view)?;
512
        let collection = view.collection();
512
        let mut transaction = Transaction::default();
512
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
3408
            for mapping in entry.mappings {
2968
                transaction.push(Operation::delete(collection.clone(), mapping.source));
2968
            }

            
440
            Ok(())
512
        })?;

            
512
        let results = LowLevelConnection::apply_transaction(self, transaction)?;

            
512
        Ok(results.len() as u64)
512
    }
}

            
impl HasSchema for Database {
27018
    fn schematic(&self) -> &Schematic {
27018
        &self.data.schema
27018
    }
}

            
/// Boxed iterator over raw view tree entries.
///
/// Each item is either a pair of owned byte buffers or an [`Error`]. The
/// second element of the pair is the serialized entry value; the first is
/// presumably the entry's key (TODO confirm against the producer).
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapter that turns a [`ViewIterator`] of raw byte pairs into an iterator of
/// deserialized [`ViewEntry`] values.
struct ViewEntryCollectionIterator<'a> {
    /// The underlying iterator producing raw byte pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
2638878
/// Shared handle to a [`ContextData`].
///
/// Cloning copies only the inner [`Arc`], so all clones refer to the same
/// underlying data.
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// The reference-counted data shared by all clones of this context.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    /// Exposes the shared [`ContextData`] so its fields can be reached
    /// directly through a `Context`.
    fn deref(&self) -> &Self::Target {
        self.data.as_ref()
    }
}

            
/// The data shared by every clone of a [`Context`].
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The nebari roots backing this context.
    pub(crate) roots: Roots<AnyFile>,
    /// Key-value store state, guarded by a mutex and shared via [`Arc`].
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

            
/// Allows a [`Context`] to be used wherever a borrowed `Roots<AnyFile>` is
/// accepted.
impl Borrow<Roots<AnyFile>> for Context {
    /// Returns a reference to the roots held in the shared context data.
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
34897
    pub(crate) fn new(
34897
        roots: Roots<AnyFile>,
34897
        key_value_persistence: KeyValuePersistence,
34897
        storage_lock: Option<StorageLock>,
34897
    ) -> Self {
34897
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
34897
        let mut background_worker_target_watcher = background_worker_target.watch();
34897
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
34897
            key_value_persistence,
34897
            roots.clone(),
34897
            background_worker_target,
34897
        )));
34897
        let background_worker_state = Arc::downgrade(&key_value_state);
34897
        let context = Self {
34897
            data: Arc::new(ContextData {
34897
                roots,
34897
                key_value_state,
34897
            }),
34897
        };
34897
        std::thread::Builder::new()
34897
            .name(String::from("keyvalue-worker"))
34897
            .spawn(move || {
34897
                keyvalue::background_worker(
34897
                    &background_worker_state,
34897
                    &mut background_worker_target_watcher,
34897
                    storage_lock,
34897
                );
34897
            })
34897
            .unwrap();
34897
        context
34897
    }

            
1496682
    pub(crate) fn perform_kv_operation(
1496682
        &self,
1496682
        op: KeyOperation,
1496682
    ) -> Result<Output, bonsaidb_core::Error> {
1496682
        let mut state = self.data.key_value_state.lock();
1496682
        state.perform_kv_operation(op, &self.data.key_value_state)
1496682
    }

            
14
    pub(crate) fn update_key_expiration<'key>(
14
        &self,
14
        tree_key: impl Into<Cow<'key, str>>,
14
        expiration: Option<Timestamp>,
14
    ) {
14
        let mut state = self.data.key_value_state.lock();
14
        state.update_key_expiration(tree_key, expiration);
14
    }

            
    #[cfg(test)]
5
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
5
        let state = self.data.key_value_state.lock();
5
        state.persistence_watcher()
5
    }
}

            
impl Drop for ContextData {
31045
    fn drop(&mut self) {
31045
        if let Some(shutdown) = {
31045
            let mut state = self.key_value_state.lock();
31045
            state.shutdown(&self.key_value_state)
31045
        } {
362
            let _: Result<_, _> = shutdown.recv();
30683
        }
31045
    }
}

            
3686574
pub fn document_tree_name(collection: &CollectionName) -> String {
3686574
    format!("collection.{collection:#}")
3686574
}

            
/// Newtype around a [`Range`] of [`DocumentId`]s.
///
/// Its [`BorrowByteRange`] implementation lets the range be borrowed as a
/// range of byte slices.
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1342
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1342
        BorrowedRange {
1342
            start: match &self.0.start {
602
                connection::Bound::Unbounded => ops::Bound::Unbounded,
740
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
1342
            end: match &self.0.end {
602
                connection::Bound::Unbounded => ops::Bound::Unbounded,
592
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
148
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
1342
    }
}

            
/// Operations that can be performed on both [`Database`] and
/// [`AsyncDatabase`](crate::AsyncDatabase).
pub trait DatabaseNonBlocking {
    /// Returns the name of the database as a borrowed string slice.
    #[must_use]
    fn name(&self) -> &str;
}

            
impl DatabaseNonBlocking for Database {
    /// Returns the database name stored in this database's shared data.
    fn name(&self) -> &str {
        self.data.name.as_ref()
    }
}