1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::{self, Deref},
6
    sync::Arc,
7
    u8,
8
};
9

            
10
use async_lock::Mutex;
11
use async_trait::async_trait;
12
#[cfg(any(feature = "encryption", feature = "compression"))]
13
use bonsaidb_core::document::KeyId;
14
use bonsaidb_core::{
15
    arc_bytes::{
16
        serde::{Bytes, CowBytes},
17
        ArcBytes,
18
    },
19
    connection::{self, AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
20
    document::{AnyDocumentId, BorrowedDocument, DocumentId, Header, OwnedDocument, Revision},
21
    key::Key,
22
    keyvalue::{KeyOperation, Output, Timestamp},
23
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
24
    permissions::Permissions,
25
    schema::{
26
        self,
27
        view::{
28
            self,
29
            map::{MappedDocuments, MappedSerializedValue},
30
        },
31
        Collection, CollectionName, Map, MappedValue, Schema, Schematic, ViewName,
32
    },
33
    transaction::{
34
        self, ChangedDocument, Changes, Command, Operation, OperationResult, Transaction,
35
    },
36
};
37
use bonsaidb_utils::fast_async_lock;
38
use itertools::Itertools;
39
use nebari::{
40
    io::any::AnyFile,
41
    tree::{
42
        AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, KeyEvaluation, Root, TreeRoot,
43
        Unversioned, Versioned,
44
    },
45
    AbortError, ExecutingTransaction, Roots, Tree,
46
};
47
use serde::{Deserialize, Serialize};
48
use tokio::sync::watch;
49

            
50
#[cfg(feature = "encryption")]
51
use crate::storage::TreeVault;
52
use crate::{
53
    config::{Builder, KeyValuePersistence, StorageConfiguration},
54
    database::keyvalue::BackgroundWorkerProcessTarget,
55
    error::Error,
56
    open_trees::OpenTrees,
57
    views::{
58
        mapper::{self, ViewEntryCollection},
59
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
60
        view_omitted_docs_tree_name, ViewEntry,
61
    },
62
    Storage,
63
};
64

            
65
pub mod keyvalue;
66

            
67
pub mod pubsub;
68

            
69
/// A local, file-based database.
70
///
71
/// ## Using `Database` to create a single database
72
///
73
/// `Database` provides an easy mechanism to open and access a single database:
74
///
75
/// ```rust
76
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
77
/// use bonsaidb_core::schema::Collection;
78
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
79
/// use bonsaidb_local::{
80
///     config::{Builder, StorageConfiguration},
81
///     Database,
82
/// };
83
/// use serde::{Deserialize, Serialize};
84
///
85
/// #[derive(Debug, Serialize, Deserialize, Collection)]
86
/// #[collection(name = "blog-posts")]
87
/// # #[collection(core = bonsaidb_core)]
88
/// struct BlogPost {
89
///     pub title: String,
90
///     pub contents: String,
91
/// }
92
///
93
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
94
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
95
/// #     Ok(())
96
/// # }
97
/// ```
98
///
99
/// Under the hood, this initializes a [`Storage`] instance pointing at
100
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
101
/// with the schema `BlogPost`.
102
///
103
/// In this example, `BlogPost` implements the [`Collection`] trait, and all
104
/// collections can be used as a [`Schema`].
105
#[derive(Debug)]
pub struct Database {
    // Reference-counted shared state; cloning a `Database` clones only this `Arc`.
    pub(crate) data: Arc<Data>,
}
109

            
110
/// The shared state behind a [`Database`] handle.
#[derive(Debug)]
pub struct Data {
    /// The name of the database.
    pub name: Arc<Cow<'static, str>>,
    // Gives access to the `roots` that the database's trees are opened from.
    context: Context,
    /// The [`Storage`] that this database belongs to.
    pub(crate) storage: Storage,
    /// The [`Schematic`] for the schema of this database.
    pub(crate) schema: Arc<Schematic>,
    /// Permissions attached via `Database::with_effective_permissions`;
    /// `None` when the database is created through `Database::new`.
    #[allow(dead_code)] // This code was previously used, it works, but is currently unused.
    pub(crate) effective_permissions: Option<Permissions>,
}
119
impl Clone for Database {
120
1276145
    fn clone(&self) -> Self {
121
1276145
        Self {
122
1276145
            data: self.data.clone(),
123
1276145
        }
124
1276145
    }
125
}
126

            
127
impl Database {
128
    /// Opens a local file as a bonsaidb.
129
89043
    pub(crate) async fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
130
89043
        name: S,
131
89043
        context: Context,
132
89043
        storage: Storage,
133
89043
    ) -> Result<Self, Error> {
134
89043
        let name = name.into();
135
89043
        let schema = Arc::new(DB::schematic()?);
136
89043
        let db = Self {
137
89043
            data: Arc::new(Data {
138
89043
                name: Arc::new(name),
139
89043
                context,
140
89043
                storage: storage.clone(),
141
89043
                schema,
142
89043
                effective_permissions: None,
143
89043
            }),
144
89043
        };
145
89043

            
146
89043
        if db.data.storage.check_view_integrity_on_database_open() {
147
16
            for view in db.data.schema.views() {
148
16
                db.data
149
16
                    .storage
150
16
                    .tasks()
151
16
                    .spawn_integrity_check(view, &db)
152
                    .await?;
153
            }
154
89039
        }
155

            
156
89043
        storage.tasks().spawn_key_value_expiration_loader(&db).await;
157

            
158
89043
        Ok(db)
159
89043
    }
160

            
161
    /// Returns a clone with `effective_permissions`. Replaces any previously applied permissions.
162
    ///
163
    /// # Unstable
164
    ///
165
    /// See [this issue](https://github.com/khonsulabs/bonsaidb/issues/68).
166
    #[doc(hidden)]
167
    #[must_use]
168
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Self {
169
        Self {
170
            data: Arc::new(Data {
171
                name: self.data.name.clone(),
172
                context: self.data.context.clone(),
173
                storage: self.data.storage.clone(),
174
                schema: self.data.schema.clone(),
175
                effective_permissions: Some(effective_permissions),
176
            }),
177
        }
178
    }
179

            
180
    /// Returns the name of the database.
181
    #[must_use]
182
1407
    pub fn name(&self) -> &str {
183
1407
        self.data.name.as_ref()
184
1407
    }
185

            
186
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
187
25
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
188
379
        let storage = Storage::open(configuration.with_schema::<DB>()?).await?;
189

            
190
40
        storage.create_database::<DB>("default", true).await?;
191

            
192
25
        Ok(storage.database::<DB>("default").await?)
193
25
    }
194

            
195
    /// Returns the [`Storage`] that this database belongs to.
196
    #[must_use]
197
6242466
    pub fn storage(&self) -> &'_ Storage {
198
6242466
        &self.data.storage
199
6242466
    }
200

            
201
    /// Returns the [`Schematic`] for the schema for this database.
202
    #[must_use]
203
2632146
    pub fn schematic(&self) -> &'_ Schematic {
204
2632146
        &self.data.schema
205
2632146
    }
206

            
207
1727375
    /// Borrows the nebari `Roots` stored in this database's context.
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
        let context = &self.data.context;
        &context.roots
    }
210

            
211
59916
    async fn for_each_in_view<
212
59916
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
213
59916
    >(
214
59916
        &self,
215
59916
        view: &dyn view::Serialized,
216
59916
        key: Option<QueryKey<Bytes>>,
217
59916
        order: Sort,
218
59916
        limit: Option<usize>,
219
59916
        access_policy: AccessPolicy,
220
59916
        mut callback: F,
221
59916
    ) -> Result<(), bonsaidb_core::Error> {
222
59916
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
223
44763
            self.data
224
44763
                .storage
225
44763
                .tasks()
226
44763
                .update_view_if_needed(view, self)
227
38276
                .await?;
228
15153
        }
229

            
230
59916
        let view_entries = self
231
59916
            .roots()
232
59916
            .tree(self.collection_tree(
233
59916
                &view.collection(),
234
59916
                view_entries_tree_name(&view.view_name()),
235
59916
            )?)
236
59916
            .map_err(Error::from)?;
237

            
238
        {
239
59916
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
240
31734
                callback(entry)?;
241
            }
242
        }
243

            
244
59916
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
245
5
            let db = self.clone();
246
5
            let view_name = view.view_name();
247
5
            tokio::task::spawn(async move {
248
5
                let view = db
249
5
                    .data
250
5
                    .schema
251
5
                    .view_by_name(&view_name)
252
5
                    .expect("query made with view that isn't registered with this database");
253
5
                db.data
254
5
                    .storage
255
5
                    .tasks()
256
12
                    .update_view_if_needed(view, &db)
257
12
                    .await
258
5
            });
259
59911
        }
260

            
261
59916
        Ok(())
262
59916
    }
263

            
264
40578
    async fn for_each_view_entry<
265
40578
        V: schema::View,
266
40578
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
267
40578
    >(
268
40578
        &self,
269
40578
        key: Option<QueryKey<V::Key>>,
270
40578
        order: Sort,
271
40578
        limit: Option<usize>,
272
40578
        access_policy: AccessPolicy,
273
40578
        callback: F,
274
40579
    ) -> Result<(), bonsaidb_core::Error> {
275
40579
        let view = self
276
40579
            .data
277
40579
            .schema
278
40579
            .view::<V>()
279
40579
            .expect("query made with view that isn't registered with this database");
280
40579

            
281
40579
        self.for_each_in_view(
282
40579
            view,
283
40579
            key.map(|key| key.serialized()).transpose()?,
284
40579
            order,
285
40579
            limit,
286
40579
            access_policy,
287
40579
            callback,
288
36062
        )
289
36062
        .await
290
40579
    }
291

            
292
    /// Internal-API entry point that fetches the document `id` from
    /// `collection`, yielding `None` when it is not found.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn internal_get_from_collection_id(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
        let document = self.get_from_collection_id(id, collection).await?;
        Ok(document)
    }
301

            
302
    /// Internal-API entry point that lists documents of `collection` whose ids
    /// fall within `ids`, honoring `order` and `limit`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        order: Sort,
        limit: Option<usize>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
        let documents = self.list(ids, order, limit, collection).await?;
        Ok(documents)
    }
313

            
314
    /// Internal-API entry point that fetches the documents identified by `ids`
    /// from `collection`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn internal_get_multiple_from_collection_id(
        &self,
        ids: &[DocumentId],
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
        let documents = self
            .get_multiple_from_collection_id(ids, collection)
            .await?;
        Ok(documents)
    }
323

            
324
    /// Internal-API entry point that asks the storage's task manager to compact
    /// `collection` and waits for that request to finish.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        let database = self.clone();
        self.data
            .storage
            .tasks()
            .compact_collection(database, collection)
            .await?;
        Ok(())
    }
336

            
337
    /// Internal-API entry point that queries the view named `view` and returns
    /// one serialized mapping per (entry, source document) pair.
    ///
    /// # Errors
    ///
    /// Returns `CollectionNotFound` if no view with that name is registered,
    /// or any error produced while iterating the view.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn query_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
        let view = self
            .schematic()
            .view_by_name(view)
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;

        let mut results = Vec::new();
        self.for_each_in_view(view, key, order, limit, access_policy, |collection| {
            let ViewEntry { key, mappings, .. } = ViewEntry::from(collection);
            // Every mapping of an entry shares the entry's key.
            results.extend(mappings.into_iter().map(|mapping| {
                bonsaidb_core::schema::view::map::Serialized {
                    source: mapping.source,
                    key: key.clone(),
                    value: mapping.value,
                }
            }));
            Ok(())
        })
        .await?;

        Ok(results)
    }
367

            
368
    /// Internal-API entry point that runs `query_by_name` and additionally
    /// loads the source documents of the returned mappings, keyed by
    /// document id.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
    {
        let mappings = self
            .query_by_name(view, key, order, limit, access_policy)
            .await?;
        // `query_by_name` has already failed if the view is not registered.
        let view = self.schematic().view_by_name(view).unwrap();

        let source_ids = mappings
            .iter()
            .map(|mapping| mapping.source.id)
            .collect::<Vec<_>>();
        let found = self
            .get_multiple_from_collection_id(&source_ids, &view.collection())
            .await?;
        let documents = found
            .into_iter()
            .map(|doc| (doc.header.id, doc))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings,
                documents,
            },
        )
    }
401

            
402
    /// Internal-API entry point that reduces the view named `view` over `key`
    /// and returns the serialized reduced value.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn reduce_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
        let reduced = self.reduce_in_view(view, key, access_policy).await?;
        Ok(reduced)
    }
412

            
413
    /// Internal-API entry point that returns one reduced value per entry of
    /// the view named `view`, restricted to `key`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn reduce_grouped_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
        let grouped = self
            .grouped_reduce_in_view(view, key, access_policy)
            .await?;
        Ok(grouped)
    }
423

            
424
    /// Internal-API entry point that deletes every document mapped by the view
    /// named `view` for `key`, using a single transaction.
    ///
    /// Returns the number of results reported by the applied transaction.
    ///
    /// # Errors
    ///
    /// Returns `CollectionNotFound` if no view with that name is registered,
    /// or any error from iterating the view or applying the transaction.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn delete_docs_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        access_policy: AccessPolicy,
    ) -> Result<u64, bonsaidb_core::Error> {
        let view = self
            .data
            .schema
            .view_by_name(view)
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
        let collection = view.collection();

        // Queue one delete per mapped source document.
        let mut transaction = Transaction::default();
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
            let ViewEntry { mappings, .. } = ViewEntry::from(entry);
            for mapping in mappings {
                let delete = Operation::delete(collection.clone(), mapping.source);
                transaction.push(delete);
            }
            Ok(())
        })
        .await?;

        let results = Connection::apply_transaction(self, transaction).await?;
        let deleted = results.len() as u64;
        Ok(deleted)
    }
454

            
455
344891
    async fn get_from_collection_id(
456
344891
        &self,
457
344891
        id: DocumentId,
458
344891
        collection: &CollectionName,
459
344891
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
460
13835
        let task_self = self.clone();
461
13835
        let collection = collection.clone();
462
13835
        tokio::task::spawn_blocking(move || {
463
13835
            let tree = task_self
464
13835
                .data
465
13835
                .context
466
13835
                .roots
467
13835
                .tree(task_self.collection_tree::<Versioned, _>(
468
13835
                    &collection,
469
13835
                    document_tree_name(&collection),
470
13835
                )?)
471
13835
                .map_err(Error::from)?;
472
13835
            if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
473
13323
                Ok(Some(deserialize_document(&vec)?.into_owned()))
474
            } else {
475
511
                Ok(None)
476
            }
477
13835
        })
478
7944
        .await
479
13835
        .unwrap()
480
13835
    }
481

            
482
196795
    async fn get_multiple_from_collection_id(
483
196795
        &self,
484
196795
        ids: &[DocumentId],
485
196795
        collection: &CollectionName,
486
196795
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
487
7891
        let task_self = self.clone();
488
7891
        let ids = ids.to_vec();
489
7891
        let collection = collection.clone();
490
7891
        tokio::task::spawn_blocking(move || {
491
7891
            let tree = task_self
492
7891
                .data
493
7891
                .context
494
7891
                .roots
495
7891
                .tree(task_self.collection_tree::<Versioned, _>(
496
7891
                    &collection,
497
7891
                    document_tree_name(&collection),
498
7891
                )?)
499
7891
                .map_err(Error::from)?;
500
7891
            let mut ids = ids.iter().map(DocumentId::deref).collect::<Vec<_>>();
501
7891
            ids.sort();
502
7891
            let keys_and_values = tree.get_multiple(&ids).map_err(Error::from)?;
503

            
504
7891
            keys_and_values
505
7891
                .into_iter()
506
7897
                .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
507
7891
                .collect::<Result<Vec<_>, Error>>()
508
7891
        })
509
4397
        .await
510
7891
        .unwrap()
511
7891
        .map_err(bonsaidb_core::Error::from)
512
7891
    }
513

            
514
488
    pub(crate) async fn list(
515
488
        &self,
516
488
        ids: Range<DocumentId>,
517
488
        sort: Sort,
518
488
        limit: Option<usize>,
519
488
        collection: &CollectionName,
520
488
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
521
32
        let task_self = self.clone();
522
32
        let collection = collection.clone();
523
32
        tokio::task::spawn_blocking(move || {
524
32
            let tree = task_self
525
32
                .data
526
32
                .context
527
32
                .roots
528
32
                .tree(task_self.collection_tree::<Versioned, _>(
529
32
                    &collection,
530
32
                    document_tree_name(&collection),
531
32
                )?)
532
32
                .map_err(Error::from)?;
533
32
            let mut found_docs = Vec::new();
534
32
            let mut keys_read = 0;
535
32
            let ids = DocumentIdRange(ids);
536
32
            tree.scan(
537
32
                &ids.borrow_as_bytes(),
538
32
                match sort {
539
27
                    Sort::Ascending => true,
540
5
                    Sort::Descending => false,
541
                },
542
                |_, _, _| true,
543
32
                |_, _| {
544
44
                    if let Some(limit) = limit {
545
10
                        if keys_read >= limit {
546
5
                            return KeyEvaluation::Stop;
547
5
                        }
548
5

            
549
5
                        keys_read += 1;
550
34
                    }
551
39
                    KeyEvaluation::ReadData
552
44
                },
553
32
                |_, _, doc| {
554
                    found_docs.push(
555
39
                        deserialize_document(&doc)
556
39
                            .map(BorrowedDocument::into_owned)
557
39
                            .map_err(AbortError::Other)?,
558
                    );
559
39
                    Ok(())
560
39
                },
561
            )
562
32
            .map_err(|err| match err {
563
                AbortError::Other(err) => err,
564
                AbortError::Nebari(err) => crate::Error::from(err),
565
32
            })
566
32
            .unwrap();
567
32

            
568
32
            Ok(found_docs)
569
32
        })
570
28
        .await
571
32
        .unwrap()
572
32
    }
573

            
574
385585
    async fn reduce_in_view(
575
385585
        &self,
576
385585
        view_name: &ViewName,
577
385585
        key: Option<QueryKey<Bytes>>,
578
385585
        access_policy: AccessPolicy,
579
385585
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
580
15433
        let view = self
581
15433
            .data
582
15433
            .schema
583
15433
            .view_by_name(view_name)
584
15433
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
585
15433
        let mut mappings = self
586
15435
            .grouped_reduce_in_view(view_name, key, access_policy)
587
317
            .await?;
588

            
589
15433
        let result = if mappings.len() == 1 {
590
9665
            mappings.pop().unwrap().value.into_vec()
591
        } else {
592
5768
            view.reduce(
593
5768
                &mappings
594
5768
                    .iter()
595
5905
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
596
5768
                    .collect::<Vec<_>>(),
597
5768
                true,
598
5768
            )
599
5768
            .map_err(Error::from)?
600
        };
601

            
602
15428
        Ok(result)
603
15433
    }
604

            
605
385789
    async fn grouped_reduce_in_view(
606
385789
        &self,
607
385789
        view_name: &ViewName,
608
385789
        key: Option<QueryKey<Bytes>>,
609
385789
        access_policy: AccessPolicy,
610
385789
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
611
15445
        let view = self
612
15445
            .data
613
15445
            .schema
614
15445
            .view_by_name(view_name)
615
15445
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
616
15445
        let mut mappings = Vec::new();
617
15596
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
618
9839
            let entry = ViewEntry::from(entry);
619
9839
            mappings.push(MappedSerializedValue {
620
9839
                key: entry.key,
621
9839
                value: entry.reduced_value,
622
9839
            });
623
9839
            Ok(())
624
15598
        })
625
326
        .await?;
626

            
627
15445
        Ok(mappings)
628
15445
    }
629

            
630
398850
    fn apply_transaction_to_roots(
631
398850
        &self,
632
398850
        transaction: &Transaction,
633
398850
    ) -> Result<Vec<OperationResult>, Error> {
634
398850
        let mut open_trees = OpenTrees::default();
635
991699
        for op in &transaction.operations {
636
592926
            if !self.data.schema.contains_collection_id(&op.collection) {
637
77
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
638
592849
            }
639

            
640
            #[cfg(any(feature = "encryption", feature = "compression"))]
641
592849
            let vault = if let Some(encryption_key) =
642
592849
                self.collection_encryption_key(&op.collection).cloned()
643
            {
644
                #[cfg(feature = "encryption")]
645
2436
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
646
1260
                    vault.key = Some(encryption_key);
647
1260
                    Some(vault)
648
                } else {
649
1176
                    TreeVault::new_if_needed(
650
1176
                        Some(encryption_key),
651
1176
                        self.storage().vault(),
652
1176
                        #[cfg(feature = "compression")]
653
1176
                        None,
654
1176
                    )
655
                }
656

            
657
                #[cfg(not(feature = "encryption"))]
658
                {
659
                    drop(encryption_key);
660
                    return Err(Error::EncryptionDisabled);
661
                }
662
            } else {
663
590413
                self.storage().tree_vault().cloned()
664
            };
665

            
666
592849
            open_trees.open_trees_for_document_change(
667
592849
                &op.collection,
668
592849
                &self.data.schema,
669
592849
                #[cfg(any(feature = "encryption", feature = "compression"))]
670
592849
                vault,
671
592849
            );
672
        }
673

            
674
398773
        let mut roots_transaction = self
675
398773
            .data
676
398773
            .context
677
398773
            .roots
678
398773
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
679

            
680
398773
        let mut results = Vec::new();
681
398773
        let mut changed_documents = Vec::new();
682
978559
        for op in &transaction.operations {
683
592849
            let result = self.execute_operation(
684
592849
                op,
685
592849
                &mut roots_transaction,
686
592849
                &open_trees.trees_index_by_name,
687
592849
            )?;
688

            
689
579786
            match &result {
690
543976
                OperationResult::DocumentUpdated { header, collection } => {
691
543976
                    changed_documents.push(ChangedDocument {
692
543976
                        collection: collection.clone(),
693
543976
                        id: header.id,
694
543976
                        deleted: false,
695
543976
                    });
696
543976
                }
697
35810
                OperationResult::DocumentDeleted { id, collection } => {
698
35810
                    changed_documents.push(ChangedDocument {
699
35810
                        collection: collection.clone(),
700
35810
                        id: *id,
701
35810
                        deleted: true,
702
35810
                    });
703
35810
                }
704
                OperationResult::Success => {}
705
            }
706
579786
            results.push(result);
707
        }
708

            
709
        // Insert invalidations for each record changed
710
387285
        for (collection, changed_documents) in &changed_documents
711
385710
            .iter()
712
579786
            .group_by(|doc| doc.collection.clone())
713
        {
714
387284
            if let Some(views) = self.data.schema.views_in_collection(&collection) {
715
228908
                let changed_documents = changed_documents.collect::<Vec<_>>();
716
1000040
                for view in views {
717
771132
                    if !view.unique() {
718
734824
                        let view_name = view.view_name();
719
1536631
                        for changed_document in &changed_documents {
720
801807
                            let invalidated_docs = roots_transaction
721
801807
                                .tree::<Unversioned>(
722
801807
                                    open_trees.trees_index_by_name
723
801807
                                        [&view_invalidated_docs_tree_name(&view_name)],
724
801807
                                )
725
801807
                                .unwrap();
726
801807
                            invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
727
                        }
728
36308
                    }
729
                }
730
158376
            }
731
        }
732

            
733
385710
        roots_transaction
734
385710
            .entry_mut()
735
385710
            .set_data(pot::to_vec(&Changes::Documents(changed_documents))?)?;
736

            
737
385710
        roots_transaction.commit()?;
738

            
739
385710
        Ok(results)
740
398850
    }
741

            
742
592849
    fn execute_operation(
743
592849
        &self,
744
592849
        operation: &Operation,
745
592849
        transaction: &mut ExecutingTransaction<AnyFile>,
746
592849
        tree_index_map: &HashMap<String, usize>,
747
592849
    ) -> Result<OperationResult, Error> {
748
592849
        match &operation.command {
749
421976
            Command::Insert { id, contents } => {
750
421976
                self.execute_insert(operation, transaction, tree_index_map, *id, contents)
751
            }
752
122384
            Command::Update { header, contents } => self.execute_update(
753
122384
                operation,
754
122384
                transaction,
755
122384
                tree_index_map,
756
122384
                header.id,
757
122384
                Some(&header.revision),
758
122384
                contents,
759
122384
            ),
760
12679
            Command::Overwrite { id, contents } => {
761
12679
                self.execute_update(operation, transaction, tree_index_map, *id, None, contents)
762
            }
763
35810
            Command::Delete { header } => {
764
35810
                self.execute_delete(operation, transaction, tree_index_map, header)
765
            }
766
        }
767
592849
    }
768

            
769
135063
    fn execute_update(
770
135063
        &self,
771
135063
        operation: &Operation,
772
135063
        transaction: &mut ExecutingTransaction<AnyFile>,
773
135063
        tree_index_map: &HashMap<String, usize>,
774
135063
        id: DocumentId,
775
135063
        check_revision: Option<&Revision>,
776
135063
        contents: &[u8],
777
135063
    ) -> Result<OperationResult, crate::Error> {
778
135063
        let documents = transaction
779
135063
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
780
135063
            .unwrap();
781
135063
        let document_id = ArcBytes::from(id.as_ref());
782
135063
        let mut result = None;
783
135063
        let mut updated = false;
784
135063
        documents.modify(
785
135063
            vec![document_id.clone()],
786
135063
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
787
                                                                        value: Option<
788
                ArcBytes<'_>,
789
            >| {
790
135063
                if let Some(old) = value {
791
134986
                    let doc = match deserialize_document(&old) {
792
134986
                        Ok(doc) => doc,
793
                        Err(err) => {
794
                            result = Some(Err(err));
795
                            return nebari::tree::KeyOperation::Skip;
796
                        }
797
                    };
798
134986
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
799
134909
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
800
                        {
801
134755
                            let updated_header = Header {
802
134755
                                id,
803
134755
                                revision: updated_revision,
804
134755
                            };
805
134755
                            let serialized_doc = match serialize_document(&BorrowedDocument {
806
134755
                                header: updated_header.clone(),
807
134755
                                contents: CowBytes::from(contents),
808
134755
                            }) {
809
134755
                                Ok(bytes) => bytes,
810
                                Err(err) => {
811
                                    result = Some(Err(Error::from(err)));
812
                                    return nebari::tree::KeyOperation::Skip;
813
                                }
814
                            };
815
134755
                            result = Some(Ok(OperationResult::DocumentUpdated {
816
134755
                                collection: operation.collection.clone(),
817
134755
                                header: updated_header,
818
134755
                            }));
819
134755
                            updated = true;
820
134755
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
821
154
                        }
822
154

            
823
154
                        // If no new revision was made, it means an attempt to
824
154
                        // save a document with the same contents was made.
825
154
                        // We'll return a success but not actually give a new
826
154
                        // version
827
154
                        result = Some(Ok(OperationResult::DocumentUpdated {
828
154
                            collection: operation.collection.clone(),
829
154
                            header: doc.header,
830
154
                        }));
831
77
                    } else {
832
77
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
833
77
                            operation.collection.clone(),
834
77
                            Box::new(doc.header),
835
77
                        ))));
836
77
                    }
837
77
                } else if check_revision.is_none() {
838
                    let doc = BorrowedDocument::new(id, contents);
839
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
840
                        Ok((doc, serialized)) => {
841
                            result = Some(Ok(OperationResult::DocumentUpdated {
842
                                collection: operation.collection.clone(),
843
                                header: doc.header,
844
                            }));
845
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
846
                        }
847
                        Err(err) => {
848
                            result = Some(Err(Error::from(err)));
849
                        }
850
                    }
851
77
                } else {
852
77
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
853
77
                        operation.collection.clone(),
854
77
                        Box::new(id),
855
77
                    ))));
856
77
                }
857
308
                nebari::tree::KeyOperation::Skip
858
135063
            })),
859
135063
        )?;
860

            
861
135063
        if updated {
862
134755
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
863
308
        }
864

            
865
134909
        result.expect("nebari should invoke the callback even when the key isn't found")
866
135063
    }
867

            
868
421976
    fn execute_insert(
869
421976
        &self,
870
421976
        operation: &Operation,
871
421976
        transaction: &mut ExecutingTransaction<AnyFile>,
872
421976
        tree_index_map: &HashMap<String, usize>,
873
421976
        id: Option<DocumentId>,
874
421976
        contents: &[u8],
875
421976
    ) -> Result<OperationResult, Error> {
876
421976
        let documents = transaction
877
421976
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
878
421976
            .unwrap();
879
421976
        let id = if let Some(id) = id {
880
201433
            id
881
220543
        } else if let Some(last_key) = documents.last_key()? {
882
201543
            let id = DocumentId::try_from(last_key.as_slice())?;
883
201543
            self.data
884
201543
                .schema
885
201543
                .next_id_for_collection(&operation.collection, Some(id))?
886
        } else {
887
19000
            self.data
888
19000
                .schema
889
19000
                .next_id_for_collection(&operation.collection, None)?
890
        };
891

            
892
421951
        let doc = BorrowedDocument::new(id, contents);
893
421951
        let serialized: Vec<u8> = serialize_document(&doc)?;
894
421951
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
895
421951
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
896
12603
            let doc = deserialize_document(&document)?;
897
12603
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
898
12603
                operation.collection.clone(),
899
12603
                Box::new(doc.header),
900
12603
            )))
901
        } else {
902
409348
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
903

            
904
409221
            Ok(OperationResult::DocumentUpdated {
905
409221
                collection: operation.collection.clone(),
906
409221
                header: doc.header,
907
409221
            })
908
        }
909
421976
    }
910

            
911
35810
    fn execute_delete(
912
35810
        &self,
913
35810
        operation: &Operation,
914
35810
        transaction: &mut ExecutingTransaction<AnyFile>,
915
35810
        tree_index_map: &HashMap<String, usize>,
916
35810
        header: &Header,
917
35810
    ) -> Result<OperationResult, Error> {
918
35810
        let documents = transaction
919
35810
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
920
35810
            .unwrap();
921
35810
        if let Some(vec) = documents.remove(header.id.as_ref())? {
922
35810
            let doc = deserialize_document(&vec)?;
923
35810
            if &doc.header == header {
924
35810
                self.update_unique_views(
925
35810
                    doc.header.id.as_ref(),
926
35810
                    operation,
927
35810
                    transaction,
928
35810
                    tree_index_map,
929
35810
                )?;
930

            
931
35810
                Ok(OperationResult::DocumentDeleted {
932
35810
                    collection: operation.collection.clone(),
933
35810
                    id: header.id,
934
35810
                })
935
            } else {
936
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
937
                    operation.collection.clone(),
938
                    Box::new(header.clone()),
939
                )))
940
            }
941
        } else {
942
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
943
                operation.collection.clone(),
944
                Box::new(header.id),
945
            )))
946
        }
947
35810
    }
948

            
949
    fn update_unique_views(
950
        &self,
951
        document_id: &[u8],
952
        operation: &Operation,
953
        transaction: &mut ExecutingTransaction<AnyFile>,
954
        tree_index_map: &HashMap<String, usize>,
955
    ) -> Result<(), Error> {
956
579913
        if let Some(unique_views) = self
957
579913
            .data
958
579913
            .schema
959
579913
            .unique_views_in_collection(&operation.collection)
960
        {
961
145097
            for view in unique_views {
962
72689
                let name = view.view_name();
963
72689
                mapper::DocumentRequest {
964
72689
                    database: self,
965
72689
                    document_id,
966
72689
                    map_request: &mapper::Map {
967
72689
                        database: self.data.name.clone(),
968
72689
                        collection: operation.collection.clone(),
969
72689
                        view_name: name.clone(),
970
72689
                    },
971
72689
                    transaction,
972
72689
                    document_map_index: tree_index_map[&view_document_map_tree_name(&name)],
973
72689
                    documents_index: tree_index_map[&document_tree_name(&operation.collection)],
974
72689
                    omitted_entries_index: tree_index_map[&view_omitted_docs_tree_name(&name)],
975
72689
                    view_entries_index: tree_index_map[&view_entries_tree_name(&name)],
976
72689
                    view,
977
72689
                }
978
72689
                .map()?;
979
            }
980
507224
        }
981

            
982
579632
        Ok(())
983
579913
    }
984

            
985
623700
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
986
623700
        view_entries: &'a Tree<Unversioned, AnyFile>,
987
623700
        key: Option<QueryKey<K>>,
988
623700
        order: Sort,
989
623700
        limit: Option<usize>,
990
623700
    ) -> Result<Vec<ViewEntryCollection>, Error> {
991
623700
        let mut values = Vec::new();
992
623700
        let forwards = match order {
993
623623
            Sort::Ascending => true,
994
77
            Sort::Descending => false,
995
        };
996
623700
        let mut values_read = 0;
997
623700
        if let Some(key) = key {
998
620390
            match key {
999
229
                QueryKey::Range(range) => {
229
                    let range = range
229
                        .as_ord_bytes()
229
                        .map_err(view::Error::key_serialization)?;
229
                    view_entries.scan::<Infallible, _, _, _, _>(
458
                        &range.map_ref(|bytes| &bytes[..]),
229
                        forwards,
529
                        |_, _, _| true,
229
                        |_, _| {
1308
                            if let Some(limit) = limit {
154
                                if values_read >= limit {
77
                                    return KeyEvaluation::Stop;
77
                                }
77
                                values_read += 1;
1154
                            }
1231
                            KeyEvaluation::ReadData
1308
                        },
1231
                        |_key, _index, value| {
1231
                            values.push(value);
1231
                            Ok(())
1231
                        },
229
                    )?;
                }
619982
                QueryKey::Matches(key) => {
619982
                    let key = key
619982
                        .as_ord_bytes()
619982
                        .map_err(view::Error::key_serialization)?
619982
                        .to_vec();
619982

            
619982
                    values.extend(view_entries.get(&key)?);
                }
179
                QueryKey::Multiple(list) => {
179
                    let mut list = list
179
                        .into_iter()
358
                        .map(|key| {
358
                            key.as_ord_bytes()
358
                                .map(|bytes| bytes.to_vec())
358
                                .map_err(view::Error::key_serialization)
358
                        })
179
                        .collect::<Result<Vec<_>, _>>()?;

            
179
                    list.sort();
179

            
179
                    values.extend(
179
                        view_entries
179
                            .get_multiple(&list.iter().map(Vec::as_slice).collect::<Vec<_>>())?
179
                            .into_iter()
358
                            .map(|(_, value)| value),
                    );
                }
            }
        } else {
3310
            view_entries.scan::<Infallible, _, _, _, _>(
3310
                &(..),
3310
                forwards,
3310
                |_, _, _| true,
3310
                |_, _| {
4479
                    if let Some(limit) = limit {
                        if values_read >= limit {
                            return KeyEvaluation::Stop;
                        }
                        values_read += 1;
4479
                    }
4479
                    KeyEvaluation::ReadData
4535
                },
4535
                |_, _, value| {
4479
                    values.push(value);
4479
                    Ok(())
4535
                },
3310
            )?;
        }

            
623700
        values
623700
            .into_iter()
623700
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
623700
            .collect::<Result<Vec<_>, Error>>()
623700
    }

            
    /// Returns the encryption key to use for `collection`: the key the
    /// schematic reports for the collection if there is one, otherwise the
    /// storage's default encryption key (which may also be `None`).
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        if let Some(collection_key) = self.schematic().encryption_key_for_collection(collection) {
            return Some(collection_key);
        }

        self.storage().default_encryption_key()
    }

            
    #[cfg_attr(
        not(feature = "encryption"),
        allow(
            unused_mut,
            unused_variables,
            clippy::unused_self,
            clippy::let_and_return
        )
    )]
    #[allow(clippy::unnecessary_wraps)]
1845760
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
1845760
        &self,
1845760
        collection: &CollectionName,
1845760
        name: S,
1845760
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
1845760
        let mut tree = R::tree(name);
1845760

            
1845760
        #[cfg(any(feature = "encryption", feature = "compression"))]
1845760
        match (
1845760
            self.collection_encryption_key(collection),
1845760
            self.storage().tree_vault().cloned(),
        ) {
18923
            (Some(override_key), Some(mut vault)) => {
18923
                #[cfg(feature = "encryption")]
18923
                {
18923
                    vault.key = Some(override_key.clone());
18923
                    tree = tree.with_vault(vault);
18923
                }

            
                #[cfg(not(feature = "encryption"))]
                {
                    return Err(Error::EncryptionDisabled);
                }
            }
426588
            (None, Some(vault)) => {
426588
                tree = tree.with_vault(vault);
426588
            }
1400249
            (key, None) => {
                #[cfg(feature = "encryption")]
1400249
                if let Some(vault) = TreeVault::new_if_needed(
1400249
                    key.cloned(),
1400249
                    self.storage().vault(),
1400249
                    #[cfg(feature = "compression")]
1400249
                    None,
1400249
                ) {
23832
                    tree = tree.with_vault(vault);
1376342
                }

            
                #[cfg(not(feature = "encryption"))]
                if key.is_some() {
                    return Err(Error::EncryptionDisabled);
                }
            }
        }

            
1845685
        Ok(tree)
1845685
    }

            
1
    pub(crate) async fn update_key_expiration_async<'key>(
1
        &self,
1
        tree_key: impl Into<Cow<'key, str>>,
1
        expiration: Option<Timestamp>,
1
    ) {
1
        self.data
1
            .context
1
            .update_key_expiration_async(tree_key, expiration)
            .await;
1
    }
}
/// Header layout of documents stored in the legacy bincode encoding.
///
/// Only read by the fallback path of `deserialize_document`, which converts
/// the numeric id with `DocumentId::from_u64`.
#[derive(Serialize, Deserialize)]
struct LegacyHeader {
    /// Numeric document id.
    id: u64,
    /// Revision stored with the document.
    revision: Revision,
}
/// Document layout of the legacy bincode encoding.
///
/// `deserialize_document` tries this layout when the bytes cannot be read as
/// a pot-encoded `BorrowedDocument`.
#[derive(Serialize, Deserialize)]
struct LegacyDocument<'a> {
    /// Legacy header with a `u64` id.
    header: LegacyHeader,
    /// Document contents, borrowed from the input buffer.
    #[serde(borrow)]
    contents: &'a [u8],
}

            
818458
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
818458
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
818458
        Ok(document) => Ok(document),
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
            Ok(legacy_doc) => Ok(BorrowedDocument {
                header: Header {
                    id: DocumentId::from_u64(legacy_doc.header.id),
                    revision: legacy_doc.header.revision,
                },
                contents: CowBytes::from(legacy_doc.contents),
            }),
            Err(_) => Err(Error::from(err)),
        },
    }
818458
}

            
556706
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
556706
    pot::to_vec(document)
556706
        .map_err(Error::from)
556706
        .map_err(bonsaidb_core::Error::from)
556706
}

            
#[async_trait]
impl Connection for Database {
1196450
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(transaction)))]
    async fn apply_transaction(
        &self,
        transaction: Transaction,
398850
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
398850
        let task_self = self.clone();
398850
        let mut unique_view_tasks = Vec::new();
400425
        for collection_name in transaction
398850
            .operations
398850
            .iter()
592926
            .map(|op| &op.collection)
398850
            .collect::<HashSet<_>>()
        {
400424
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
1064437
                for view in views {
822491
                    if view.unique() {
36589
                        if let Some(task) = self
36589
                            .data
36589
                            .storage
36589
                            .tasks()
36589
                            .spawn_integrity_check(view, self)
                            .await?
810
                        {
810
                            unique_view_tasks.push(task);
35754
                        }
785902
                    }
                }
158453
            }
        }

            
398850
        let mut unique_view_mapping_tasks = Vec::new();
399660
        for task in unique_view_tasks {
810
            if let Some(spawned_task) = task
810
                .receive()
785
                .await
810
                .map_err(Error::from)?
810
                .map_err(Error::from)?
50
            {
50
                unique_view_mapping_tasks.push(spawned_task);
760
            }
        }
398900
        for task in unique_view_mapping_tasks {
75
            let mut task = fast_async_lock!(task);
75
            if let Some(task) = task.take() {
75
                task.receive()
50
                    .await
50
                    .map_err(Error::from)?
50
                    .map_err(Error::from)?;
            }
        }

            
398850
        tokio::task::spawn_blocking(move || task_self.apply_transaction_to_roots(&transaction))
336422
            .await
398850
            .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?
398850
            .map_err(bonsaidb_core::Error::from)
797700
    }

            
17823
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(id)))]
    async fn get<C, PrimaryKey>(
        &self,
        id: PrimaryKey,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error>
    where
        C: schema::Collection,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
5941
    {
5941
        self.get_from_collection_id(id.into().to_document_id()?, &C::collection_name())
3101
            .await
11882
    }

            
12141
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids)))]
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
        &self,
        ids: DocumentIds,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
    where
        C: schema::Collection,
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
        I: Iterator<Item = PrimaryKey> + Send + Sync,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync,
4047
    {
        self.get_multiple_from_collection_id(
4047
            &ids.into_iter()
4053
                .map(|id| id.into().to_document_id())
4047
                .collect::<Result<Vec<_>, _>>()?,
4047
            &C::collection_name(),
2211
        )
2211
        .await
8094
    }

            
36
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids, order, limit)))]
    async fn list<C, R, PrimaryKey>(
        &self,
        ids: R,
        order: Sort,
        limit: Option<usize>,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
    where
        C: schema::Collection,
        R: Into<Range<PrimaryKey>> + Send,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
12
    {
        self.list(
18
            ids.into().map_result(|id| id.into().to_document_id())?,
12
            order,
12
            limit,
12
            &C::collection_name(),
12
        )
12
        .await
24
    }

            
    #[cfg_attr(
        feature = "tracing",
79308
        tracing::instrument(skip(starting_id, result_limit))
    )]
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<usize>,
26436
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
26436
        let result_limit = result_limit
26436
            .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
26436
            .min(LIST_TRANSACTIONS_MAX_RESULTS);
26436
        if result_limit > 0 {
26359
            let task_self = self.clone();
26359
            tokio::task::spawn_blocking::<_, Result<Vec<transaction::Executed>, Error>>(move || {
26359
                let range = if let Some(starting_id) = starting_id {
13449
                    Range::from(starting_id..)
                } else {
12910
                    Range::from(..)
                };

            
26359
                let mut entries = Vec::new();
200122
                task_self.roots().transactions().scan(range, |entry| {
200122
                    entries.push(entry);
200122
                    entries.len() < result_limit
200122
                })?;

            
26359
                entries
26359
                    .into_iter()
26359
                    .map(|entry| {
200122
                        if let Some(data) = entry.data() {
200122
                            let changes = match pot::from_slice(data) {
200122
                                Ok(changes) => changes,
                                Err(pot::Error::NotAPot) => {
                                    Changes::Documents(bincode::deserialize(entry.data().unwrap())?)
                                }
                                other => other?,
                            };
200122
                            Ok(Some(transaction::Executed {
200122
                                id: entry.id,
200122
                                changes,
200122
                            }))
                        } else {
                            Ok(None)
                        }
200122
                    })
26359
                    .filter_map(Result::transpose)
26359
                    .collect::<Result<Vec<_>, Error>>()
26359
            })
21108
            .await
26359
            .unwrap()
26359
            .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
77
            Ok(Vec::default())
        }
52872
    }

            
    #[cfg_attr(
        feature = "tracing",
121728
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    #[must_use]
    async fn query<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
40576
    {
40576
        let mut results = Vec::new();
40576
        self.for_each_view_entry::<V, _>(key, order, limit, access_policy, |collection| {
18002
            let entry = ViewEntry::from(collection);
18002
            let key = <V::Key as Key>::from_ord_bytes(&entry.key)
18002
                .map_err(view::Error::key_serialization)
18002
                .map_err(Error::from)?;
36037
            for entry in entry.mappings {
                results.push(Map::new(
18035
                    entry.source,
18035
                    key.clone(),
18035
                    V::deserialize(&entry.value)?,
                ));
            }
18002
            Ok(())
40576
        })
36059
        .await?;

            
40576
        Ok(results)
81152
    }

            
    #[cfg_attr(
        feature = "tracing",
12106
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    async fn query_with_docs<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
    where
        Self: Sized,
4036
    {
4038
        let results = Connection::query::<V>(self, key, order, limit, access_policy).await?;

            
4036
        let documents = self
4040
            .get_multiple::<V::Collection, _, _, _>(results.iter().map(|m| m.source.id))
2200
            .await?
4036
            .into_iter()
4040
            .map(|doc| (doc.header.id, doc))
4036
            .collect::<BTreeMap<_, _>>();
4036

            
4036
        Ok(MappedDocuments {
4036
            mappings: results,
4036
            documents,
4036
        })
8072
    }

            
23172
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<V::Value, bonsaidb_core::Error>
    where
        Self: Sized,
7724
    {
7724
        let view = self
7724
            .data
7724
            .schema
7724
            .view::<V>()
7724
            .expect("query made with view that isn't registered with this database");

            
7721
        let result = self
            .reduce_in_view(
7724
                &view.view_name(),
7724
                key.map(|key| key.serialized()).transpose()?,
7724
                access_policy,
167
            )
167
            .await?;
7721
        let value = V::deserialize(&result)?;

            
7721
        Ok(value)
15448
    }

            
18
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce_grouped<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
6
    {
6
        let view = self
6
            .data
6
            .schema
6
            .view::<V>()
6
            .expect("query made with view that isn't registered with this database");

            
6
        let results = self
            .grouped_reduce_in_view(
6
                &view.view_name(),
6
                key.map(|key| key.serialized()).transpose()?,
6
                access_policy,
3
            )
3
            .await?;
6
        results
6
            .into_iter()
9
            .map(|map| {
9
                Ok(MappedValue::new(
9
                    V::Key::from_ord_bytes(&map.key).map_err(view::Error::key_serialization)?,
9
                    V::deserialize(&map.value)?,
                ))
9
            })
6
            .collect::<Result<Vec<_>, bonsaidb_core::Error>>()
12
    }

            
3
    async fn delete_docs<V: schema::SerializedView>(
3
        &self,
3
        key: Option<QueryKey<V::Key>>,
3
        access_policy: AccessPolicy,
3
    ) -> Result<u64, bonsaidb_core::Error>
3
    where
3
        Self: Sized,
3
    {
3
        let collection = <V::Collection as Collection>::collection_name();
3
        let mut transaction = Transaction::default();
3
        self.for_each_view_entry::<V, _>(
3
            key,
3
            Sort::Ascending,
3
            None,
3
            access_policy,
3
            |entry_collection| {
3
                let entry = ViewEntry::from(entry_collection);

            
9
                for mapping in entry.mappings {
6
                    transaction.push(Operation::delete(collection.clone(), mapping.source));
6
                }

            
3
                Ok(())
3
            },
3
        )
3
        .await?;

            
3
        let results = Connection::apply_transaction(self, transaction).await?;

            
3
        Ok(results.len() as u64)
6
    }

            
1124037
    #[cfg_attr(feature = "tracing", tracing::instrument)]
374679
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
374679
        Ok(self.roots().transactions().current_transaction_id())
374679
    }

            
9
    #[cfg_attr(feature = "tracing", tracing::instrument)]
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
3
        self.storage()
3
            .tasks()
3
            .compact_collection(self.clone(), C::collection_name())
3
            .await?;
3
        Ok(())
6
    }

            
231
    #[cfg_attr(feature = "tracing", tracing::instrument)]
77
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
77
        self.storage()
77
            .tasks()
77
            .compact_database(self.clone())
77
            .await?;
77
        Ok(())
154
    }

            
231
    #[cfg_attr(feature = "tracing", tracing::instrument)]
77
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
77
        self.storage()
77
            .tasks()
77
            .compact_key_value_store(self.clone())
77
            .await?;
77
        Ok(())
154
    }
}

            
/// Boxed iterator over pairs of byte buffers, each item being a `Result` so
/// that iteration itself can fail. `ViewEntryCollectionIterator` wraps one
/// and deserializes the second element of every pair.
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapter that turns a `ViewIterator` into an iterator of deserialized
/// `ViewEntryCollection`s.
struct ViewEntryCollectionIterator<'a> {
    /// Underlying iterator of raw byte pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntryCollection, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
1317963
/// Handle to shared `ContextData`. Cloning copies only the `Arc`, so every
/// clone refers to the same data.
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// Reference-counted shared state.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    /// Gives direct read access to the shared [`ContextData`].
    fn deref(&self) -> &Self::Target {
        self.data.as_ref()
    }
}

            
/// The state shared by all clones of a [`Context`].
///
/// Dropping this value spawns a task on `runtime` that shuts down the
/// key-value state (see the `Drop` implementation).
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The nebari roots this context operates on.
    pub(crate) roots: Roots<AnyFile>,
    /// Mutex-guarded key-value state; `Context::new` also hands a clone of
    /// this `Arc` to the key-value background worker.
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
    /// Handle to the tokio runtime that was current when the context was
    /// created; used to spawn the shutdown task on drop.
    runtime: tokio::runtime::Handle,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    /// Borrows the roots stored in the shared [`ContextData`].
    fn borrow(&self) -> &Roots<AnyFile> {
        // `roots` is reached through `Context`'s `Deref` to `ContextData`.
        &self.roots
    }
}

            
impl Context {
20845
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
20845
        let (background_sender, background_receiver) =
20845
            watch::channel(BackgroundWorkerProcessTarget::Never);
20845
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
20845
            key_value_persistence,
20845
            roots.clone(),
20845
            background_sender,
20845
        )));
20845
        let context = Self {
20845
            data: Arc::new(ContextData {
20845
                roots,
20845
                key_value_state: key_value_state.clone(),
20845
                runtime: tokio::runtime::Handle::current(),
20845
            }),
20845
        };
20845
        tokio::task::spawn(keyvalue::background_worker(
20845
            key_value_state,
20845
            background_receiver,
20845
        ));
20845
        context
20845
    }

            
778739
    pub(crate) async fn perform_kv_operation(
778739
        &self,
778739
        op: KeyOperation,
778739
    ) -> Result<Output, bonsaidb_core::Error> {
778764
        let mut state = fast_async_lock!(self.data.key_value_state);
778764
        state
778764
            .perform_kv_operation(op, &self.data.key_value_state)
            .await
778764
    }

            
11
    pub(crate) async fn update_key_expiration_async<'key>(
11
        &self,
11
        tree_key: impl Into<Cow<'key, str>>,
11
        expiration: Option<Timestamp>,
11
    ) {
11
        let mut state = fast_async_lock!(self.data.key_value_state);
11
        state.update_key_expiration(tree_key, expiration);
11
    }
}

            
impl Drop for ContextData {
18871
    fn drop(&mut self) {
18871
        let key_value_state = self.key_value_state.clone();
18871
        self.runtime.spawn(async move {
14409
            let mut state = fast_async_lock!(key_value_state);
14409
            state.shutdown(&key_value_state).await
18871
        });
18871
    }
}

            
1938476
pub fn document_tree_name(collection: &CollectionName) -> String {
1938476
    format!("collection.{:#}", collection)
1938476
}

            
/// Newtype around a [`Range`] of [`DocumentId`]s.
///
/// It exists so that [`BorrowByteRange`] can be implemented for the range,
/// exposing each bound as borrowed bytes.
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
488
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
488
        BorrowedRange {
488
            start: match &self.0.start {
257
                connection::Bound::Unbounded => ops::Bound::Unbounded,
231
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
488
            end: match &self.0.end {
257
                connection::Bound::Unbounded => ops::Bound::Unbounded,
154
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
77
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
488
    }
}