1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::{self, Deref},
6
    sync::Arc,
7
    u8,
8
};
9

            
10
use async_lock::Mutex;
11
use async_trait::async_trait;
12
#[cfg(any(feature = "encryption", feature = "compression"))]
13
use bonsaidb_core::document::KeyId;
14
use bonsaidb_core::{
15
    arc_bytes::{
16
        serde::{Bytes, CowBytes},
17
        ArcBytes,
18
    },
19
    connection::{self, AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
20
    document::{AnyDocumentId, BorrowedDocument, DocumentId, Header, OwnedDocument, Revision},
21
    key::Key,
22
    keyvalue::{KeyOperation, Output, Timestamp},
23
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
24
    permissions::Permissions,
25
    schema::{
26
        self,
27
        view::{
28
            self,
29
            map::{MappedDocuments, MappedSerializedValue},
30
        },
31
        Collection, CollectionName, Map, MappedValue, Schema, Schematic, ViewName,
32
    },
33
    transaction::{
34
        self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult,
35
        Transaction,
36
    },
37
};
38
use bonsaidb_utils::fast_async_lock;
39
use itertools::Itertools;
40
use nebari::{
41
    io::any::AnyFile,
42
    tree::{
43
        AnyTreeRoot, BorrowByteRange, BorrowedRange, CompareSwap, KeyEvaluation, Root, TreeRoot,
44
        Unversioned, Versioned,
45
    },
46
    AbortError, ExecutingTransaction, Roots, Tree,
47
};
48
use serde::{Deserialize, Serialize};
49
use tokio::sync::watch;
50

            
51
#[cfg(feature = "encryption")]
52
use crate::storage::TreeVault;
53
use crate::{
54
    config::{Builder, KeyValuePersistence, StorageConfiguration},
55
    database::keyvalue::BackgroundWorkerProcessTarget,
56
    error::Error,
57
    open_trees::OpenTrees,
58
    views::{
59
        mapper::{self},
60
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
61
        ViewEntry,
62
    },
63
    Storage,
64
};
65

            
66
pub mod keyvalue;
67

            
68
pub(crate) mod compat;
69
pub mod pubsub;
70

            
71
/// A local, file-based database.
72
///
73
/// ## Using `Database` to create a single database
74
///
75
/// `Database`provides an easy mechanism to open and access a single database:
76
///
77
/// ```rust
78
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
79
/// use bonsaidb_core::schema::Collection;
80
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
81
/// use bonsaidb_local::{
82
///     config::{Builder, StorageConfiguration},
83
///     Database,
84
/// };
85
/// use serde::{Deserialize, Serialize};
86
///
87
/// #[derive(Debug, Serialize, Deserialize, Collection)]
88
/// #[collection(name = "blog-posts")]
89
/// # #[collection(core = bonsaidb_core)]
90
/// struct BlogPost {
91
///     pub title: String,
92
///     pub contents: String,
93
/// }
94
///
95
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
96
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
97
/// #     Ok(())
98
/// # }
99
/// ```
100
///
101
/// Under the hood, this initializes a [`Storage`] instance pointing at
102
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
103
/// with the schema `BlogPost`.
104
///
105
/// In this example, `BlogPost` implements the [`Collection`] trait, and all
106
/// collections can be used as a [`Schema`].
107
#[derive(Debug)]
108
pub struct Database {
109
    pub(crate) data: Arc<Data>,
110
}
111

            
112
#[derive(Debug)]
113
pub struct Data {
114
    pub name: Arc<Cow<'static, str>>,
115
    context: Context,
116
    pub(crate) storage: Storage,
117
    pub(crate) schema: Arc<Schematic>,
118
    #[allow(dead_code)] // This code was previously used, it works, but is currently unused.
119
    pub(crate) effective_permissions: Option<Permissions>,
120
}
121
impl Clone for Database {
122
1368337
    fn clone(&self) -> Self {
123
1368337
        Self {
124
1368337
            data: self.data.clone(),
125
1368337
        }
126
1368337
    }
127
}
128

            
129
impl Database {
130
    /// Opens a local file as a bonsaidb.
131
91229
    pub(crate) async fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
132
91229
        name: S,
133
91229
        context: Context,
134
91229
        storage: Storage,
135
91229
    ) -> Result<Self, Error> {
136
91229
        let name = name.into();
137
91229
        let schema = Arc::new(DB::schematic()?);
138
91229
        let db = Self {
139
91229
            data: Arc::new(Data {
140
91229
                name: Arc::new(name),
141
91229
                context,
142
91229
                storage: storage.clone(),
143
91229
                schema,
144
91229
                effective_permissions: None,
145
91229
            }),
146
91229
        };
147
91229

            
148
91229
        if db.data.storage.check_view_integrity_on_database_open() {
149
16
            for view in db.data.schema.views() {
150
16
                db.data
151
16
                    .storage
152
16
                    .tasks()
153
16
                    .spawn_integrity_check(view, &db)
154
                    .await?;
155
            }
156
91225
        }
157

            
158
91229
        storage.tasks().spawn_key_value_expiration_loader(&db).await;
159

            
160
91229
        Ok(db)
161
91229
    }
162

            
163
    /// Returns a clone with `effective_permissions`. Replaces any previously applied permissions.
164
    ///
165
    /// # Unstable
166
    ///
167
    /// See [this issue](https://github.com/khonsulabs/bonsaidb/issues/68).
168
    #[doc(hidden)]
169
    #[must_use]
170
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Self {
171
        Self {
172
            data: Arc::new(Data {
173
                name: self.data.name.clone(),
174
                context: self.data.context.clone(),
175
                storage: self.data.storage.clone(),
176
                schema: self.data.schema.clone(),
177
                effective_permissions: Some(effective_permissions),
178
            }),
179
        }
180
    }
181

            
182
    /// Returns the name of the database.
183
    #[must_use]
184
1462
    pub fn name(&self) -> &str {
185
1462
        self.data.name.as_ref()
186
1462
    }
187

            
188
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
189
25
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
190
403
        let storage = Storage::open(configuration.with_schema::<DB>()?).await?;
191

            
192
33
        storage.create_database::<DB>("default", true).await?;
193

            
194
25
        Ok(storage.database::<DB>("default").await?)
195
25
    }
196

            
197
    /// Returns the [`Storage`] that this database belongs to.
198
    #[must_use]
199
6385545
    pub fn storage(&self) -> &'_ Storage {
200
6385545
        &self.data.storage
201
6385545
    }
202

            
203
    /// Returns the [`Schematic`] for the schema for this database.
204
    #[must_use]
205
2675144
    pub fn schematic(&self) -> &'_ Schematic {
206
2675144
        &self.data.schema
207
2675144
    }
208

            
209
1712098
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
210
1712098
        &self.data.context.roots
211
1712098
    }
212

            
213
61997
    async fn for_each_in_view<
214
61997
        F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync,
215
61997
    >(
216
61997
        &self,
217
61997
        view: &dyn view::Serialized,
218
61997
        key: Option<QueryKey<Bytes>>,
219
61997
        order: Sort,
220
61997
        limit: Option<usize>,
221
61997
        access_policy: AccessPolicy,
222
61997
        mut callback: F,
223
61997
    ) -> Result<(), bonsaidb_core::Error> {
224
61997
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
225
46335
            self.data
226
46335
                .storage
227
46335
                .tasks()
228
46381
                .update_view_if_needed(view, self)
229
42163
                .await?;
230
15662
        }
231

            
232
61997
        let view_entries = self
233
61997
            .roots()
234
61997
            .tree(self.collection_tree(
235
61997
                &view.collection(),
236
61997
                view_entries_tree_name(&view.view_name()),
237
61997
            )?)
238
61997
            .map_err(Error::from)?;
239

            
240
        {
241
61997
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
242
27883
                callback(entry)?;
243
            }
244
        }
245

            
246
61995
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
247
5
            let db = self.clone();
248
5
            let view_name = view.view_name();
249
5
            tokio::task::spawn(async move {
250
5
                let view = db
251
5
                    .data
252
5
                    .schema
253
5
                    .view_by_name(&view_name)
254
5
                    .expect("query made with view that isn't registered with this database");
255
5
                db.data
256
5
                    .storage
257
5
                    .tasks()
258
10
                    .update_view_if_needed(view, &db)
259
10
                    .await
260
5
            });
261
61991
        }
262

            
263
61996
        Ok(())
264
61996
    }
265

            
266
42088
    async fn for_each_view_entry<
267
42088
        V: schema::View,
268
42088
        F: FnMut(ViewEntry) -> Result<(), bonsaidb_core::Error> + Send + Sync,
269
42088
    >(
270
42088
        &self,
271
42088
        key: Option<QueryKey<V::Key>>,
272
42088
        order: Sort,
273
42088
        limit: Option<usize>,
274
42088
        access_policy: AccessPolicy,
275
42088
        callback: F,
276
42089
    ) -> Result<(), bonsaidb_core::Error> {
277
42089
        let view = self
278
42089
            .data
279
42089
            .schema
280
42089
            .view::<V>()
281
42089
            .expect("query made with view that isn't registered with this database");
282
42089

            
283
42089
        self.for_each_in_view(
284
42089
            view,
285
42089
            key.map(|key| key.serialized()).transpose()?,
286
42089
            order,
287
42089
            limit,
288
42089
            access_policy,
289
42089
            callback,
290
39900
        )
291
39900
        .await
292
42088
    }
293

            
294
    #[cfg(feature = "internal-apis")]
295
    #[doc(hidden)]
296
212004
    pub async fn internal_get_from_collection_id(
297
212004
        &self,
298
212004
        id: DocumentId,
299
212004
        collection: &CollectionName,
300
212004
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
301
8154
        self.get_from_collection_id(id, collection).await
302
8154
    }
303

            
304
    #[cfg(feature = "internal-apis")]
305
    #[doc(hidden)]
306
208
    pub async fn list_from_collection(
307
208
        &self,
308
208
        ids: Range<DocumentId>,
309
208
        order: Sort,
310
208
        limit: Option<usize>,
311
208
        collection: &CollectionName,
312
208
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
313
8
        self.list(ids, order, limit, collection).await
314
8
    }
315

            
316
    #[cfg(feature = "internal-apis")]
317
    #[doc(hidden)]
318
104
    pub async fn count_from_collection(
319
104
        &self,
320
104
        ids: Range<DocumentId>,
321
104
        collection: &CollectionName,
322
104
    ) -> Result<u64, bonsaidb_core::Error> {
323
4
        self.count(ids, collection).await
324
4
    }
325

            
326
    #[cfg(feature = "internal-apis")]
327
    #[doc(hidden)]
328
104
    pub async fn internal_get_multiple_from_collection_id(
329
104
        &self,
330
104
        ids: &[DocumentId],
331
104
        collection: &CollectionName,
332
104
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
333
4
        self.get_multiple_from_collection_id(ids, collection).await
334
4
    }
335

            
336
    #[cfg(feature = "internal-apis")]
337
    #[doc(hidden)]
338
52
    pub async fn compact_collection_by_name(
339
52
        &self,
340
52
        collection: CollectionName,
341
52
    ) -> Result<(), bonsaidb_core::Error> {
342
2
        self.storage()
343
2
            .tasks()
344
2
            .compact_collection(self.clone(), collection)
345
2
            .await?;
346
2
        Ok(())
347
2
    }
348

            
349
    #[cfg(feature = "internal-apis")]
350
    #[doc(hidden)]
351
102154
    pub async fn query_by_name(
352
102154
        &self,
353
102154
        view: &ViewName,
354
102154
        key: Option<QueryKey<Bytes>>,
355
102154
        order: Sort,
356
102154
        limit: Option<usize>,
357
102154
        access_policy: AccessPolicy,
358
102154
    ) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
359
3929
        if let Some(view) = self.schematic().view_by_name(view) {
360
3929
            let mut results = Vec::new();
361
3929
            self.for_each_in_view(view, key, order, limit, access_policy, |entry| {
362
7870
                for mapping in entry.mappings {
363
3944
                    results.push(bonsaidb_core::schema::view::map::Serialized {
364
3944
                        source: mapping.source,
365
3944
                        key: entry.key.clone(),
366
3944
                        value: mapping.value,
367
3944
                    });
368
3944
                }
369
3926
                Ok(())
370
3929
            })
371
1912
            .await?;
372

            
373
3929
            Ok(results)
374
        } else {
375
            Err(bonsaidb_core::Error::CollectionNotFound)
376
        }
377
3929
    }
378

            
379
    #[cfg(feature = "internal-apis")]
380
    #[doc(hidden)]
381
100828
    pub async fn query_by_name_with_docs(
382
100828
        &self,
383
100828
        view: &ViewName,
384
100828
        key: Option<QueryKey<Bytes>>,
385
100828
        order: Sort,
386
100828
        limit: Option<usize>,
387
100828
        access_policy: AccessPolicy,
388
100828
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
389
100828
    {
390
3878
        let results = self
391
3878
            .query_by_name(view, key, order, limit, access_policy)
392
1877
            .await?;
393
3878
        let view = self.schematic().view_by_name(view).unwrap(); // query() will fail if it's not present
394

            
395
3878
        let documents = self
396
3878
            .get_multiple_from_collection_id(
397
3878
                &results.iter().map(|m| m.source.id).collect::<Vec<_>>(),
398
3878
                &view.collection(),
399
3878
            )
400
2108
            .await?
401
3878
            .into_iter()
402
3878
            .map(|doc| (doc.header.id, doc))
403
3878
            .collect::<BTreeMap<_, _>>();
404
3878

            
405
3878
        Ok(
406
3878
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
407
3878
                mappings: results,
408
3878
                documents,
409
3878
            },
410
3878
        )
411
3878
    }
412

            
413
    #[cfg(feature = "internal-apis")]
414
    #[doc(hidden)]
415
207194
    pub async fn reduce_by_name(
416
207194
        &self,
417
207194
        view: &ViewName,
418
207194
        key: Option<QueryKey<Bytes>>,
419
207194
        access_policy: AccessPolicy,
420
207194
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
421
7970
        self.reduce_in_view(view, key, access_policy).await
422
7969
    }
423

            
424
    #[cfg(feature = "internal-apis")]
425
    #[doc(hidden)]
426
156
    pub async fn reduce_grouped_by_name(
427
156
        &self,
428
156
        view: &ViewName,
429
156
        key: Option<QueryKey<Bytes>>,
430
156
        access_policy: AccessPolicy,
431
156
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
432
7
        self.grouped_reduce_in_view(view, key, access_policy).await
433
6
    }
434

            
435
    #[cfg(feature = "internal-apis")]
436
    #[doc(hidden)]
437
52
    pub async fn delete_docs_by_name(
438
52
        &self,
439
52
        view: &ViewName,
440
52
        key: Option<QueryKey<Bytes>>,
441
52
        access_policy: AccessPolicy,
442
52
    ) -> Result<u64, bonsaidb_core::Error> {
443
2
        let view = self
444
2
            .data
445
2
            .schema
446
2
            .view_by_name(view)
447
2
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
448
2
        let collection = view.collection();
449
2
        let mut transaction = Transaction::default();
450
2
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
451
6
            for mapping in entry.mappings {
452
4
                transaction.push(Operation::delete(collection.clone(), mapping.source));
453
4
            }
454

            
455
2
            Ok(())
456
2
        })
457
2
        .await?;
458

            
459
2
        let results = Connection::apply_transaction(self, transaction).await?;
460

            
461
2
        Ok(results.len() as u64)
462
2
    }
463

            
464
372209
    async fn get_from_collection_id(
465
372209
        &self,
466
372209
        id: DocumentId,
467
372209
        collection: &CollectionName,
468
372209
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
469
14359
        let task_self = self.clone();
470
14359
        let collection = collection.clone();
471
14359
        tokio::task::spawn_blocking(move || {
472
14359
            let tree = task_self
473
14359
                .data
474
14359
                .context
475
14359
                .roots
476
14359
                .tree(task_self.collection_tree::<Versioned, _>(
477
14359
                    &collection,
478
14359
                    document_tree_name(&collection),
479
14359
                )?)
480
14359
                .map_err(Error::from)?;
481
14359
            if let Some(vec) = tree.get(id.as_ref()).map_err(Error::from)? {
482
13843
                Ok(Some(deserialize_document(&vec)?.into_owned()))
483
            } else {
484
515
                Ok(None)
485
            }
486
14359
        })
487
8561
        .await
488
14359
        .unwrap()
489
14359
    }
490

            
491
206660
    async fn get_multiple_from_collection_id(
492
206660
        &self,
493
206660
        ids: &[DocumentId],
494
206660
        collection: &CollectionName,
495
206660
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
496
7985
        let task_self = self.clone();
497
7985
        let mut ids = ids.to_vec();
498
7985
        let collection = collection.clone();
499
7985
        tokio::task::spawn_blocking(move || {
500
7985
            let tree = task_self
501
7985
                .data
502
7985
                .context
503
7985
                .roots
504
7985
                .tree(task_self.collection_tree::<Versioned, _>(
505
7985
                    &collection,
506
7985
                    document_tree_name(&collection),
507
7985
                )?)
508
7985
                .map_err(Error::from)?;
509
7985
            ids.sort();
510
7985
            let keys_and_values = tree
511
8003
                .get_multiple(ids.iter().map(|id| id.as_ref()))
512
7985
                .map_err(Error::from)?;
513

            
514
7985
            keys_and_values
515
7985
                .into_iter()
516
8003
                .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
517
7985
                .collect::<Result<Vec<_>, Error>>()
518
7985
        })
519
4631
        .await
520
7985
        .unwrap()
521
7985
        .map_err(bonsaidb_core::Error::from)
522
7985
    }
523

            
524
519
    pub(crate) async fn list(
525
519
        &self,
526
519
        ids: Range<DocumentId>,
527
519
        sort: Sort,
528
519
        limit: Option<usize>,
529
519
        collection: &CollectionName,
530
519
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
531
44
        let task_self = self.clone();
532
44
        let collection = collection.clone();
533
44
        tokio::task::spawn_blocking(move || {
534
44
            let tree = task_self
535
44
                .data
536
44
                .context
537
44
                .roots
538
44
                .tree(task_self.collection_tree::<Versioned, _>(
539
44
                    &collection,
540
44
                    document_tree_name(&collection),
541
44
                )?)
542
44
                .map_err(Error::from)?;
543
44
            let mut found_docs = Vec::new();
544
44
            let mut keys_read = 0;
545
44
            let ids = DocumentIdRange(ids);
546
44
            tree.scan(
547
44
                &ids.borrow_as_bytes(),
548
44
                match sort {
549
39
                    Sort::Ascending => true,
550
5
                    Sort::Descending => false,
551
                },
552
24
                |_, _, _| true,
553
44
                |_, _| {
554
104
                    if let Some(limit) = limit {
555
10
                        if keys_read >= limit {
556
5
                            return KeyEvaluation::Stop;
557
5
                        }
558
5

            
559
5
                        keys_read += 1;
560
94
                    }
561
99
                    KeyEvaluation::ReadData
562
104
                },
563
44
                |_, _, doc| {
564
                    found_docs.push(
565
99
                        deserialize_document(&doc)
566
99
                            .map(BorrowedDocument::into_owned)
567
99
                            .map_err(AbortError::Other)?,
568
                    );
569
99
                    Ok(())
570
99
                },
571
            )
572
44
            .map_err(|err| match err {
573
                AbortError::Other(err) => err,
574
                AbortError::Nebari(err) => crate::Error::from(err),
575
44
            })
576
44
            .unwrap();
577
44

            
578
44
            Ok(found_docs)
579
44
        })
580
40
        .await
581
44
        .unwrap()
582
44
    }
583

            
584
160
    pub(crate) async fn count(
585
160
        &self,
586
160
        ids: Range<DocumentId>,
587
160
        collection: &CollectionName,
588
160
    ) -> Result<u64, bonsaidb_core::Error> {
589
10
        let task_self = self.clone();
590
10
        let collection = collection.clone();
591
10
        // TODO this should be able to use a reduce operation from Nebari https://github.com/khonsulabs/nebari/issues/23
592
10
        tokio::task::spawn_blocking(move || {
593
10
            let tree = task_self
594
10
                .data
595
10
                .context
596
10
                .roots
597
10
                .tree(task_self.collection_tree::<Versioned, _>(
598
10
                    &collection,
599
10
                    document_tree_name(&collection),
600
10
                )?)
601
10
                .map_err(Error::from)?;
602
10
            let mut keys_found = 0;
603
10
            let ids = DocumentIdRange(ids);
604
10
            tree.scan(
605
10
                &ids.borrow_as_bytes(),
606
10
                true,
607
10
                |_, _, _| true,
608
20
                |_, _| {
609
20
                    keys_found += 1;
610
20
                    KeyEvaluation::Skip
611
20
                },
612
10
                |_, _, _| unreachable!(),
613
10
            )
614
10
            .map_err(|err| match err {
615
                AbortError::Other(err) => err,
616
                AbortError::Nebari(err) => crate::Error::from(err),
617
10
            })
618
10
            .unwrap();
619
10

            
620
10
            Ok(keys_found)
621
10
        })
622
8
        .await
623
10
        .unwrap()
624
10
    }
625

            
626
414528
    async fn reduce_in_view(
627
414528
        &self,
628
414528
        view_name: &ViewName,
629
414528
        key: Option<QueryKey<Bytes>>,
630
414528
        access_policy: AccessPolicy,
631
414528
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
632
15953
        let view = self
633
15953
            .data
634
15953
            .schema
635
15953
            .view_by_name(view_name)
636
15953
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
637
15953
        let mut mappings = self
638
15955
            .grouped_reduce_in_view(view_name, key, access_policy)
639
329
            .await?;
640

            
641
15953
        let result = if mappings.len() == 1 {
642
5113
            mappings.pop().unwrap().value.into_vec()
643
        } else {
644
10840
            view.reduce(
645
10840
                &mappings
646
10840
                    .iter()
647
10977
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
648
10840
                    .collect::<Vec<_>>(),
649
10840
                true,
650
10840
            )
651
10840
            .map_err(Error::from)?
652
        };
653

            
654
15948
        Ok(result)
655
15953
    }
656

            
657
414726
    async fn grouped_reduce_in_view(
658
414726
        &self,
659
414726
        view_name: &ViewName,
660
414726
        key: Option<QueryKey<Bytes>>,
661
414726
        access_policy: AccessPolicy,
662
414726
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
663
15977
        let view = self
664
15977
            .data
665
15977
            .schema
666
15977
            .view_by_name(view_name)
667
15977
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
668
15977
        let mut mappings = Vec::new();
669
16148
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
670
5323
            mappings.push(MappedSerializedValue {
671
5323
                key: entry.key,
672
5323
                value: entry.reduced_value,
673
5323
            });
674
5323
            Ok(())
675
16150
        })
676
349
        .await?;
677

            
678
15977
        Ok(mappings)
679
15977
    }
680

            
681
416467
    fn apply_transaction_to_roots(
682
416467
        &self,
683
416467
        transaction: &Transaction,
684
416467
    ) -> Result<Vec<OperationResult>, Error> {
685
416467
        let mut open_trees = OpenTrees::default();
686
1047069
        for op in &transaction.operations {
687
630682
            if !self.data.schema.contains_collection_id(&op.collection) {
688
80
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
689
630602
            }
690

            
691
            #[cfg(any(feature = "encryption", feature = "compression"))]
692
630602
            let vault = if let Some(encryption_key) =
693
630602
                self.collection_encryption_key(&op.collection).cloned()
694
            {
695
                #[cfg(feature = "encryption")]
696
2587
                if let Some(mut vault) = self.storage().tree_vault().cloned() {
697
1364
                    vault.key = Some(encryption_key);
698
1364
                    Some(vault)
699
                } else {
700
1223
                    TreeVault::new_if_needed(
701
1223
                        Some(encryption_key),
702
1223
                        self.storage().vault(),
703
1223
                        #[cfg(feature = "compression")]
704
1223
                        None,
705
1223
                    )
706
                }
707

            
708
                #[cfg(not(feature = "encryption"))]
709
                {
710
                    drop(encryption_key);
711
                    return Err(Error::EncryptionDisabled);
712
                }
713
            } else {
714
628015
                self.storage().tree_vault().cloned()
715
            };
716

            
717
630602
            open_trees.open_trees_for_document_change(
718
630602
                &op.collection,
719
630602
                &self.data.schema,
720
630602
                #[cfg(any(feature = "encryption", feature = "compression"))]
721
630602
                vault,
722
630602
            );
723
        }
724

            
725
416387
        let mut roots_transaction = self
726
416387
            .data
727
416387
            .context
728
416387
            .roots
729
416387
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
730

            
731
416387
        let mut results = Vec::new();
732
416387
        let mut changed_documents = Vec::new();
733
416387
        let mut collection_indexes = HashMap::new();
734
416387
        let mut collections = Vec::new();
735
1033398
        for op in &transaction.operations {
736
630602
            let result = self.execute_operation(
737
630602
                op,
738
630602
                &mut roots_transaction,
739
630602
                &open_trees.trees_index_by_name,
740
630602
            )?;
741

            
742
617011
            if let Some((collection, id, deleted)) = match &result {
743
580651
                OperationResult::DocumentUpdated { header, collection } => {
744
580651
                    Some((collection, header.id, false))
745
                }
746
36360
                OperationResult::DocumentDeleted { id, collection } => {
747
36360
                    Some((collection, *id, true))
748
                }
749
                OperationResult::Success => None,
750
617011
            } {
751
617011
                let collection = match collection_indexes.get(collection) {
752
212578
                    Some(index) => *index,
753
                    None => {
754
404433
                        if let Ok(id) = u16::try_from(collections.len()) {
755
404433
                            collection_indexes.insert(collection.clone(), id);
756
404433
                            collections.push(collection.clone());
757
404433
                            id
758
                        } else {
759
                            return Err(Error::TransactionTooLarge);
760
                        }
761
                    }
762
                };
763
617011
                changed_documents.push(ChangedDocument {
764
617011
                    collection,
765
617011
                    id,
766
617011
                    deleted,
767
617011
                });
768
            }
769
617011
            results.push(result);
770
        }
771

            
772
402796
        self.invalidate_changed_documents(
773
402796
            &mut roots_transaction,
774
402796
            &open_trees,
775
402796
            &collections,
776
402796
            &changed_documents,
777
402796
        )?;
778

            
779
402796
        roots_transaction
780
402796
            .entry_mut()
781
402796
            .set_data(compat::serialize_executed_transaction_changes(
782
402796
                &Changes::Documents(DocumentChanges {
783
402796
                    collections,
784
402796
                    documents: changed_documents,
785
402796
                }),
786
402796
            )?)?;
787

            
788
402796
        roots_transaction.commit()?;
789

            
790
402796
        Ok(results)
791
416467
    }
792

            
793
402796
    fn invalidate_changed_documents(
794
402796
        &self,
795
402796
        roots_transaction: &mut ExecutingTransaction<AnyFile>,
796
402796
        open_trees: &OpenTrees,
797
402796
        collections: &[CollectionName],
798
402796
        changed_documents: &[ChangedDocument],
799
402796
    ) -> Result<(), Error> {
800
404434
        for (collection, changed_documents) in &changed_documents
801
402796
            .iter()
802
617011
            .group_by(|doc| &collections[usize::from(doc.collection)])
803
        {
804
404433
            if let Some(views) = self.data.schema.views_in_collection(collection) {
805
238370
                let changed_documents = changed_documents.collect::<Vec<_>>();
806
802086
                for view in views.into_iter().filter(|view| !view.unique()) {
807
764269
                    let view_name = view.view_name();
808
764269
                    let tree_name = view_invalidated_docs_tree_name(&view_name);
809
1597004
                    for changed_document in &changed_documents {
810
832735
                        let mut invalidated_docs = roots_transaction
811
832735
                            .tree::<Unversioned>(open_trees.trees_index_by_name[&tree_name])
812
832735
                            .unwrap();
813
832735
                        invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?;
814
                    }
815
                }
816
166063
            }
817
        }
818
402796
        Ok(())
819
402796
    }
820

            
821
630602
    fn execute_operation(
822
630602
        &self,
823
630602
        operation: &Operation,
824
630602
        transaction: &mut ExecutingTransaction<AnyFile>,
825
630602
        tree_index_map: &HashMap<String, usize>,
826
630602
    ) -> Result<OperationResult, Error> {
827
630602
        match &operation.command {
828
450658
            Command::Insert { id, contents } => {
829
450658
                self.execute_insert(operation, transaction, tree_index_map, *id, contents)
830
            }
831
130398
            Command::Update { header, contents } => self.execute_update(
832
130398
                operation,
833
130398
                transaction,
834
130398
                tree_index_map,
835
130398
                header.id,
836
130398
                Some(&header.revision),
837
130398
                contents,
838
130398
            ),
839
13186
            Command::Overwrite { id, contents } => {
840
13186
                self.execute_update(operation, transaction, tree_index_map, *id, None, contents)
841
            }
842
36360
            Command::Delete { header } => {
843
36360
                self.execute_delete(operation, transaction, tree_index_map, header)
844
            }
845
        }
846
630602
    }
847

            
848
143584
    fn execute_update(
849
143584
        &self,
850
143584
        operation: &Operation,
851
143584
        transaction: &mut ExecutingTransaction<AnyFile>,
852
143584
        tree_index_map: &HashMap<String, usize>,
853
143584
        id: DocumentId,
854
143584
        check_revision: Option<&Revision>,
855
143584
        contents: &[u8],
856
143584
    ) -> Result<OperationResult, crate::Error> {
857
143584
        let mut documents = transaction
858
143584
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
859
143584
            .unwrap();
860
143584
        let document_id = ArcBytes::from(id.to_vec());
861
143584
        let mut result = None;
862
143584
        let mut updated = false;
863
143584
        documents.modify(
864
143584
            vec![document_id.clone()],
865
143584
            nebari::tree::Operation::CompareSwap(CompareSwap::new(&mut |_key,
866
                                                                        value: Option<
867
                ArcBytes<'_>,
868
            >| {
869
143584
                if let Some(old) = value {
870
143504
                    let doc = match deserialize_document(&old) {
871
143504
                        Ok(doc) => doc,
872
                        Err(err) => {
873
                            result = Some(Err(err));
874
                            return nebari::tree::KeyOperation::Skip;
875
                        }
876
                    };
877
143504
                    if check_revision.is_none() || Some(&doc.header.revision) == check_revision {
878
143424
                        if let Some(updated_revision) = doc.header.revision.next_revision(contents)
879
                        {
880
143264
                            let updated_header = Header {
881
143264
                                id,
882
143264
                                revision: updated_revision,
883
143264
                            };
884
143264
                            let serialized_doc = match serialize_document(&BorrowedDocument {
885
143264
                                header: updated_header.clone(),
886
143264
                                contents: CowBytes::from(contents),
887
143264
                            }) {
888
143264
                                Ok(bytes) => bytes,
889
                                Err(err) => {
890
                                    result = Some(Err(Error::from(err)));
891
                                    return nebari::tree::KeyOperation::Skip;
892
                                }
893
                            };
894
143264
                            result = Some(Ok(OperationResult::DocumentUpdated {
895
143264
                                collection: operation.collection.clone(),
896
143264
                                header: updated_header,
897
143264
                            }));
898
143264
                            updated = true;
899
143264
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized_doc));
900
160
                        }
901
160

            
902
160
                        // If no new revision was made, it means an attempt to
903
160
                        // save a document with the same contents was made.
904
160
                        // We'll return a success but not actually give a new
905
160
                        // version
906
160
                        result = Some(Ok(OperationResult::DocumentUpdated {
907
160
                            collection: operation.collection.clone(),
908
160
                            header: doc.header,
909
160
                        }));
910
80
                    } else {
911
80
                        result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
912
80
                            operation.collection.clone(),
913
80
                            Box::new(doc.header),
914
80
                        ))));
915
80
                    }
916
80
                } else if check_revision.is_none() {
917
                    let doc = BorrowedDocument::new(id, contents);
918
                    match serialize_document(&doc).map(|bytes| (doc, bytes)) {
919
                        Ok((doc, serialized)) => {
920
                            result = Some(Ok(OperationResult::DocumentUpdated {
921
                                collection: operation.collection.clone(),
922
                                header: doc.header,
923
                            }));
924
                            return nebari::tree::KeyOperation::Set(ArcBytes::from(serialized));
925
                        }
926
                        Err(err) => {
927
                            result = Some(Err(Error::from(err)));
928
                        }
929
                    }
930
80
                } else {
931
80
                    result = Some(Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
932
80
                        operation.collection.clone(),
933
80
                        Box::new(id),
934
80
                    ))));
935
80
                }
936
320
                nebari::tree::KeyOperation::Skip
937
143584
            })),
938
143584
        )?;
939
143584
        drop(documents);
940
143584

            
941
143584
        if updated {
942
143264
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
943
320
        }
944

            
945
143424
        result.expect("nebari should invoke the callback even when the key isn't found")
946
143584
    }
947

            
948
450658
    fn execute_insert(
949
450658
        &self,
950
450658
        operation: &Operation,
951
450658
        transaction: &mut ExecutingTransaction<AnyFile>,
952
450658
        tree_index_map: &HashMap<String, usize>,
953
450658
        id: Option<DocumentId>,
954
450658
        contents: &[u8],
955
450658
    ) -> Result<OperationResult, Error> {
956
450658
        let mut documents = transaction
957
450658
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
958
450658
            .unwrap();
959
450658
        let id = if let Some(id) = id {
960
232890
            id
961
217768
        } else if let Some(last_key) = documents.last_key()? {
962
198005
            let id = DocumentId::try_from(last_key.as_slice())?;
963
198005
            self.data
964
198005
                .schema
965
198005
                .next_id_for_collection(&operation.collection, Some(id))?
966
        } else {
967
19763
            self.data
968
19763
                .schema
969
19763
                .next_id_for_collection(&operation.collection, None)?
970
        };
971

            
972
450632
        let doc = BorrowedDocument::new(id, contents);
973
450632
        let serialized: Vec<u8> = serialize_document(&doc)?;
974
450632
        let document_id = ArcBytes::from(doc.header.id.as_ref().to_vec());
975
450632
        if let Some(document) = documents.replace(document_id.clone(), serialized)? {
976
13107
            let doc = deserialize_document(&document)?;
977
13107
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
978
13107
                operation.collection.clone(),
979
13107
                Box::new(doc.header),
980
13107
            )))
981
        } else {
982
437525
            drop(documents);
983
437525
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
984

            
985
437387
            Ok(OperationResult::DocumentUpdated {
986
437387
                collection: operation.collection.clone(),
987
437387
                header: doc.header,
988
437387
            })
989
        }
990
450658
    }
991

            
992
36360
    fn execute_delete(
993
36360
        &self,
994
36360
        operation: &Operation,
995
36360
        transaction: &mut ExecutingTransaction<AnyFile>,
996
36360
        tree_index_map: &HashMap<String, usize>,
997
36360
        header: &Header,
998
36360
    ) -> Result<OperationResult, Error> {
999
36360
        let mut documents = transaction
36360
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
36360
            .unwrap();
36360
        if let Some(vec) = documents.remove(header.id.as_ref())? {
36360
            drop(documents);
36360
            let doc = deserialize_document(&vec)?;
36360
            if &doc.header == header {
36360
                self.update_unique_views(
36360
                    &ArcBytes::from(doc.header.id.to_vec()),
36360
                    operation,
36360
                    transaction,
36360
                    tree_index_map,
36360
                )?;

            
36360
                Ok(OperationResult::DocumentDeleted {
36360
                    collection: operation.collection.clone(),
36360
                    id: header.id,
36360
                })
            } else {
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
                    operation.collection.clone(),
                    Box::new(header.clone()),
                )))
            }
        } else {
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
                operation.collection.clone(),
                Box::new(header.id),
            )))
        }
36360
    }

            
    fn update_unique_views(
        &self,
        document_id: &ArcBytes<'static>,
        operation: &Operation,
        transaction: &mut ExecutingTransaction<AnyFile>,
        tree_index_map: &HashMap<String, usize>,
    ) -> Result<(), Error> {
617149
        if let Some(unique_views) = self
617149
            .data
617149
            .schema
617149
            .unique_views_in_collection(&operation.collection)
        {
85539
            let documents = transaction
85539
                .unlocked_tree(tree_index_map[&document_tree_name(&operation.collection)])
85539
                .unwrap();
170780
            for view in unique_views {
85539
                let name = view.view_name();
85539
                let document_map = transaction
85539
                    .unlocked_tree(tree_index_map[&view_document_map_tree_name(&name)])
85539
                    .unwrap();
85539
                let view_entries = transaction
85539
                    .unlocked_tree(tree_index_map[&view_entries_tree_name(&name)])
85539
                    .unwrap();
85539
                mapper::DocumentRequest {
85539
                    database: self,
85539
                    document_ids: vec![document_id.clone()],
85539
                    map_request: &mapper::Map {
85539
                        database: self.data.name.clone(),
85539
                        collection: operation.collection.clone(),
85539
                        view_name: name.clone(),
85539
                    },
85539
                    document_map,
85539
                    documents,
85539
                    view_entries,
85539
                    view,
85539
                }
85539
                .map()?;
            }
531610
        }

            
616851
        Ok(())
617149
    }

            
664197
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
664197
        view_entries: &'a Tree<Unversioned, AnyFile>,
664197
        key: Option<QueryKey<K>>,
664197
        order: Sort,
664197
        limit: Option<usize>,
664197
    ) -> Result<Vec<ViewEntry>, Error> {
664197
        let mut values = Vec::new();
664197
        let forwards = match order {
664117
            Sort::Ascending => true,
80
            Sort::Descending => false,
        };
664197
        let mut values_read = 0;
664197
        if let Some(key) = key {
660742
            match key {
238
                QueryKey::Range(range) => {
238
                    let range = range
238
                        .as_ord_bytes()
238
                        .map_err(view::Error::key_serialization)?;
238
                    view_entries.scan::<Infallible, _, _, _, _>(
476
                        &range.map_ref(|bytes| &bytes[..]),
238
                        forwards,
420
                        |_, _, _| true,
238
                        |_, _| {
1360
                            if let Some(limit) = limit {
160
                                if values_read >= limit {
80
                                    return KeyEvaluation::Stop;
80
                                }
80
                                values_read += 1;
1200
                            }
1280
                            KeyEvaluation::ReadData
1360
                        },
1280
                        |_key, _index, value| {
1280
                            values.push(value);
1280
                            Ok(())
1280
                        },
238
                    )?;
                }
660318
                QueryKey::Matches(key) => {
660318
                    let key = key
660318
                        .as_ord_bytes()
660318
                        .map_err(view::Error::key_serialization)?
660318
                        .to_vec();
660318

            
660318
                    values.extend(view_entries.get(&key)?);
                }
186
                QueryKey::Multiple(list) => {
186
                    let mut list = list
186
                        .into_iter()
372
                        .map(|key| {
372
                            key.as_ord_bytes()
372
                                .map(|bytes| bytes.to_vec())
372
                                .map_err(view::Error::key_serialization)
372
                        })
186
                        .collect::<Result<Vec<_>, _>>()?;

            
186
                    list.sort();
186

            
186
                    values.extend(
186
                        view_entries
186
                            .get_multiple(list.iter().map(Vec::as_slice))?
186
                            .into_iter()
372
                            .map(|(_, value)| value),
                    );
                }
            }
        } else {
3455
            view_entries.scan::<Infallible, _, _, _, _>(
3455
                &(..),
3455
                forwards,
3455
                |_, _, _| true,
3455
                |_, _| {
4708
                    if let Some(limit) = limit {
                        if values_read >= limit {
                            return KeyEvaluation::Stop;
                        }
                        values_read += 1;
4708
                    }
4708
                    KeyEvaluation::ReadData
4729
                },
4729
                |_, _, value| {
4708
                    values.push(value);
4708
                    Ok(())
4729
                },
3455
            )?;
        }

            
664197
        values
664197
            .into_iter()
664197
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
664197
            .collect::<Result<Vec<_>, Error>>()
664197
    }

            
    #[cfg(any(feature = "encryption", feature = "compression"))]
2471734
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
2471734
        self.schematic()
2471734
            .encryption_key_for_collection(collection)
2471734
            .or_else(|| self.storage().default_encryption_key())
2471734
    }

            
    #[cfg_attr(
        not(feature = "encryption"),
        allow(
            unused_mut,
            unused_variables,
            clippy::unused_self,
            clippy::let_and_return
        )
    )]
    #[allow(clippy::unnecessary_wraps)]
1841210
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
1841210
        &self,
1841210
        collection: &CollectionName,
1841210
        name: S,
1841210
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
1841210
        let mut tree = R::tree(name);
1841210

            
1841210
        #[cfg(any(feature = "encryption", feature = "compression"))]
1841210
        match (
1841210
            self.collection_encryption_key(collection),
1841210
            self.storage().tree_vault().cloned(),
        ) {
24373
            (Some(override_key), Some(mut vault)) => {
24373
                #[cfg(feature = "encryption")]
24373
                {
24373
                    vault.key = Some(override_key.clone());
24373
                    tree = tree.with_vault(vault);
24373
                }

            
                #[cfg(not(feature = "encryption"))]
                {
                    return Err(Error::EncryptionDisabled);
                }
            }
425740
            (None, Some(vault)) => {
425740
                tree = tree.with_vault(vault);
425740
            }
1391097
            (key, None) => {
                #[cfg(feature = "encryption")]
1391097
                if let Some(vault) = TreeVault::new_if_needed(
1391097
                    key.cloned(),
1391097
                    self.storage().vault(),
1391097
                    #[cfg(feature = "compression")]
1391097
                    None,
1391097
                ) {
28707
                    tree = tree.with_vault(vault);
1362390
                }

            
                #[cfg(not(feature = "encryption"))]
                if key.is_some() {
                    return Err(Error::EncryptionDisabled);
                }
            }
        }

            
1841210
        Ok(tree)
1841210
    }

            
1
    pub(crate) async fn update_key_expiration_async<'key>(
1
        &self,
1
        tree_key: impl Into<Cow<'key, str>>,
1
        expiration: Option<Timestamp>,
1
    ) {
1
        self.data
1
            .context
1
            .update_key_expiration_async(tree_key, expiration)
            .await;
1
    }
}
59
#[derive(Serialize, Deserialize)]
struct LegacyHeader {
    id: u64,
    revision: Revision,
}
59
#[derive(Serialize, Deserialize)]
struct LegacyDocument<'a> {
    header: LegacyHeader,
    #[serde(borrow)]
    contents: &'a [u8],
}

            
867510
pub(crate) fn deserialize_document(bytes: &[u8]) -> Result<BorrowedDocument<'_>, Error> {
867510
    match pot::from_slice::<BorrowedDocument<'_>>(bytes) {
867451
        Ok(document) => Ok(document),
59
        Err(err) => match bincode::deserialize::<LegacyDocument<'_>>(bytes) {
59
            Ok(legacy_doc) => Ok(BorrowedDocument {
59
                header: Header {
59
                    id: DocumentId::from_u64(legacy_doc.header.id),
59
                    revision: legacy_doc.header.revision,
59
                },
59
                contents: CowBytes::from(legacy_doc.contents),
59
            }),
            Err(_) => Err(Error::from(err)),
        },
    }
867510
}

            
593896
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
593896
    pot::to_vec(document)
593896
        .map_err(Error::from)
593896
        .map_err(bonsaidb_core::Error::from)
593896
}

            
#[async_trait]
impl Connection for Database {
1249401
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(transaction)))]
    async fn apply_transaction(
        &self,
        transaction: Transaction,
416467
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
416467
        let task_self = self.clone();
416467
        let mut unique_view_tasks = Vec::new();
418105
        for collection_name in transaction
416467
            .operations
416467
            .iter()
630682
            .map(|op| &op.collection)
416467
            .collect::<HashSet<_>>()
        {
418104
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
1107387
                for view in views {
855452
                    if view.unique() {
38115
                        if let Some(task) = self
38115
                            .data
38115
                            .storage
38115
                            .tasks()
38115
                            .spawn_integrity_check(view, self)
                            .await?
850
                        {
850
                            unique_view_tasks.push(task);
37265
                        }
817337
                    }
                }
166169
            }
        }

            
416467
        let mut unique_view_mapping_tasks = Vec::new();
417317
        for task in unique_view_tasks {
850
            if let Some(spawned_task) = task
850
                .receive()
850
                .await
850
                .map_err(Error::from)?
850
                .map_err(Error::from)?
796
            {
796
                unique_view_mapping_tasks.push(spawned_task);
796
            }
        }
417263
        for task in unique_view_mapping_tasks {
796
            let mut task = fast_async_lock!(task);
796
            if let Some(task) = task.take() {
796
                task.receive()
614
                    .await
796
                    .map_err(Error::from)?
796
                    .map_err(Error::from)?;
            }
        }

            
416467
        tokio::task::spawn_blocking(move || task_self.apply_transaction_to_roots(&transaction))
357472
            .await
416467
            .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?
416467
            .map_err(bonsaidb_core::Error::from)
832934
    }

            
18615
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(id)))]
    async fn get<C, PrimaryKey>(
        &self,
        id: PrimaryKey,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error>
    where
        C: schema::Collection,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
6205
    {
6205
        self.get_from_collection_id(id.into().to_document_id()?, &C::collection_name())
3735
            .await
12410
    }

            
12309
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids)))]
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
        &self,
        ids: DocumentIds,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
    where
        C: schema::Collection,
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
        I: Iterator<Item = PrimaryKey> + Send + Sync,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync,
4103
    {
        self.get_multiple_from_collection_id(
4103
            &ids.into_iter()
4121
                .map(|id| id.into().to_document_id())
4103
                .collect::<Result<Vec<_>, _>>()?,
4103
            &C::collection_name(),
2521
        )
2521
        .await
8206
    }

            
72
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids, order, limit)))]
    async fn list<C, R, PrimaryKey>(
        &self,
        ids: R,
        order: Sort,
        limit: Option<usize>,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
    where
        C: schema::Collection,
        R: Into<Range<PrimaryKey>> + Send,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
24
    {
        self.list(
26
            ids.into().map_result(|id| id.into().to_document_id())?,
24
            order,
24
            limit,
24
            &C::collection_name(),
24
        )
24
        .await
48
    }

            
18
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids)))]
    async fn count<C, R, PrimaryKey>(&self, ids: R) -> Result<u64, bonsaidb_core::Error>
    where
        C: schema::Collection,
        R: Into<Range<PrimaryKey>> + Send,
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
6
    {
        self.count(
6
            ids.into().map_result(|id| id.into().to_document_id())?,
6
            &C::collection_name(),
6
        )
6
        .await
12
    }

            
    #[cfg_attr(
        feature = "tracing",
82512
        tracing::instrument(skip(starting_id, result_limit))
    )]
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<usize>,
27504
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
27504
        let result_limit = result_limit
27504
            .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
27504
            .min(LIST_TRANSACTIONS_MAX_RESULTS);
27504
        if result_limit > 0 {
27424
            let task_self = self.clone();
27424
            tokio::task::spawn_blocking::<_, Result<Vec<transaction::Executed>, Error>>(move || {
27424
                let range = if let Some(starting_id) = starting_id {
13986
                    Range::from(starting_id..)
                } else {
13438
                    Range::from(..)
                };

            
27424
                let mut entries = Vec::new();
208112
                task_self.roots().transactions().scan(range, |entry| {
208112
                    entries.push(entry);
208112
                    entries.len() < result_limit
208112
                })?;

            
27424
                entries
27424
                    .into_iter()
27424
                    .map(|entry| {
208112
                        if let Some(data) = entry.data() {
208054
                            let changes = compat::deserialize_executed_transaction_changes(data)?;
208054
                            Ok(Some(transaction::Executed {
208054
                                id: entry.id,
208054
                                changes,
208054
                            }))
                        } else {
58
                            Ok(None)
                        }
208112
                    })
27424
                    .filter_map(Result::transpose)
27424
                    .collect::<Result<Vec<_>, Error>>()
27424
            })
21703
            .await
27424
            .unwrap()
27424
            .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
80
            Ok(Vec::default())
        }
55008
    }

            
    #[cfg_attr(
        feature = "tracing",
126258
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    #[must_use]
    async fn query<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
42086
    {
42086
        let mut results = Vec::new();
42086
        self.for_each_view_entry::<V, _>(key, order, limit, access_policy, |entry| {
18629
            let key = <V::Key as Key>::from_ord_bytes(&entry.key)
18629
                .map_err(view::Error::key_serialization)
18629
                .map_err(Error::from)?;
37302
            for entry in entry.mappings {
                results.push(Map::new(
18673
                    entry.source,
18673
                    key.clone(),
18673
                    V::deserialize(&entry.value)?,
                ));
            }
18629
            Ok(())
42130
        })
39897
        .await?;

            
42084
        Ok(results)
84170
    }

            
    #[cfg_attr(
        feature = "tracing",
12275
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    async fn query_with_docs<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
    where
        Self: Sized,
4092
    {
4138
        let results = Connection::query::<V>(self, key, order, limit, access_policy).await?;

            
4092
        let documents = self
4106
            .get_multiple::<V::Collection, _, _, _>(results.iter().map(|m| m.source.id))
2510
            .await?
4092
            .into_iter()
4106
            .map(|doc| (doc.header.id, doc))
4092
            .collect::<BTreeMap<_, _>>();
4092

            
4092
        Ok(MappedDocuments {
4092
            mappings: results,
4092
            documents,
4092
        })
8184
    }

            
23952
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<V::Value, bonsaidb_core::Error>
    where
        Self: Sized,
7984
    {
7984
        let view = self
7984
            .data
7984
            .schema
7984
            .view::<V>()
7984
            .expect("query made with view that isn't registered with this database");

            
7980
        let result = self
            .reduce_in_view(
7984
                &view.view_name(),
7984
                key.map(|key| key.serialized()).transpose()?,
7984
                access_policy,
170
            )
170
            .await?;
7981
        let value = V::deserialize(&result)?;

            
7981
        Ok(value)
15968
    }

            
54
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce_grouped<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
18
    {
18
        let view = self
18
            .data
18
            .schema
18
            .view::<V>()
18
            .expect("query made with view that isn't registered with this database");

            
18
        let results = self
            .grouped_reduce_in_view(
18
                &view.view_name(),
18
                key.map(|key| key.serialized()).transpose()?,
18
                access_policy,
15
            )
15
            .await?;
18
        results
18
            .into_iter()
45
            .map(|map| {
45
                Ok(MappedValue::new(
45
                    V::Key::from_ord_bytes(&map.key).map_err(view::Error::key_serialization)?,
45
                    V::deserialize(&map.value)?,
                ))
45
            })
18
            .collect::<Result<Vec<_>, bonsaidb_core::Error>>()
36
    }

            
3
    async fn delete_docs<V: schema::SerializedView>(
3
        &self,
3
        key: Option<QueryKey<V::Key>>,
3
        access_policy: AccessPolicy,
3
    ) -> Result<u64, bonsaidb_core::Error>
3
    where
3
        Self: Sized,
3
    {
3
        let collection = <V::Collection as Collection>::collection_name();
3
        let mut transaction = Transaction::default();
3
        self.for_each_view_entry::<V, _>(key, Sort::Ascending, None, access_policy, |entry| {
9
            for mapping in entry.mappings {
6
                transaction.push(Operation::delete(collection.clone(), mapping.source));
6
            }

            
3
            Ok(())
3
        })
3
        .await?;

            
3
        let results = Connection::apply_transaction(self, transaction).await?;

            
3
        Ok(results.len() as u64)
6
    }

            
1201020
    #[cfg_attr(feature = "tracing", tracing::instrument)]
400340
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
400340
        Ok(self.roots().transactions().current_transaction_id())
400340
    }

            
9
    #[cfg_attr(feature = "tracing", tracing::instrument)]
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
3
        self.storage()
3
            .tasks()
3
            .compact_collection(self.clone(), C::collection_name())
3
            .await?;
3
        Ok(())
6
    }

            
240
    #[cfg_attr(feature = "tracing", tracing::instrument)]
80
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
80
        self.storage()
80
            .tasks()
80
            .compact_database(self.clone())
80
            .await?;
80
        Ok(())
160
    }

            
240
    #[cfg_attr(feature = "tracing", tracing::instrument)]
80
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
80
        self.storage()
80
            .tasks()
80
            .compact_key_value_store(self.clone())
80
            .await?;
80
        Ok(())
160
    }
}

            
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
struct ViewEntryCollectionIterator<'a> {
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntry, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
1386229
#[derive(Debug, Clone)]
pub(crate) struct Context {
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

            
2717051
    fn deref(&self) -> &Self::Target {
2717051
        &self.data
2717051
    }
}

            
#[derive(Debug)]
pub(crate) struct ContextData {
    pub(crate) roots: Roots<AnyFile>,
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
    runtime: tokio::runtime::Handle,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
21693
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
21693
        let (background_sender, background_receiver) =
21693
            watch::channel(BackgroundWorkerProcessTarget::Never);
21693
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
21693
            key_value_persistence,
21693
            roots.clone(),
21693
            background_sender,
21693
        )));
21693
        let context = Self {
21693
            data: Arc::new(ContextData {
21693
                roots,
21693
                key_value_state: key_value_state.clone(),
21693
                runtime: tokio::runtime::Handle::current(),
21693
            }),
21693
        };
21693
        tokio::task::spawn(keyvalue::background_worker(
21693
            key_value_state,
21693
            background_receiver,
21693
        ));
21693
        context
21693
    }

            
809138
    pub(crate) async fn perform_kv_operation(
809138
        &self,
809138
        op: KeyOperation,
809138
    ) -> Result<Output, bonsaidb_core::Error> {
809138
        let mut state = fast_async_lock!(self.data.key_value_state);
809138
        state
809138
            .perform_kv_operation(op, &self.data.key_value_state)
            .await
809138
    }

            
11
    pub(crate) async fn update_key_expiration_async<'key>(
11
        &self,
11
        tree_key: impl Into<Cow<'key, str>>,
11
        expiration: Option<Timestamp>,
11
    ) {
11
        let mut state = fast_async_lock!(self.data.key_value_state);
11
        state.update_key_expiration(tree_key, expiration);
11
    }
}

            
impl Drop for ContextData {
19719
    fn drop(&mut self) {
19719
        let key_value_state = self.key_value_state.clone();
19719
        self.runtime.spawn(async move {
15326
            let mut state = fast_async_lock!(key_value_state);
15326
            state.shutdown(&key_value_state).await
19719
        });
19719
    }
}

            
2078075
pub fn document_tree_name(collection: &CollectionName) -> String {
2078075
    format!("collection.{:#}", collection)
2078075
}

            
pub struct DocumentIdRange(Range<DocumentId>);

            
impl<'a> BorrowByteRange<'a> for DocumentIdRange {
679
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
679
        BorrowedRange {
679
            start: match &self.0.start {
359
                connection::Bound::Unbounded => ops::Bound::Unbounded,
320
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
679
            end: match &self.0.end {
359
                connection::Bound::Unbounded => ops::Bound::Unbounded,
240
                connection::Bound::Included(docid) => ops::Bound::Included(docid.as_ref()),
80
                connection::Bound::Excluded(docid) => ops::Bound::Excluded(docid.as_ref()),
            },
        }
679
    }
}