1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::{self, Deref},
6
    sync::Arc,
7
    u8,
8
};
9

            
10
#[cfg(any(feature = "encryption", feature = "compression"))]
11
use bonsaidb_core::document::KeyId;
12
use bonsaidb_core::{
13
    arc_bytes::{
14
        serde::{Bytes, CowBytes},
15
        ArcBytes,
16
    },
17
    connection::{
18
        self, AccessPolicy, Connection, HasSession, LowLevelConnection, QueryKey, Range, Session,
19
        Sort, StorageConnection,
20
    },
21
    document::{BorrowedDocument, DocumentId, Header, OwnedDocument, Revision},
22
    key::Key,
23
    keyvalue::{KeyOperation, Output, Timestamp},
24
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
25
    permissions::{
26
        bonsai::{
27
            collection_resource_name, database_resource_name, document_resource_name,
28
            kv_resource_name, view_resource_name, BonsaiAction, DatabaseAction, DocumentAction,
29
            TransactionAction, ViewAction,
30
        },
31
        Permissions,
32
    },
33
    schema::{
34
        self,
35
        view::{self, map::MappedSerializedValue},
36
        CollectionName, Schema, Schematic, ViewName,
37
    },
38
    transaction::{
39
        self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
40
        Transaction,
41
    },
42
};
43
use itertools::Itertools;
44
use nebari::{
45
    io::any::AnyFile,
46
    tree::{
47
        AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, Root, ScanEvaluation, TreeRoot,
48
        Unversioned, Versioned,
49
    },
50
    AbortError, ExecutingTransaction, Roots, Tree,
51
};
52
use parking_lot::Mutex;
53
use serde::{Deserialize, Serialize};
54
use watchable::Watchable;
55

            
56
#[cfg(feature = "encryption")]
57
use crate::storage::TreeVault;
58
use crate::{
59
    config::{Builder, KeyValuePersistence, StorageConfiguration},
60
    database::keyvalue::BackgroundWorkerProcessTarget,
61
    error::Error,
62
    open_trees::OpenTrees,
63
    views::{
64
        mapper, view_document_map_tree_name, view_entries_tree_name,
65
        view_invalidated_docs_tree_name, ViewEntry,
66
    },
67
    Storage,
68
};
69

            
70
pub mod keyvalue;
71

            
72
pub(crate) mod compat;
73
pub mod pubsub;
74

            
75
/// A database stored in BonsaiDb. This type blocks the current thread when
76
/// used. See [`AsyncDatabase`](crate::AsyncDatabase) for this type's async counterpart.
77
///
78
/// ## Converting between Blocking and Async Types
79
///
80
/// [`AsyncDatabase`](crate::AsyncDatabase) and [`Database`] can be converted to and from each other
81
/// using:
82
///
83
/// - [`AsyncDatabase::into_blocking()`](crate::AsyncDatabase::into_blocking)
84
/// - [`AsyncDatabase::to_blocking()`](crate::AsyncDatabase::to_blocking)
85
/// - [`AsyncDatabase::as_blocking()`](crate::AsyncDatabase::as_blocking)
86
/// - [`Database::into_async()`]
87
/// - [`Database::to_async()`]
88
/// - [`Database::into_async_with_runtime()`]
89
/// - [`Database::to_async_with_runtime()`]
90
///
91
/// ## Using `Database` to create a single database
92
///
93
/// `Database` provides an easy mechanism to open and access a single database:
94
///
95
/// ```rust
96
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
97
/// use bonsaidb_core::schema::Collection;
98
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
99
/// use bonsaidb_local::{
100
///     config::{Builder, StorageConfiguration},
101
///     Database,
102
/// };
103
/// use serde::{Deserialize, Serialize};
104
///
105
/// #[derive(Debug, Serialize, Deserialize, Collection)]
106
/// #[collection(name = "blog-posts")]
107
/// # #[collection(core = bonsaidb_core)]
108
/// struct BlogPost {
109
///     pub title: String,
110
///     pub contents: String,
111
/// }
112
///
113
/// # fn test() -> Result<(), bonsaidb_local::Error> {
114
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb"))?;
115
/// # Ok(())
116
/// # }
117
/// ```
118
///
119
/// Under the hood, this initializes a [`Storage`] instance pointing at
120
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
121
/// with the schema `BlogPost`.
122
///
123
/// In this example, `BlogPost` implements the [`Collection`](schema::Collection) trait, and all
124
/// collections can be used as a [`Schema`].
125
3716132
#[derive(Debug, Clone)]
126
pub struct Database {
127
    /// Reference-counted state (name, context and schema) shared by every
    /// clone of this handle.
    pub(crate) data: Arc<Data>,
128
    /// The [`Storage`] handle this database operates through.
    pub(crate) storage: Storage,
129
}
130

            
131
/// The state a [`Database`] handle keeps behind an [`Arc`], so that cloning the
/// handle does not copy it.
#[derive(Debug)]
132
pub struct Data {
133
    /// The name this database was created with.
    pub name: Arc<Cow<'static, str>>,
134
    // The `Context` for this database; its `roots` field is what
    // `Database::roots()` returns.
    context: Context,
135
    /// The [`Schematic`] produced by the schema type the database was created
    /// with.
    pub(crate) schema: Arc<Schematic>,
136
}
137

            
138
impl Database {
139
    /// Opens a local file as a bonsaidb.
140
122558
    pub(crate) fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
141
122558
        name: S,
142
122558
        context: Context,
143
122558
        storage: &Storage,
144
122558
    ) -> Result<Self, Error> {
145
122558
        let name = name.into();
146
122558
        let schema = Arc::new(DB::schematic()?);
147
122558
        let db = Self {
148
122558
            storage: storage.clone(),
149
122558
            data: Arc::new(Data {
150
122558
                name: Arc::new(name),
151
122558
                context,
152
122558
                schema,
153
122558
            }),
154
122558
        };
155
122558

            
156
122558
        if storage.instance.check_view_integrity_on_database_open() {
157
16
            for view in db.data.schema.views() {
158
16
                storage.instance.tasks().spawn_integrity_check(view, &db);
159
16
            }
160
122554
        }
161

            
162
122558
        storage
163
122558
            .instance
164
122558
            .tasks()
165
122558
            .spawn_key_value_expiration_loader(&db);
166
122558

            
167
122558
        Ok(db)
168
122558
    }
169

            
170
    /// Restricts an unauthenticated instance to having `effective_permissions`.
171
    /// Returns `None` if a session has already been established.
172
    #[must_use]
173
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
174
        self.storage
175
            .with_effective_permissions(effective_permissions)
176
            .map(|storage| Self {
177
                storage,
178
                data: self.data.clone(),
179
            })
180
    }
181

            
182
    /// Creates a `Storage` with a single-database named "default" with its data
183
    /// stored at `path`. This requires exclusive access to the storage location
184
    /// configured. Attempting to open the same path multiple times concurrently
185
    /// will lead to errors.
186
    ///
187
    /// Using this method is perfect if only one database is being used.
188
    /// However, if multiple databases are needed, it is much better to store
189
    /// multiple databases in a single [`Storage`] instance rather than creating
190
    /// multiple independent databases using this method.
191
    ///
192
    /// When opening multiple databases using this function, each database will
193
    /// have its own thread pool, cache, task worker pool, and more. By using a
194
    /// single [`Storage`] instance, BonsaiDb will use less resources and likely
195
    /// perform better.
196
29
    pub fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
197
29
        let storage = Storage::open(configuration.with_schema::<DB>()?)?;
198

            
199
29
        Ok(storage.create_database::<DB>("default", true)?)
200
29
    }
201

            
202
    /// Returns the [`Schematic`] for the schema for this database.
203
    #[must_use]
204
3857915
    pub fn schematic(&self) -> &'_ Schematic {
205
3857915
        &self.data.schema
206
3857915
    }
207

            
208
2046311
    /// Returns the nebari [`Roots`] held by this database's context.
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
        let context = &self.data.context;
        &context.roots
    }
211

            
212
838370
    fn for_each_in_view<F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync>(
213
838370
        &self,
214
838370
        view: &dyn view::Serialized,
215
838370
        key: Option<QueryKey<Bytes>>,
216
838370
        order: Sort,
217
838370
        limit: Option<u32>,
218
838370
        access_policy: AccessPolicy,
219
838370
        mut callback: F,
220
838370
    ) -> Result<(), bonsaidb_core::Error> {
221
838370
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
222
309276
            self.storage
223
309276
                .instance
224
309276
                .tasks()
225
309276
                .update_view_if_needed(view, self)?;
226
529094
        }
227

            
228
838370
        let view_entries = self
229
838370
            .roots()
230
838370
            .tree(self.collection_tree(
231
838370
                &view.collection(),
232
838370
                view_entries_tree_name(&view.view_name()),
233
838370
            )?)
234
838370
            .map_err(Error::from)?;
235

            
236
        {
237
838370
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
238
539043
                callback(entry)?;
239
            }
240
        }
241

            
242
838370
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
243
124
            let db = self.clone();
244
124
            let view_name = view.view_name();
245
124
            let view = db
246
124
                .data
247
124
                .schema
248
124
                .view_by_name(&view_name)
249
124
                .expect("query made with view that isn't registered with this database");
250
124
            db.storage
251
124
                .instance
252
124
                .tasks()
253
124
                .update_view_if_needed(view, &db)?;
254
838246
        }
255

            
256
838370
        Ok(())
257
838370
    }
258

            
259
546460
    fn apply_transaction_to_roots(
260
546460
        &self,
261
546460
        transaction: &Transaction,
262
546460
    ) -> Result<Vec<OperationResult>, Error> {
263
546460
        let mut open_trees = OpenTrees::default();
264
1432038
        for op in &transaction.operations {
265
885702
            if !self.data.schema.contains_collection_name(&op.collection) {
266
124
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
267
885578
            }
268

            
269
            #[cfg(any(feature = "encryption", feature = "compression"))]
270
885578
            let vault = if let Some(encryption_key) =
271
885578
                self.collection_encryption_key(&op.collection).cloned()
272
            {
273
                #[cfg(feature = "encryption")]
274
3535
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
275
2094
                    vault.key = Some(encryption_key);
276
2094
                    Some(vault)
277
                } else {
278
1441
                    TreeVault::new_if_needed(
279
1441
                        Some(encryption_key),
280
1441
                        self.storage().vault(),
281
1441
                        #[cfg(feature = "compression")]
282
1441
                        None,
283
1441
                    )
284
                }
285

            
286
                #[cfg(not(feature = "encryption"))]
287
                {
288
                    drop(encryption_key);
289
                    return Err(Error::EncryptionDisabled);
290
                }
291
            } else {
292
882043
                self.storage().tree_vault().cloned()
293
            };
294

            
295
885578
            open_trees.open_trees_for_document_change(
296
885578
                &op.collection,
297
885578
                &self.data.schema,
298
885578
                #[cfg(any(feature = "encryption", feature = "compression"))]
299
885578
                vault,
300
885578
            );
301
        }
302

            
303
546336
        let mut roots_transaction = self
304
546336
            .data
305
546336
            .context
306
546336
            .roots
307
546336
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
308

            
309
546336
        let mut results = Vec::new();
310
546336
        let mut changed_documents = Vec::new();
311
546336
        let mut collection_indexes = HashMap::new();
312
546336
        let mut collections = Vec::new();
313
1415953
        for op in &transaction.operations {
314
885578
            let result = self.execute_operation(
315
885578
                op,
316
885578
                &mut roots_transaction,
317
885578
                &open_trees.trees_index_by_name,
318
885578
            )?;
319

            
320
869617
            if let Some((collection, id, deleted)) = match &result {
321
825219
                OperationResult::DocumentUpdated { header, collection } => {
322
825219
                    Some((collection, header.id, false))
323
                }
324
44398
                OperationResult::DocumentDeleted { id, collection } => {
325
44398
                    Some((collection, *id, true))
326
                }
327
                OperationResult::Success => None,
328
869617
            } {
329
869617
                let collection = match collection_indexes.get(collection) {
330
337354
                    Some(index) => *index,
331
                    None => {
332
532263
                        if let Ok(id) = u16::try_from(collections.len()) {
333
532263
                            collection_indexes.insert(collection.clone(), id);
334
532263
                            collections.push(collection.clone());
335
532263
                            id
336
                        } else {
337
                            return Err(Error::TransactionTooLarge);
338
                        }
339
                    }
340
                };
341
869617
                changed_documents.push(ChangedDocument {
342
869617
                    collection,
343
869617
                    id,
344
869617
                    deleted,
345
869617
                });
346
            }
347
869617
            results.push(result);
348
        }
349

            
350
530375
        self.invalidate_changed_documents(
351
530375
            &mut roots_transaction,
352
530375
            &open_trees,
353
530375
            &collections,
354
530375
            &changed_documents,
355
530375
        )?;
356

            
357
530375
        roots_transaction
358
530375
            .entry_mut()
359
530375
            .set_data(compat::serialize_executed_transaction_changes(
360
530375
                &Changes::Documents(DocumentChanges {
361
530375
                    collections,
362
530375
                    documents: changed_documents,
363
530375
                }),
364
530375
            )?)?;
365

            
366
530375
        roots_transaction.commit()?;
367

            
368
530375
        Ok(results)
369
546460
    }
370

            
371
530375
    fn invalidate_changed_documents(
372
530375
        &self,
373
530375
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
374
530375
        open_trees: &OpenTrees,
375
530375
        collections: &[CollectionName],
376
530375
        changed_documents: &[ChangedDocument],
377
530375
    ) -> Result<(), Error> {
378
532265
        for (collection, changed_documents) in &changed_documents
379
530375
            .iter()
380
869617
            .group_by(|doc| &collections[usize::from(doc.collection)])
381
        {
382
532263
            if let Some(views) = self.data.schema.views_in_collection(collection) {
383
313052
                let changed_documents = changed_documents.collect::<Vec<_>>();
384
1062856
                for view in views.into_iter().filter(|view| !view.unique()) {
385
1016779
                    let view_name = view.view_name();
386
1016779
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
387
2120874
                    for changed_document in &changed_documents {
388
1104065
                        let mut invalidated_docs = roots_transaction
389
1104065
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
390
1104065
                            .unwrap();
391
1104065
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
392
                    }
393
                }
394
219211
            }
395
        }
396
530375
        Ok(())
397
530375
    }
398

            
399
885578
    fn execute_operation(
400
885578
        &self,
401
885578
        operation: &Operation,
402
885578
        transaction: &mut ExecutingTransaction<AnyFile>,
403
885578
        tree_index_map: &HashMap<String, usize>,
404
885578
    ) -> Result<OperationResult, Error> {
405
885578
        match &operation.command {
406
658074
            Command::Insert { id, contents } => {
407
658074
                self.execute_insert(operation, transaction, tree_index_map, *id, contents)
408
            }
409
167828
            Command::Update { header, contents } => self.execute_update(
410
167828
                operation,
411
167828
                transaction,
412
167828
                tree_index_map,
413
167828
                header.id,
414
167828
                Some(&header.revision),
415
167828
                contents,
416
167828
            ),
417
15278
            Command::Overwrite { id, contents } => {
418
15278
                self.execute_update(operation, transaction, tree_index_map, *id, None, contents)
419
            }
420
44398
            Command::Delete { header } => {
421
44398
                self.execute_delete(operation, transaction, tree_index_map, header)
422
            }
423
        }
424
885578
    }
425

            
426
183106
    fn execute_update(
427
183106
        &self,
428
183106
        operation: &Operation,
429
183106
        transaction: &mut ExecutingTransaction<AnyFile>,
430
183106
        tree_index_map: &HashMap<String, usize>,
431
183106
        id: DocumentId,
432
183106
        check_revision: Option<&Revision>,
433
183106
        contents: &[u8],
434
183106
    ) -> Result<OperationResult, crate::Error> {
435
183106
        let mut documents = transaction
436
183106
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
437
183106
            .unwrap();
438
183106
        let document_id = ArcBytes::from(id.to_vec());
439
183106
        let mut result = None;
440
183106
        let mut updated = false;
441
183106
        documents.modify(
442
183106
            vec![document_id.clone()],
443
183106
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
444
                                                                        value: Option<
445
                ArcBytes<'_>,
446
            >| {
447
183106
                if let Some(old) = value {
448
182982
                    let doc = match deserialize_document(&old) {
449
182982
                        Ok(doc) => doc,
450
                        Err(err) => {
451
                            result = Some(Err(err));
452
                            return nebari::tree::KeyOperation::Skip;
453
                        }
454
                    };
455
182982
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
456
182858
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
457
                        {
458
182610
                            let updated_header = Header {
459
182610
                                id,
460
182610
                                revision: updated_revision,
461
182610
                            };
462
182610
                            let serialized_doc = match serialize_document(&BorrowedDocument {
463
182610
                                header: updated_header.clone(),
464
182610
                                contents: CowBytes::from(contents),
465
182610
                            }) {
466
182610
                                Ok(bytes) => bytes,
467
                                Err(err) => {
468
                                    result = Some(Err(Error::from(err)));
469
                                    return nebari::tree::KeyOperation::Skip;
470
                                }
471
                            };
472
182610
                            result = Some(Ok(OperationResult::DocumentUpdated {
473
182610
                                collection: operation.collection.clone(),
474
182610
                                header: updated_header,
475
182610
                            }));
476
182610
                            updated = true;
477
182610
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
478
248
                        }
479
248

            
480
248
                        // If no new revision was made, it means an attempt to
481
248
                        // save a document with the same contents was made.
482
248
                        // We'll return a success but not actually give a new
483
248
                        // version
484
248
                        result = Some(Ok(OperationResult::DocumentUpdated {
485
248
                            collection: operation.collection.clone(),
486
248
                            header: doc.header,
487
248
                        }));
488
124
                    } else {
489
124
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
490
124
                            operation.collection.clone(),
491
124
                            Box::new(doc.header),
492
124
                        ))));
493
124
                    }
494
124
                } else if check_revision.is_none() {
495
                    let doc = BorrowedDocument::new(id, contents);
496
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
497
                        Ok((doc, serialized)) => {
498
                            result = Some(Ok(OperationResult::DocumentUpdated {
499
                                collection: operation.collection.clone(),
500
                                header: doc.header,
501
                            }));
502
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
503
                        }
504
                        Err(err) => {
505
                            result = Some(Err(Error::from(err)));
506
                        }
507
                    }
508
124
                } else {
509
124
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
510
124
                        operation.collection.clone(),
511
124
                        Box::new(id),
512
124
                    ))));
513
124
                }
514
496
                nebari::tree::KeyOperation::Skip
515
183106
            })),
516
183106
        )?;
517
183106
        drop(documents);
518
183106

            
519
183106
        if updated {
520
182610
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
521
496
        }
522

            
523
182858
        result.expect("nebari should invoke the callback even when the key isn't found")
524
183106
    }
525

            
526
658074
    fn execute_insert(
527
658074
        &self,
528
658074
        operation: &Operation,
529
658074
        transaction: &mut ExecutingTransaction<AnyFile>,
530
658074
        tree_index_map: &HashMap<String, usize>,
531
658074
        id: Option<DocumentId>,
532
658074
        contents: &[u8],
533
658074
    ) -> Result<OperationResult, Error> {
534
658074
        let mut documents = transaction
535
658074
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
536
658074
            .unwrap();
537
658074
        let id = if let Some(id) = id {
538
352454
            id
539
305620
        } else if let Some(last_key) = documents.last_key()? {
540
281897
            let id = DocumentId::try_from(last_key.as_slice())?;
541
281897
            self.data
542
281897
                .schema
543
281897
                .next_id_for_collection(&operation.collection, Some(id))?
544
        } else {
545
23723
            self.data
546
23723
                .schema
547
23723
                .next_id_for_collection(&operation.collection, None)?
548
        };
549

            
550
658044
        let doc = BorrowedDocument::new(id, contents);
551
658044
        let serialized: Vec<u8> = serialize_document(&doc)?;
552
658044
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
553
658044
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
554
15155
            let doc = deserialize_document(&document)?;
555
15155
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
556
15155
                operation.collection.clone(),
557
15155
                Box::new(doc.header),
558
15155
            )))
559
        } else {
560
642889
            drop(documents);
561
642889
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
562

            
563
642609
            Ok(OperationResult::DocumentUpdated {
564
642609
                collection: operation.collection.clone(),
565
642609
                header: doc.header,
566
642609
            })
567
        }
568
658074
    }
569

            
570
44398
    fn execute_delete(
571
44398
        &self,
572
44398
        operation: &Operation,
573
44398
        transaction: &mut ExecutingTransaction<AnyFile>,
574
44398
        tree_index_map: &HashMap<String, usize>,
575
44398
        header: &Header,
576
44398
    ) -> Result<OperationResult, Error> {
577
44398
        let mut documents = transaction
578
44398
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
579
44398
            .unwrap();
580
44398
        if let Some(vec) = documents.remove(header.id.as_ref())? {
581
44398
            drop(documents);
582
44398
            let doc = deserialize_document(&vec)?;
583
44398
            if &doc.header == header {
584
44398
                self.update_unique_views(
585
44398
                    &ArcBytes::from(doc.header.id.to_vec()),
586
44398
                    operation,
587
44398
                    transaction,
588
44398
                    tree_index_map,
589
44398
                )?;
590

            
591
44398
                Ok(OperationResult::DocumentDeleted {
592
44398
                    collection: operation.collection.clone(),
593
44398
                    id: header.id,
594
44398
                })
595
            } else {
596
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
597
                    operation.collection.clone(),
598
                    Box::new(header.clone()),
599
                )))
600
            }
601
        } else {
602
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
603
                operation.collection.clone(),
604
                Box::new(header.id),
605
            )))
606
        }
607
44398
    }
608

            
609
    fn update_unique_views(
610
        &self,
611
        document_id: &ArcBytes<'static>,
612
        operation: &Operation,
613
        transaction: &mut ExecutingTransaction<AnyFile>,
614
        tree_index_map: &HashMap<String, usize>,
615
    ) -> Result<(), Error> {
616
869897
        if let Some(unique_views) = self
617
869897
            .data
618
869897
            .schema
619
869897
            .unique_views_in_collection(&operation.collection)
620
        {
621
101085
            let documents = transaction
622
101085
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
623
101085
                .unwrap();
624
201642
            for view in unique_views {
625
101085
                let name = view.view_name();
626
101085
                let document_map = transaction
627
101085
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
628
101085
                    .unwrap();
629
101085
                let view_entries = transaction
630
101085
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
631
101085
                    .unwrap();
632
101085
                mapper::DocumentRequest {
633
101085
                    database: self,
634
101085
                    document_ids: vec![document_id.clone()],
635
101085
                    map_request: &mapper::Map {
636
101085
                        database: self.data.name.clone(),
637
101085
                        collection: operation.collection.clone(),
638
101085
                        view_name: name.clone(),
639
101085
                    },
640
101085
                    document_map,
641
101085
                    documents,
642
101085
                    view_entries,
643
101085
                    view,
644
101085
                }
645
101085
                .map()?;
646
            }
647
768812
        }
648

            
649
869369
        Ok(())
650
869897
    }
651

            
652
838370
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
653
838370
        view_entries: &'a Tree<Unversioned, AnyFile>,
654
838370
        key: Option<QueryKey<K>>,
655
838370
        order: Sort,
656
838370
        limit: Option<u32>,
657
838370
    ) -> Result<Vec<ViewEntry>, Error> {
658
838370
        let mut values = Vec::new();
659
838370
        let forwards = match order {
660
838246
            Sort::Ascending => true,
661
124
            Sort::Descending => false,
662
        };
663
838370
        let mut values_read = 0;
664
838370
        if let Some(key) = key {
665
833623
            match key {
666
398
                QueryKey::Range(range) => {
667
398
                    let range = range
668
398
                        .as_ord_bytes()
669
398
                        .map_err(view::Error::key_serialization)?;
670
398
                    view_entries.scan::<Infallible, _, _, _, _>(
671
796
                        &range.map_ref(|bytes| &bytes[..]),
672
398
                        forwards,
673
668
                        |_, _, _| ScanEvaluation::ReadData,
674
398
                        |_, _| {
675
2026
                            if let Some(limit) = limit {
676
248
                                if values_read >= limit {
677
124
                                    return ScanEvaluation::Stop;
678
124
                                }
679
124
                                values_read += 1;
680
1778
                            }
681
1902
                            ScanEvaluation::ReadData
682
2026
                        },
683
1902
                        |_key, _index, value| {
684
1902
                            values.push(value);
685
1902
                            Ok(())
686
1902
                        },
687
398
                    )?;
688
                }
689
832917
                QueryKey::Matches(key) => {
690
832917
                    let key = key
691
832917
                        .as_ord_bytes()
692
832917
                        .map_err(view::Error::key_serialization)?
693
832917
                        .to_vec();
694
832917

            
695
832917
                    values.extend(view_entries.get(&key)?);
696
                }
697
308
                QueryKey::Multiple(list) => {
698
308
                    let mut list = list
699
308
                        .into_iter()
700
616
                        .map(|key| {
701
616
                            key.as_ord_bytes()
702
616
                                .map(|bytes| bytes.to_vec())
703
616
                                .map_err(view::Error::key_serialization)
704
616
                        })
705
308
                        .collect::<Result<Vec<_>, _>>()?;
706

            
707
308
                    list.sort();
708
308

            
709
308
                    values.extend(
710
308
                        view_entries
711
308
                            .get_multiple(list.iter().map(Vec::as_slice))?
712
308
                            .into_iter()
713
616
                            .map(|(_, value)| value),
714
                    );
715
                }
716
            }
717
        } else {
718
4747
            view_entries.scan::<Infallible, _, _, _, _>(
719
4747
                &(..),
720
4747
                forwards,
721
4747
                |_, _, _| ScanEvaluation::ReadData,
722
4747
                |_, _| {
723
6108
                    if let Some(limit) = limit {
724
                        if values_read >= limit {
725
                            return ScanEvaluation::Stop;
726
                        }
727
                        values_read += 1;
728
6108
                    }
729
6108
                    ScanEvaluation::ReadData
730
6187
                },
731
6187
                |_, _, value| {
732
6108
                    values.push(value);
733
6108
                    Ok(())
734
6187
                },
735
4747
            )?;
736
        }
737

            
738
838370
        values
739
838370
            .into_iter()
740
838370
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
741
838370
            .collect::<Result<Vec<_>, Error>>()
742
838370
    }
743

            
744
    /// Returns the encryption key that applies to `collection`.
    ///
    /// The key configured for the collection in the schematic takes
    /// precedence; when the schematic has none, the storage's default
    /// encryption key (if any) is returned instead.
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        match self.schematic().encryption_key_for_collection(collection) {
            Some(collection_key) => Some(collection_key),
            None => self.storage.default_encryption_key(),
        }
    }
750

            
751
    #[cfg_attr(
752
        not(feature = "encryption"),
753
        allow(
754
            unused_mut,
755
            unused_variables,
756
            clippy::unused_self,
757
            clippy::let_and_return
758
        )
759
    )]
760
    #[allow(clippy::unnecessary_wraps)]
761
2261207
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
762
2261207
        &self,
763
2261207
        collection: &CollectionName,
764
2261207
        name: S,
765
2261207
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
766
2261207
        let mut tree = R::tree(name);
767
2261207

            
768
2261207
        #[cfg(any(feature = "encryption", feature = "compression"))]
769
2261207
        match (
770
2261207
            self.collection_encryption_key(collection),
771
2261207
            self.storage().tree_vault().cloned(),
772
        ) {
773
31229
            (Some(override_key), Some(mut vault)) => {
774
31229
                #[cfg(feature = "encryption")]
775
31229
                {
776
31229
                    vault.key = Some(override_key.clone());
777
31229
                    tree = tree.with_vault(vault);
778
31229
                }
779

            
780
                #[cfg(not(feature = "encryption"))]
781
                {
782
                    return Err(Error::EncryptionDisabled);
783
                }
784
            }
785
540292
            (None, Some(vault)) => {
786
540292
                tree = tree.with_vault(vault);
787
540292
            }
788
1689686
            (key, None) => {
789
                #[cfg(feature = "encryption")]
790
1689686
                if let Some(vault) = TreeVault::new_if_needed(
791
1689686
                    key.cloned(),
792
1689686
                    self.storage().vault(),
793
1689686
                    #[cfg(feature = "compression")]
794
1689686
                    None,
795
1689686
                ) {
796
33093
                    tree = tree.with_vault(vault);
797
1656593
                }
798

            
799
                #[cfg(not(feature = "encryption"))]
800
                if key.is_some() {
801
                    return Err(Error::EncryptionDisabled);
802
                }
803
            }
804
        }
805

            
806
2261207
        Ok(tree)
807
2261207
    }
808

            
809
1
    pub(crate) fn update_key_expiration<'key>(
810
1
        &self,
811
1
        tree_key: impl Into<Cow<'key, str>>,
812
1
        expiration: Option<Timestamp>,
813
1
    ) {
814
1
        self.data
815
1
            .context
816
1
            .update_key_expiration(tree_key, expiration);
817
1
    }
818

            
819
    /// Converts this blocking instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance uses the
    /// current Tokio runtime handle to spawn blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async(self) -> crate::AsyncDatabase {
        self.into_async_with_runtime(tokio::runtime::Handle::current())
    }
831

            
832
    /// Converts this blocking instance into its async version,
    /// [`AsyncDatabase`](crate::AsyncDatabase). The returned instance uses the
    /// provided runtime handle to spawn blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        crate::AsyncDatabase {
            database: self,
            runtime: Arc::new(runtime),
        }
    }
843

            
844
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone of `self`.
    /// The returned instance uses the current Tokio runtime handle to spawn
    /// blocking tasks.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a Tokio runtime.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async(&self) -> crate::AsyncDatabase {
        self.clone().into_async()
    }
856

            
857
    /// Returns an async version of this instance,
    /// [`AsyncDatabase`](crate::AsyncDatabase), built from a clone of `self`.
    /// The returned instance uses the provided runtime handle to spawn
    /// blocking tasks.
    #[cfg(feature = "async")]
    #[must_use]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncDatabase {
        self.clone().into_async_with_runtime(runtime)
    }
865
}
866
59
#[derive(Serialize, Deserialize)]
867
struct LegacyHeader {
868
    id: u64,
869
    revision: Revision,
870
}
871
59
#[derive(Serialize, Deserialize)]
872
struct LegacyDocument<'a> {
873
    header: LegacyHeader,
874
    #[serde(borrow)]
875
    contents: &'a [u8],
876
}
877

            
878
1115078
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
879
1115078
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
880
1115019
        Ok(document) => Ok(document),
881
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
882
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
883
59
                header: Header {
884
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
885
59
                    revision: legacy_doc.header.revision,
886
59
                },
887
59
                contents: CowBytes::from(legacy_doc.contents),
888
59
            }),
889
            Err(_) => Err(Error::from(err)),
890
        },
891
    }
892
1115078
}
893

            
894
840654
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
895
840654
    pot::to_vec(document)
896
840654
        .map_err(Error::from)
897
840654
        .map_err(bonsaidb_core::Error::from)
898
840654
}
899

            
900
impl HasSession for Database {
901
4233540
    fn session(&self) -> Option<&Session> {
902
4233540
        self.storage.session()
903
4233540
    }
904
}
905

            
906
impl Connection for Database {
907
    type Storage = Storage;
908

            
909
4972941
    /// Returns a clone of the storage handle this database belongs to.
    fn storage(&self) -> Self::Storage {
        Clone::clone(&self.storage)
    }
912

            
913
    fn list_executed_transactions(
914
        &self,
915
        starting_id: Option<u64>,
916
        result_limit: Option<u32>,
917
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
918
32304
        self.check_permission(
919
32304
            database_resource_name(self.name()),
920
32304
            &BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted)),
921
32304
        )?;
922
32304
        let result_limit = usize::try_from(
923
32304
            result_limit
924
32304
                .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
925
32304
                .min(LIST_TRANSACTIONS_MAX_RESULTS),
926
32304
        )
927
32304
        .unwrap();
928
32304
        if result_limit > 0 {
929
32180
            let range = if let Some(starting_id) = starting_id {
930
16518
                Range::from(starting_id..)
931
            } else {
932
15662
                Range::from(..)
933
            };
934

            
935
32180
            let mut entries = Vec::new();
936
32180
            self.roots()
937
32180
                .transactions()
938
306942
                .scan(range, |entry| {
939
306942
                    entries.push(entry);
940
306942
                    entries.len() < result_limit
941
306942
                })
942
32180
                .map_err(Error::from)?;
943

            
944
32180
            entries
945
32180
                .into_iter()
946
32180
                .map(|entry| {
947
306942
                    if let Some(data) = entry.data() {
948
306884
                        let changes = compat::deserialize_executed_transaction_changes(data)?;
949
306884
                        Ok(Some(transaction::Executed {
950
306884
                            id: entry.id,
951
306884
                            changes,
952
306884
                        }))
953
                    } else {
954
58
                        Ok(None)
955
                    }
956
306942
                })
957
32180
                .filter_map(Result::transpose)
958
32180
                .collect::<Result<Vec<_>, Error>>()
959
32180
                .map_err(bonsaidb_core::Error::from)
960
        } else {
961
            // A request was made to return an empty result? This should probably be
962
            // an error, but technically this is a correct response.
963
124
            Ok(Vec::default())
964
        }
965
32304
    }
966

            
967
    /// Returns the current transaction id reported by the transaction log.
    ///
    /// # Errors
    ///
    /// Fails if the `GetLastId` permission check fails.
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
        let resource = database_resource_name(self.name());
        let action =
            BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId));
        self.check_permission(resource, &action)?;

        let last_id = self.roots().transactions().current_transaction_id();
        Ok(last_id)
    }
974

            
975
    /// Asks the storage's task manager to compact this database.
    ///
    /// # Errors
    ///
    /// Fails if the `Compact` permission check on the database resource fails
    /// or if the compaction task reports an error.
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
        let resource = database_resource_name(self.name());
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(resource, &action)?;

        self.storage()
            .instance
            .tasks()
            .compact_database(self.clone())?;
        Ok(())
    }
986

            
987
    /// Asks the storage's task manager to compact this database's key-value
    /// store.
    ///
    /// # Errors
    ///
    /// Fails if the `Compact` permission check on the key-value resource
    /// fails or if the compaction task reports an error.
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
        let resource = kv_resource_name(self.name());
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(resource, &action)?;

        self.storage()
            .instance
            .tasks()
            .compact_key_value_store(self.clone())?;
        Ok(())
    }
998
}
999

            
impl LowLevelConnection for Database {
19768
    /// Returns the schematic stored in this database's shared data.
    fn schematic(&self) -> &Schematic {
        let data = &self.data;
        &data.schema
    }

            
546550
    fn apply_transaction(
546550
        &self,
546550
        transaction: Transaction,
546550
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
1432252
        for op in &transaction.operations {
885792
            let (resource, action) = match &op.command {
658228
                Command::Insert { .. } => (
658228
                    collection_resource_name(self.name(), &op.collection),
658228
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
658228
                ),
167828
                Command::Update { header, .. } => (
167828
                    document_resource_name(self.name(), &op.collection, &header.id),
167828
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
167828
                ),
15278
                Command::Overwrite { id, .. } => (
15278
                    document_resource_name(self.name(), &op.collection, id),
15278
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
15278
                ),
44458
                Command::Delete { header } => (
44458
                    document_resource_name(self.name(), &op.collection, &header.id),
44458
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
44458
                ),
            };
885792
            self.check_permission(&resource, &action)?;
        }

            
546460
        let mut unique_view_tasks = Vec::new();
548320
        for collection_name in transaction
546460
            .operations
546460
            .iter()
885702
            .map(|op| &op.collection)
546460
            .collect::<HashSet<_>>()
        {
548318
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
1453979
                for view in views {
1124996
                    if view.unique() {
46575
                        if let Some(task) = self
46575
                            .storage
46575
                            .instance
46575
                            .tasks()
46575
                            .spawn_integrity_check(view, self)
1198
                        {
1198
                            unique_view_tasks.push(task);
45407
                        }
1078421
                    }
                }
219365
            }
        }

            
546460
        let mut unique_view_mapping_tasks = Vec::new();
547658
        for task in unique_view_tasks {
1198
            if let Some(spawned_task) = task.receive().map_err(Error::from)?.map_err(Error::from)? {
1106
                unique_view_mapping_tasks.push(spawned_task);
1106
            }
        }

            
547566
        for task in unique_view_mapping_tasks {
1106
            let mut task = task.lock();
1106
            if let Some(task) = task.take() {
1106
                task.receive().map_err(Error::from)?.map_err(Error::from)?;
            }
        }

            
546460
        self.apply_transaction_to_roots(&transaction)
546460
            .map_err(bonsaidb_core::Error::from)
546580
    }

            
    fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
474657
        self.check_permission(
474657
            document_resource_name(self.name(), collection, &id),
474657
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
474657
        )?;
474657
        let tree = self
474657
            .data
474657
            .context
474657
            .roots
474657
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
474657
            .map_err(Error::from)?;
474657
        if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
459254
            Ok(Some(deserialize_document(&vec)?.into_owned()))
        } else {
15402
            Ok(None)
        }
474657
    }

            
    fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
723
        self.check_permission(
723
            collection_resource_name(self.name(), collection),
723
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List)),
723
        )?;
723
        let tree = self
723
            .data
723
            .context
723
            .roots
723
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
723
            .map_err(Error::from)?;
723
        let mut found_docs = Vec::new();
723
        let mut keys_read = 0;
723
        let ids = DocumentIdRange(ids);
723
        tree.scan(
723
            &ids.borrow_as_bytes(),
723
            match sort {
599
                Sort::Ascending => true,
124
                Sort::Descending => false,
            },
24
            |_, _, _| ScanEvaluation::ReadData,
723
            |_, _| {
1111
                if let Some(limit) = limit {
248
                    if keys_read >= limit {
124
                        return ScanEvaluation::Stop;
124
                    }
124

            
124
                    keys_read += 1;
863
                }
987
                ScanEvaluation::ReadData
1111
            },
723
            |_, _, doc| {
                found_docs.push(
987
                    deserialize_document(&doc)
987
                        .map(BorrowedDocument::into_owned)
987
                        .map_err(AbortError::Other)?,
                );
987
                Ok(())
987
            },
        )
723
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
723
        })?;

            
723
        Ok(found_docs)
723
    }

            
    fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        sort: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
124
        self.check_permission(
124
            collection_resource_name(self.name(), collection),
124
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::ListHeaders)),
124
        )?;
124
        let tree = self
124
            .data
124
            .context
124
            .roots
124
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
124
            .map_err(Error::from)?;
124
        let mut found_headers = Vec::new();
124
        let mut keys_read = 0;
124
        let ids = DocumentIdRange(ids);
124
        tree.scan(
124
            &ids.borrow_as_bytes(),
124
            match sort {
124
                Sort::Ascending => true,
                Sort::Descending => false,
            },
            |_, _, _| ScanEvaluation::ReadData,
124
            |_, _| {
248
                if let Some(limit) = limit {
                    if keys_read >= limit {
                        return ScanEvaluation::Stop;
                    }

            
                    keys_read += 1;
248
                }
248
                ScanEvaluation::ReadData
248
            },
124
            |_, _, doc| {
                found_headers.push(
248
                    deserialize_document(&doc)
248
                        .map(|doc| doc.header)
248
                        .map_err(AbortError::Other)?,
                );
248
                Ok(())
248
            },
        )
124
        .map_err(|err| match err {
            AbortError::Other(err) => err,
            AbortError::Nebari(err) => crate::Error::from(err),
124
        })?;

            
124
        Ok(found_headers)
124
    }

            
    fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, bonsaidb_core::Error> {
248
        self.check_permission(
248
            collection_resource_name(self.name(), collection),
248
            &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count)),
248
        )?;
248
        let tree = self
248
            .data
248
            .context
248
            .roots
248
            .tree(self.collection_tree::<Versioned, _>(collection, document_tree_name(collection))?)
248
            .map_err(Error::from)?;
248
        let ids = DocumentIdRange(ids);
248
        let stats = tree.reduce(&ids.borrow_as_bytes()).map_err(Error::from)?;

            
248
        Ok(stats.alive_keys)
248
    }

            
273208
    fn get_multiple_from_collection(
273208
        &self,
273208
        ids: &[DocumentId],
273208
        collection: &CollectionName,
273208
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
544452
        for id in ids {
271274
            self.check_permission(
271274
                document_resource_name(self.name(), collection, id),
271274
                &BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get)),
271274
            )?;
        }
273178
        let mut ids = ids.to_vec();
273178
        let collection = collection.clone();
273178
        let tree = self
273178
            .data
273178
            .context
273178
            .roots
273178
            .tree(
273178
                self.collection_tree::<Versioned, _>(&collection, document_tree_name(&collection))?,
            )
273178
            .map_err(Error::from)?;
273178
        ids.sort();
273208
        let keys_and_values = tree
273194
            .get_multiple(ids.iter().map(|id| id.as_ref()))
273178
            .map_err(Error::from)?;

            
273208
        keys_and_values
273208
            .into_iter()
273224
            .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
273208
            .collect::<Result<Vec<_>, Error>>()
273208
            .map_err(bonsaidb_core::Error::from)
273208
    }

            
    /// Asks the storage's task manager to compact `collection`.
    ///
    /// # Errors
    ///
    /// Fails if the `Compact` permission check on the collection resource
    /// fails or if the compaction task reports an error.
    fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        let action = BonsaiAction::Database(DatabaseAction::Compact);
        self.check_permission(
            collection_resource_name(self.name(), &collection),
            &action,
        )?;

        self.storage()
            .instance
            .tasks()
            .compact_collection(self.clone(), collection)?;
        Ok(())
    }

            
298172
    fn query_by_name(
298172
        &self,
298172
        view: &ViewName,
298172
        key: Option<QueryKey<Bytes>>,
298172
        order: Sort,
298172
        limit: Option<u32>,
298172
        access_policy: AccessPolicy,
298172
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
298172
        let view = self.schematic().view_by_name(view)?;
298172
        self.check_permission(
298172
            view_resource_name(self.name(), &view.view_name()),
298172
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Query)),
298172
        )?;
298172
        let mut results = Vec::new();
298172
        self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
583276
            for mapping in entry.mappings {
292367
                results.push(bonsaidb_core::schema::view::map::Serialized {
292367
                    source: mapping.source,
292367
                    key: entry.key.clone(),
292367
                    value: mapping.value,
292367
                });
292367
            }
290909
            Ok(())
298172
        })?;

            
298172
        Ok(results)
298172
    }

            
    /// Queries the view named `view` like `query_by_name` and also loads the
    /// source document of every returned mapping, keyed by document id.
    ///
    /// # Errors
    ///
    /// Fails if the query fails or if the source documents cannot be fetched.
    fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
        let mappings = self.query_by_name(view, key, order, limit, access_policy)?;
        // query_by_name() has already failed if the view is not present.
        let view = self.schematic().view_by_name(view).unwrap();

        let source_ids = mappings
            .iter()
            .map(|mapping| mapping.source.id)
            .collect::<Vec<_>>();
        let documents = self
            .get_multiple_from_collection(&source_ids, &view.collection())?
            .into_iter()
            .map(|document| (document.header.id, document))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings,
                documents,
            },
        )
    }

            
539630
    fn reduce_by_name(
539630
        &self,
539630
        view_name: &ViewName,
539630
        key: Option<QueryKey<Bytes>>,
539630
        access_policy: AccessPolicy,
539630
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
539630
        let mut mappings = self.reduce_grouped_by_name(view_name, key, access_policy)?;

            
539630
        let result = if mappings.len() == 1 {
242348
            mappings.pop().unwrap().value.into_vec()
        } else {
297282
            let view = self.data.schema.view_by_name(view_name)?;
297282
            view.reduce(
297282
                &mappings
297282
                    .iter()
297282
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
297282
                    .collect::<Vec<_>>(),
297282
                true,
297282
            )
297282
            .map_err(Error::from)?
        };

            
539506
        Ok(result)
539630
    }

            
539950
    fn reduce_grouped_by_name(
539950
        &self,
539950
        view_name: &ViewName,
539950
        key: Option<QueryKey<Bytes>>,
539950
        access_policy: AccessPolicy,
539950
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
539950
        let view = self.data.schema.view_by_name(view_name)?;
539950
        self.check_permission(
539950
            view_resource_name(self.name(), &view.view_name()),
539950
            &BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce)),
539950
        )?;
539950
        let mut mappings = Vec::new();
539966
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
247886
            mappings.push(MappedSerializedValue {
247886
                key: entry.key,
247886
                value: entry.reduced_value,
247886
            });
247886
            Ok(())
539966
        })?;

            
539950
        Ok(mappings)
539950
    }

            
248
    fn delete_docs_by_name(
248
        &self,
248
        view: &ViewName,
248
        key: Option<QueryKey<Bytes>>,
248
        access_policy: AccessPolicy,
248
    ) -> Result<u64, bonsaidb_core::Error> {
248
        let view = self.data.schema.view_by_name(view)?;
248
        let collection = view.collection();
248
        let mut transaction = Transaction::default();
248
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
620
            for mapping in entry.mappings {
372
                transaction.push(Operation::delete(collection.clone(), mapping.source));
372
            }

            
248
            Ok(())
248
        })?;

            
248
        let results = LowLevelConnection::apply_transaction(self, transaction)?;

            
248
        Ok(results.len() as u64)
248
    }
}

            
/// Boxed iterator over fallible pairs of byte buffers read from a view.
/// `ViewEntryCollectionIterator` discards the first element of each pair and
/// deserializes the second.
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapter over a [`ViewIterator`] that yields deserialized [`ViewEntry`]
/// values instead of raw bytes.
struct ViewEntryCollectionIterator<'a> {
    /// Source of the raw pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
2165463
/// Cloneable handle to a database's [`ContextData`]. Clones share the same
/// data through an [`Arc`].
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// The shared state; dereferencing a `Context` yields this value.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    /// Gives access to the shared [`ContextData`].
    fn deref(&self) -> &Self::Target {
        &*self.data
    }
}

            
/// State shared by all clones of a [`Context`].
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The roots used to open this database's trees.
    pub(crate) roots: Roots<AnyFile>,
    /// Key-value store state, guarded by a mutex. `Context::new` also hands a
    /// weak reference to it to the background worker thread.
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
26877
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
26877
        let background_worker_target = Watchable::new(BackgroundWorkerProcessTarget::Never);
26877
        let mut background_worker_target_watcher = background_worker_target.watch();
26877
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
26877
            key_value_persistence,
26877
            roots.clone(),
26877
            background_worker_target,
26877
        )));
26877
        let background_worker_state = Arc::downgrade(&key_value_state);
26877
        let context = Self {
26877
            data: Arc::new(ContextData {
26877
                roots,
26877
                key_value_state,
26877
            }),
26877
        };
26877
        std::thread::Builder::new()
26877
            .name(String::from("keyvalue-worker"))
26877
            .spawn(move || {
26847
                keyvalue::background_worker(
26847
                    &background_worker_state,
26847
                    &mut background_worker_target_watcher,
26847
                );
26877
            })
26877
            .unwrap();
26877
        context
26877
    }

            
1253889
    pub(crate) fn perform_kv_operation(
1253889
        &self,
1253889
        op: KeyOperation,
1253889
    ) -> Result<Output, bonsaidb_core::Error> {
1253889
        let mut state = self.data.key_value_state.lock();
1253889
        state.perform_kv_operation(op, &self.data.key_value_state)
1253889
    }

            
14
    pub(crate) fn update_key_expiration<'key>(
14
        &self,
14
        tree_key: impl Into<Cow<'key, str>>,
14
        expiration: Option<Timestamp>,
14
    ) {
14
        let mut state = self.data.key_value_state.lock();
14
        state.update_key_expiration(tree_key, expiration);
14
    }

            
    /// Test-only: returns the persistence watcher of the key-value state.
    #[cfg(test)]
    pub(crate) fn kv_persistence_watcher(&self) -> watchable::Watcher<Timestamp> {
        let guard = self.data.key_value_state.lock();
        guard.persistence_watcher()
    }
}

            
impl Drop for ContextData {
    fn drop(&mut self) {
24027
        if let Some(shutdown) = {
24027
            let mut state = self.key_value_state.lock();
24027
            state.shutdown(&self.key_value_state)
24027
        } {
301
            let _ = shutdown.recv();
23726
        }
24027
    }
}

            
2792776
pub fn document_tree_name(collection: &CollectionName) -> String {
2792776
    format!("collection.{:#}", collection)
2792776
}

            
/// Wrapper around a [`Range`] of [`DocumentId`]s that implements
/// [`BorrowByteRange`], so the range can be handed to tree scans as borrowed
/// bytes.
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
1095
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
1095
        BorrowedRange {
1095
            start: match &self.0.start {
475
                connection::Bound::Unbounded => ops::Bound::Unbounded,
620
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
1095
            end: match &self.0.end {
475
                connection::Bound::Unbounded => ops::Bound::Unbounded,
496
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
124
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
1095
    }
}

            
/// Operations that can be performed on both [`Database`] and
/// [`AsyncDatabase`](crate::AsyncDatabase).
///
/// This file provides the implementation for [`Database`].
pub trait DatabaseNonBlocking {
    /// Returns the name of the database.
    #[must_use]
    fn name(&self) -> &str;
}

            
impl DatabaseNonBlocking for Database {
    /// Returns the name stored in this database's shared data.
    fn name(&self) -> &str {
        let data = &self.data;
        data.name.as_ref()
    }
}