1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::Deref,
6
    sync::Arc,
7
    u8,
8
};
9

            
10
use async_lock::Mutex;
11
use async_trait::async_trait;
12
use bonsaidb_core::{
13
    arc_bytes::{serde::Bytes, ArcBytes},
14
    connection::{AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
15
    document::{BorrowedDocument, Document, Header, KeyId, OwnedDocument},
16
    keyvalue::{KeyOperation, Output, Timestamp},
17
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
18
    permissions::Permissions,
19
    schema::{
20
        self,
21
        view::{
22
            self,
23
            map::{MappedDocuments, MappedSerializedValue},
24
        },
25
        Collection, CollectionName, Key, Map, MappedValue, Schema, Schematic, ViewName,
26
    },
27
    transaction::{
28
        self, ChangedDocument, Changes, Command, Operation, OperationResult, Transaction,
29
    },
30
};
31
use bonsaidb_utils::fast_async_lock;
32
use byteorder::{BigEndian, ByteOrder};
33
use itertools::Itertools;
34
use nebari::{
35
    io::fs::StdFile,
36
    tree::{
37
        AnyTreeRoot, BorrowByteRange, KeyEvaluation, Root, TreeRoot, U64Range, Unversioned,
38
        Versioned,
39
    },
40
    AbortError, ExecutingTransaction, Roots, Tree,
41
};
42
use tokio::sync::watch;
43

            
44
#[cfg(feature = "encryption")]
45
use crate::vault::TreeVault;
46
use crate::{
47
    config::{Builder, KeyValuePersistence, StorageConfiguration},
48
    error::Error,
49
    open_trees::OpenTrees,
50
    views::{
51
        mapper::{self, ViewEntryCollection},
52
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
53
        view_omitted_docs_tree_name, ViewEntry,
54
    },
55
    Storage,
56
};
57

            
58
pub mod keyvalue;
59

            
60
pub mod pubsub;
61

            
62
/// A local, file-based database.
63
#[derive(Debug)]
64
pub struct Database {
65
    pub(crate) data: Arc<Data>,
66
}
67

            
68
#[derive(Debug)]
69
pub struct Data {
70
    pub name: Arc<Cow<'static, str>>,
71
    context: Context,
72
    pub(crate) storage: Storage,
73
    pub(crate) schema: Arc<Schematic>,
74
    #[allow(dead_code)] // This code was previously used, it works, but is currently unused.
75
    pub(crate) effective_permissions: Option<Permissions>,
76
}
77
impl Clone for Database {
78
919335
    fn clone(&self) -> Self {
79
919335
        Self {
80
919335
            data: self.data.clone(),
81
919335
        }
82
919335
    }
83
}
84

            
85
impl Database {
86
    /// Opens a local file as a bonsaidb.
87
83221
    pub(crate) async fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
88
83221
        name: S,
89
83221
        context: Context,
90
83221
        storage: Storage,
91
83221
    ) -> Result<Self, Error> {
92
83199
        let name = name.into();
93
83199
        let schema = Arc::new(DB::schematic()?);
94
83199
        let db = Self {
95
83199
            data: Arc::new(Data {
96
83199
                name: Arc::new(name),
97
83199
                context,
98
83199
                storage: storage.clone(),
99
83199
                schema,
100
83199
                effective_permissions: None,
101
83199
            }),
102
83199
        };
103
83199

            
104
83199
        if db.data.storage.check_view_integrity_on_database_open() {
105
16
            for view in db.data.schema.views() {
106
16
                db.data
107
16
                    .storage
108
16
                    .tasks()
109
16
                    .spawn_integrity_check(view, &db)
110
                    .await?;
111
            }
112
83195
        }
113

            
114
83199
        storage.tasks().spawn_key_value_expiration_loader(&db).await;
115

            
116
83221
        Ok(db)
117
83221
    }
118

            
119
    /// Returns a clone with `effective_permissions`. Replaces any previously applied permissions.
120
    ///
121
    /// # Unstable
122
    ///
123
    /// See [this issue](https://github.com/khonsulabs/bonsaidb/issues/68).
124
    #[doc(hidden)]
125
    #[must_use]
126
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Self {
127
        Self {
128
            data: Arc::new(Data {
129
                name: self.data.name.clone(),
130
                context: self.data.context.clone(),
131
                storage: self.data.storage.clone(),
132
                schema: self.data.schema.clone(),
133
                effective_permissions: Some(effective_permissions),
134
            }),
135
        }
136
    }
137

            
138
    /// Returns the name of the database.
139
    #[must_use]
140
1235
    pub fn name(&self) -> &str {
141
1235
        self.data.name.as_ref()
142
1235
    }
143

            
144
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
145
19
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
146
306
        let storage = Storage::open(configuration.with_schema::<DB>()?).await?;
147

            
148
33
        storage.create_database::<DB>("default", true).await?;
149

            
150
19
        Ok(storage.database::<DB>("default").await?)
151
19
    }
152

            
153
    /// Returns the [`Storage`] that this database belongs to.
154
    #[must_use]
155
2293152
    pub fn storage(&self) -> &'_ Storage {
156
2293152
        &self.data.storage
157
2293152
    }
158

            
159
    /// Returns the [`Schematic`] for the schema for this database.
160
    #[must_use]
161
1959101
    pub fn schematic(&self) -> &'_ Schematic {
162
1959101
        &self.data.schema
163
1959101
    }
164

            
165
1222206
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<StdFile> {
166
1222206
        &self.data.context.roots
167
1222206
    }
168

            
169
49169
    async fn for_each_in_view<
170
49169
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
171
49169
    >(
172
49169
        &self,
173
49169
        view: &dyn view::Serialized,
174
49169
        key: Option<QueryKey<Bytes>>,
175
49169
        order: Sort,
176
49169
        limit: Option<usize>,
177
49169
        access_policy: AccessPolicy,
178
49169
        mut callback: F,
179
49169
    ) -> Result<(), bonsaidb_core::Error> {
180
49169
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
181
37725
            self.data
182
37725
                .storage
183
37725
                .tasks()
184
37725
                .update_view_if_needed(view, self)
185
32619
                .await?;
186
11444
        }
187

            
188
49169
        let view_entries = self
189
49169
            .roots()
190
49169
            .tree(self.collection_tree(
191
49169
                &view.collection(),
192
49169
                view_entries_tree_name(&view.view_name()),
193
49169
            )?)
194
49169
            .map_err(Error::from)?;
195

            
196
        {
197
49169
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
198
24711
                callback(entry)?;
199
            }
200
        }
201

            
202
49169
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
203
4
            let db = self.clone();
204
4
            let view_name = view.view_name();
205
4
            tokio::task::spawn(async move {
206
4
                let view = db
207
4
                    .data
208
4
                    .schema
209
4
                    .view_by_name(&view_name)
210
4
                    .expect("query made with view that isn't registered with this database");
211
4
                db.data
212
4
                    .storage
213
4
                    .tasks()
214
10
                    .update_view_if_needed(view, &db)
215
10
                    .await
216
4
            });
217
49077
        }
218

            
219
49081
        Ok(())
220
49081
    }
221

            
222
33554
    async fn for_each_view_entry<
223
33554
        V: schema::View,
224
33554
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
225
33554
    >(
226
33554
        &self,
227
33554
        key: Option<QueryKey<V::Key>>,
228
33554
        order: Sort,
229
33554
        limit: Option<usize>,
230
33554
        access_policy: AccessPolicy,
231
33554
        callback: F,
232
33554
    ) -> Result<(), bonsaidb_core::Error> {
233
33554
        let view = self
234
33554
            .data
235
33554
            .schema
236
33554
            .view::<V>()
237
33554
            .expect("query made with view that isn't registered with this database");
238
33554

            
239
33554
        self.for_each_in_view(
240
33554
            view,
241
33554
            key.map(|key| key.serialized()).transpose()?,
242
33554
            order,
243
33554
            limit,
244
33554
            access_policy,
245
33554
            callback,
246
30494
        )
247
30494
        .await
248
33554
    }
249

            
250
    #[cfg(feature = "internal-apis")]
251
    #[doc(hidden)]
252
173008
    pub async fn internal_get_from_collection_id(
253
173008
        &self,
254
173008
        id: u64,
255
173008
        collection: &CollectionName,
256
173008
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
257
7864
        self.get_from_collection_id(id, collection).await
258
7864
    }
259

            
260
    #[cfg(feature = "internal-apis")]
261
    #[doc(hidden)]
262
132
    pub async fn list_from_collection(
263
132
        &self,
264
132
        ids: Range<u64>,
265
132
        order: Sort,
266
132
        limit: Option<usize>,
267
132
        collection: &CollectionName,
268
132
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
269
6
        self.list(ids, order, limit, collection).await
270
6
    }
271

            
272
    #[cfg(feature = "internal-apis")]
273
    #[doc(hidden)]
274
88
    pub async fn internal_get_multiple_from_collection_id(
275
88
        &self,
276
88
        ids: &[u64],
277
88
        collection: &CollectionName,
278
88
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
279
4
        self.get_multiple_from_collection_id(ids, collection).await
280
4
    }
281

            
282
    #[cfg(feature = "internal-apis")]
283
    #[doc(hidden)]
284
44
    pub async fn compact_collection_by_name(
285
44
        &self,
286
44
        collection: CollectionName,
287
44
    ) -> Result<(), bonsaidb_core::Error> {
288
2
        self.storage()
289
2
            .tasks()
290
2
            .compact_collection(self.clone(), collection)
291
2
            .await?;
292
2
        Ok(())
293
2
    }
294

            
295
    #[cfg(feature = "internal-apis")]
296
    #[doc(hidden)]
297
86812
    pub async fn query_by_name(
298
86812
        &self,
299
86812
        view: &ViewName,
300
86812
        key: Option<QueryKey<Bytes>>,
301
86812
        order: Sort,
302
86812
        limit: Option<usize>,
303
86812
        access_policy: AccessPolicy,
304
86812
    ) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
305
3946
        if let Some(view) = self.schematic().view_by_name(view) {
306
3946
            let mut results = Vec::new();
307
3946
            self.for_each_in_view(view, key, order, limit, access_policy, |collection| {
308
3944
                let entry = ViewEntry::from(collection);
309
7906
                for mapping in entry.mappings {
310
3962
                    results.push(bonsaidb_core::schema::view::map::Serialized {
311
3962
                        source: mapping.source,
312
3962
                        key: entry.key.clone(),
313
3962
                        value: mapping.value,
314
3962
                    });
315
3962
                }
316
3944
                Ok(())
317
3946
            })
318
1879
            .await?;
319

            
320
3945
            Ok(results)
321
        } else {
322
            Err(bonsaidb_core::Error::CollectionNotFound)
323
        }
324
3945
    }
325

            
326
    #[cfg(feature = "internal-apis")]
327
    #[doc(hidden)]
328
85712
    pub async fn query_by_name_with_docs(
329
85712
        &self,
330
85712
        view: &ViewName,
331
85712
        key: Option<QueryKey<Bytes>>,
332
85712
        order: Sort,
333
85712
        limit: Option<usize>,
334
85712
        access_policy: AccessPolicy,
335
85712
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
336
85712
    {
337
3896
        let results = self
338
3896
            .query_by_name(view, key, order, limit, access_policy)
339
1844
            .await?;
340
3896
        let view = self.schematic().view_by_name(view).unwrap(); // query() will fail if it's not present
341

            
342
3896
        let documents = self
343
3896
            .get_multiple_from_collection_id(
344
3896
                &results.iter().map(|m| m.source.id).collect::<Vec<_>>(),
345
3896
                &view.collection(),
346
3896
            )
347
2240
            .await?
348
3896
            .into_iter()
349
3896
            .map(|doc| (doc.header.id, doc))
350
3896
            .collect::<BTreeMap<_, _>>();
351
3896

            
352
3896
        Ok(
353
3896
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
354
3896
                mappings: results,
355
3896
                documents,
356
3896
            },
357
3896
        )
358
3896
    }
359

            
360
    #[cfg(feature = "internal-apis")]
361
    #[doc(hidden)]
362
170742
    pub async fn reduce_by_name(
363
170742
        &self,
364
170742
        view: &ViewName,
365
170742
        key: Option<QueryKey<Bytes>>,
366
170742
        access_policy: AccessPolicy,
367
170742
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
368
7762
        self.reduce_in_view(view, key, access_policy).await
369
7761
    }
370

            
371
    #[cfg(feature = "internal-apis")]
372
    #[doc(hidden)]
373
132
    pub async fn reduce_grouped_by_name(
374
132
        &self,
375
132
        view: &ViewName,
376
132
        key: Option<QueryKey<Bytes>>,
377
132
        access_policy: AccessPolicy,
378
132
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
379
7
        self.grouped_reduce_in_view(view, key, access_policy).await
380
6
    }
381

            
382
    #[cfg(feature = "internal-apis")]
383
    #[doc(hidden)]
384
44
    pub async fn delete_docs_by_name(
385
44
        &self,
386
44
        view: &ViewName,
387
44
        key: Option<QueryKey<Bytes>>,
388
44
        access_policy: AccessPolicy,
389
44
    ) -> Result<u64, bonsaidb_core::Error> {
390
2
        let view = self
391
2
            .data
392
2
            .schema
393
2
            .view_by_name(view)
394
2
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
395
2
        let collection = view.collection();
396
2
        let mut transaction = Transaction::default();
397
2
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
398
2
            let entry = ViewEntry::from(entry);
399

            
400
6
            for mapping in entry.mappings {
401
4
                transaction.push(Operation::delete(collection.clone(), mapping.source));
402
4
            }
403

            
404
2
            Ok(())
405
2
        })
406
2
        .await?;
407

            
408
2
        let results = Connection::apply_transaction(self, transaction).await?;
409

            
410
2
        Ok(results.len() as u64)
411
2
    }
412

            
413
237909
    async fn get_from_collection_id(
414
237909
        &self,
415
237909
        id: u64,
416
237909
        collection: &CollectionName,
417
237909
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
418
10836
        let task_self = self.clone();
419
10836
        let collection = collection.clone();
420
10836
        tokio::task::spawn_blocking(move || {
421
10836
            let tree = task_self
422
10836
                .data
423
10836
                .context
424
10836
                .roots
425
10836
                .tree(task_self.collection_tree::<Versioned, _>(
426
10836
                    &collection,
427
10836
                    document_tree_name(&collection),
428
10836
                )?)
429
10836
                .map_err(Error::from)?;
430
10326
            if let Some(vec) = tree
431
                .get(
432
10836
                    &id.as_big_endian_bytes()
433
10836
                        .map_err(view::Error::key_serialization)?,
434
                )
435
10836
                .map_err(Error::from)?
436
            {
437
10326
                Ok(Some(deserialize_document(&vec)?.into_owned()))
438
            } else {
439
509
                Ok(None)
440
            }
441
10836
        })
442
6269
        .await
443
10836
        .unwrap()
444
10836
    }
445

            
446
132692
    async fn get_multiple_from_collection_id(
447
132692
        &self,
448
132692
        ids: &[u64],
449
132692
        collection: &CollectionName,
450
132692
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
451
6041
        let task_self = self.clone();
452
6046
        let ids = ids.iter().map(|id| id.to_be_bytes()).collect::<Vec<_>>();
453
6041
        let collection = collection.clone();
454
6041
        tokio::task::spawn_blocking(move || {
455
6041
            let tree = task_self
456
6041
                .data
457
6041
                .context
458
6041
                .roots
459
6041
                .tree(task_self.collection_tree::<Versioned, _>(
460
6041
                    &collection,
461
6041
                    document_tree_name(&collection),
462
6041
                )?)
463
6041
                .map_err(Error::from)?;
464
6046
            let mut ids = ids.iter().map(|id| &id[..]).collect::<Vec<_>>();
465
6041
            ids.sort();
466
6041
            let keys_and_values = tree.get_multiple(&ids).map_err(Error::from)?;
467

            
468
6041
            keys_and_values
469
6041
                .into_iter()
470
6046
                .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
471
6041
                .collect()
472
6041
        })
473
3319
        .await
474
6041
        .unwrap()
475
6041
    }
476

            
477
360
    pub(crate) async fn list(
478
360
        &self,
479
360
        ids: Range<u64>,
480
360
        sort: Sort,
481
360
        limit: Option<usize>,
482
360
        collection: &CollectionName,
483
360
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
484
24
        let task_self = self.clone();
485
24
        let collection = collection.clone();
486
24
        tokio::task::spawn_blocking(move || {
487
24
            let tree = task_self
488
24
                .data
489
24
                .context
490
24
                .roots
491
24
                .tree(task_self.collection_tree::<Versioned, _>(
492
24
                    &collection,
493
24
                    document_tree_name(&collection),
494
24
                )?)
495
24
                .map_err(Error::from)?;
496
24
            let mut found_docs = Vec::new();
497
24
            let mut keys_read = 0;
498
24
            let ids = U64Range::new(ids);
499
24
            tree.scan(
500
24
                &ids.borrow_as_bytes(),
501
24
                match sort {
502
20
                    Sort::Ascending => true,
503
4
                    Sort::Descending => false,
504
                },
505
                |_, _, _| true,
506
24
                |_, _| {
507
29
                    if let Some(limit) = limit {
508
8
                        if keys_read >= limit {
509
4
                            return KeyEvaluation::Stop;
510
4
                        }
511
4

            
512
4
                        keys_read += 1;
513
21
                    }
514
25
                    KeyEvaluation::ReadData
515
29
                },
516
24
                |_, _, doc| {
517
                    found_docs.push(
518
25
                        deserialize_document(&doc)
519
25
                            .map(BorrowedDocument::into_owned)
520
25
                            .map_err(AbortError::Other)?,
521
                    );
522
25
                    Ok(())
523
25
                },
524
            )
525
24
            .map_err(|err| match err {
526
                AbortError::Other(err) => err,
527
                AbortError::Nebari(err) => bonsaidb_core::Error::from(crate::Error::from(err)),
528
24
            })
529
24
            .unwrap();
530
24

            
531
24
            Ok(found_docs)
532
24
        })
533
24
        .await
534
24
        .unwrap()
535
24
    }
536

            
537
256349
    async fn reduce_in_view(
538
256349
        &self,
539
256349
        view_name: &ViewName,
540
256349
        key: Option<QueryKey<Bytes>>,
541
256349
        access_policy: AccessPolicy,
542
256349
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
543
11657
        let view = self
544
11657
            .data
545
11657
            .schema
546
11657
            .view_by_name(view_name)
547
11657
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
548
11657
        let mut mappings = self
549
11659
            .grouped_reduce_in_view(view_name, key, access_policy)
550
237
            .await?;
551

            
552
11657
        let result = if mappings.len() == 1 {
553
6425
            mappings.pop().unwrap().value.into_vec()
554
        } else {
555
5232
            view.reduce(
556
5232
                &mappings
557
5232
                    .iter()
558
5369
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
559
5232
                    .collect::<Vec<_>>(),
560
5232
                true,
561
5232
            )
562
5232
            .map_err(Error::from)?
563
        };
564

            
565
11653
        Ok(result)
566
11657
    }
567

            
568
256527
    async fn grouped_reduce_in_view(
569
256527
        &self,
570
256527
        view_name: &ViewName,
571
256527
        key: Option<QueryKey<Bytes>>,
572
256527
        access_policy: AccessPolicy,
573
256527
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
574
11667
        let view = self
575
11667
            .data
576
11667
            .schema
577
11667
            .view_by_name(view_name)
578
11667
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
579
11667
        let mut mappings = Vec::new();
580
11818
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
581
6596
            let entry = ViewEntry::from(entry);
582
6596
            mappings.push(MappedSerializedValue {
583
6596
                key: entry.key,
584
6596
                value: entry.reduced_value,
585
6596
            });
586
6596
            Ok(())
587
11820
        })
588
244
        .await?;
589

            
590
11667
        Ok(mappings)
591
11667
    }
592

            
593
300420
    fn apply_transaction_to_roots(
594
300420
        &self,
595
300420
        transaction: &Transaction,
596
300420
    ) -> Result<Vec<OperationResult>, Error> {
597
300420
        let mut open_trees = OpenTrees::default();
598
800423
        for op in &transaction.operations {
599
500070
            if !self.data.schema.contains_collection_id(&op.collection) {
600
67
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
601
500003
            }
602
500003

            
603
500003
            open_trees.open_trees_for_document_change(
604
500003
                &op.collection,
605
500003
                &self.data.schema,
606
500003
                self.collection_encryption_key(&op.collection),
607
500003
                #[cfg(feature = "encryption")]
608
500003
                self.storage().vault(),
609
500003
            )?;
610
        }
611

            
612
300353
        let mut roots_transaction = self
613
300353
            .data
614
300353
            .context
615
300353
            .roots
616
300353
            .transaction::<_, dyn AnyTreeRoot<StdFile>>(&open_trees.trees)?;
617

            
618
300353
        let mut results = Vec::new();
619
300353
        let mut changed_documents = Vec::new();
620
788776
        for op in &transaction.operations {
621
499981
            let result = self.execute_operation(
622
499981
                op,
623
499981
                &mut roots_transaction,
624
499981
                &open_trees.trees_index_by_name,
625
499981
            )?;
626

            
627
488445
            match &result {
628
459532
                OperationResult::DocumentUpdated { header, collection } => {
629
459532
                    changed_documents.push(ChangedDocument {
630
459532
                        collection: collection.clone(),
631
459532
                        id: header.id,
632
459532
                        deleted: false,
633
459532
                    });
634
459532
                }
635
28891
                OperationResult::DocumentDeleted { id, collection } => {
636
28891
                    changed_documents.push(ChangedDocument {
637
28891
                        collection: collection.clone(),
638
28891
                        id: *id,
639
28891
                        deleted: true,
640
28891
                    });
641
28891
                }
642
                OperationResult::Success => {}
643
            }
644
488423
            results.push(result);
645
        }
646

            
647
        // Insert invalidations for each record changed
648
289785
        for (collection, changed_documents) in &changed_documents
649
288795
            .iter()
650
488467
            .group_by(|doc| doc.collection.clone())
651
        {
652
289784
            if let Some(views) = self.data.schema.views_in_collection(&collection) {
653
187307
                let changed_documents = changed_documents.collect::<Vec<_>>();
654
815854
                for view in views {
655
628481
                    if !view.unique() {
656
597135
                        let view_name = view.view_name();
657
1262672
                        for changed_document in &changed_documents {
658
665471
                            let invalidated_docs = roots_transaction
659
665471
                                .tree::<Unversioned>(
660
665471
                                    open_trees.trees_index_by_name
661
665471
                                        [&view_invalidated_docs_tree_name(&view_name)],
662
665471
                                )
663
665471
                                .unwrap();
664
665471
                            invalidated_docs.set(
665
665471
                                changed_document.id.as_big_endian_bytes().unwrap().to_vec(),
666
665471
                                b"",
667
665471
                            )?;
668
                        }
669
31346
                    }
670
                }
671
102477
            }
672
        }
673

            
674
288773
        roots_transaction
675
288773
            .entry_mut()
676
288773
            .set_data(pot::to_vec(&Changes::Documents(changed_documents))?)?;
677

            
678
288773
        roots_transaction.commit()?;
679

            
680
288817
        Ok(results)
681
300420
    }
682

            
683
500003
    fn execute_operation(
684
500003
        &self,
685
500003
        operation: &Operation,
686
500003
        transaction: &mut ExecutingTransaction<StdFile>,
687
500003
        tree_index_map: &HashMap<String, usize>,
688
500003
    ) -> Result<OperationResult, Error> {
689
500003
        match &operation.command {
690
385074
            Command::Insert { id, contents } => self.execute_insert(
691
385074
                operation,
692
385074
                transaction,
693
385074
                tree_index_map,
694
385074
                *id,
695
385074
                contents.to_vec(),
696
385074
            ),
697
86038
            Command::Update { header, contents } => self.execute_update(
698
86038
                operation,
699
86038
                transaction,
700
86038
                tree_index_map,
701
86038
                header,
702
86038
                contents.to_vec(),
703
86038
            ),
704
28891
            Command::Delete { header } => {
705
28891
                self.execute_delete(operation, transaction, tree_index_map, header)
706
            }
707
        }
708
500003
    }
709

            
710
86038
    fn execute_update(
711
86038
        &self,
712
86038
        operation: &Operation,
713
86038
        transaction: &mut ExecutingTransaction<StdFile>,
714
86038
        tree_index_map: &HashMap<String, usize>,
715
86038
        header: &Header,
716
86038
        contents: Vec<u8>,
717
86038
    ) -> Result<OperationResult, crate::Error> {
718
86038
        let documents = transaction
719
86038
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
720
86038
            .unwrap();
721
86038
        let document_id = header.id.as_big_endian_bytes().unwrap();
722
        // TODO switch to compare_swap
723

            
724
86038
        if let Some(vec) = documents.get(document_id.as_ref())? {
725
85971
            let doc = deserialize_document(&vec)?;
726
85971
            if doc.header.revision == header.revision {
727
85837
                if let Some(updated_doc) = doc.create_new_revision(contents) {
728
                    documents.set(
729
85770
                        updated_doc
730
85770
                            .header
731
85770
                            .id
732
85770
                            .as_big_endian_bytes()
733
85770
                            .unwrap()
734
85770
                            .as_ref()
735
85770
                            .to_vec(),
736
85770
                        serialize_document(&updated_doc)?,
737
                    )?;
738

            
739
85770
                    self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
740

            
741
85636
                    Ok(OperationResult::DocumentUpdated {
742
85636
                        collection: operation.collection.clone(),
743
85636
                        header: updated_doc.header,
744
85636
                    })
745
                } else {
746
                    // If no new revision was made, it means an attempt to
747
                    // save a document with the same contents was made.
748
                    // We'll return a success but not actually give a new
749
                    // version
750
67
                    Ok(OperationResult::DocumentUpdated {
751
67
                        collection: operation.collection.clone(),
752
67
                        header: doc.header,
753
67
                    })
754
                }
755
            } else {
756
134
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
757
134
                    operation.collection.clone(),
758
134
                    header.id,
759
134
                )))
760
            }
761
        } else {
762
67
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
763
67
                operation.collection.clone(),
764
67
                header.id,
765
67
            )))
766
        }
767
86038
    }
768

            
769
385074
    fn execute_insert(
770
385074
        &self,
771
385074
        operation: &Operation,
772
385074
        transaction: &mut ExecutingTransaction<StdFile>,
773
385074
        tree_index_map: &HashMap<String, usize>,
774
385074
        id: Option<u64>,
775
385074
        contents: Vec<u8>,
776
385074
    ) -> Result<OperationResult, Error> {
777
385074
        let documents = transaction
778
385074
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
779
385074
            .unwrap();
780
385074
        let id = if let Some(id) = id {
781
196883
            id
782
        } else {
783
188191
            let last_key = documents
784
188191
                .last_key()?
785
188191
                .map(|bytes| BigEndian::read_u64(&bytes))
786
188191
                .unwrap_or_default();
787
188191
            last_key + 1
788
        };
789

            
790
385074
        let doc = BorrowedDocument::new(id, contents);
791
385074
        let serialized: Vec<u8> = serialize_document(&doc)?;
792
385074
        let document_id = ArcBytes::from(doc.header.id.as_big_endian_bytes().unwrap().to_vec());
793
385074
        if documents
794
385074
            .replace(document_id.clone(), serialized)?
795
385074
            .is_some()
796
        {
797
11090
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
798
11090
                operation.collection.clone(),
799
11090
                id,
800
11090
            )))
801
        } else {
802
373984
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
803

            
804
373873
            Ok(OperationResult::DocumentUpdated {
805
373873
                collection: operation.collection.clone(),
806
373873
                header: doc.header,
807
373873
            })
808
        }
809
385074
    }
810

            
811
28891
    fn execute_delete(
812
28891
        &self,
813
28891
        operation: &Operation,
814
28891
        transaction: &mut ExecutingTransaction<StdFile>,
815
28891
        tree_index_map: &HashMap<String, usize>,
816
28891
        header: &Header,
817
28891
    ) -> Result<OperationResult, Error> {
818
28891
        let documents = transaction
819
28891
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
820
28891
            .unwrap();
821
28891
        let document_id = header.id.as_big_endian_bytes().unwrap();
822
28891
        if let Some(vec) = documents.remove(&document_id)? {
823
28891
            let doc = deserialize_document(&vec)?;
824
28891
            if &doc.header == header {
825
28891
                self.update_unique_views(
826
28891
                    document_id.as_ref(),
827
28891
                    operation,
828
28891
                    transaction,
829
28891
                    tree_index_map,
830
28891
                )?;
831

            
832
28891
                Ok(OperationResult::DocumentDeleted {
833
28891
                    collection: operation.collection.clone(),
834
28891
                    id: header.id,
835
28891
                })
836
            } else {
837
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
838
                    operation.collection.clone(),
839
                    header.id,
840
                )))
841
            }
842
        } else {
843
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
844
                operation.collection.clone(),
845
                header.id,
846
            )))
847
        }
848
28891
    }
849

            
850
    fn update_unique_views(
851
        &self,
852
        document_id: &[u8],
853
        operation: &Operation,
854
        transaction: &mut ExecutingTransaction<StdFile>,
855
        tree_index_map: &HashMap<String, usize>,
856
    ) -> Result<(), Error> {
857
488645
        if let Some(unique_views) = self
858
488645
            .data
859
488645
            .schema
860
488645
            .unique_views_in_collection(&operation.collection)
861
        {
862
126935
            for view in unique_views {
863
63579
                let name = view.view_name();
864
63579
                mapper::DocumentRequest {
865
63579
                    database: self,
866
63579
                    document_id,
867
63579
                    map_request: &mapper::Map {
868
63579
                        database: self.data.name.clone(),
869
63579
                        collection: operation.collection.clone(),
870
63579
                        view_name: name.clone(),
871
63579
                    },
872
63579
                    transaction,
873
63579
                    document_map_index: tree_index_map[&view_document_map_tree_name(&name)],
874
63579
                    documents_index: tree_index_map[&document_tree_name(&operation.collection)],
875
63579
                    omitted_entries_index: tree_index_map[&view_omitted_docs_tree_name(&name)],
876
63579
                    view_entries_index: tree_index_map[&view_entries_tree_name(&name)],
877
63579
                    view,
878
63579
                }
879
63579
                .map()?;
880
            }
881
425044
        }
882

            
883
488400
        Ok(())
884
488645
    }
885

            
886
424544
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
887
424544
        view_entries: &'a Tree<Unversioned, StdFile>,
888
424544
        key: Option<QueryKey<K>>,
889
424544
        order: Sort,
890
424544
        limit: Option<usize>,
891
424544
    ) -> Result<Vec<ViewEntryCollection>, Error> {
892
424544
        let mut values = Vec::new();
893
424544
        let forwards = match order {
894
424477
            Sort::Ascending => true,
895
67
            Sort::Descending => false,
896
        };
897
424544
        let mut values_read = 0;
898
424544
        if let Some(key) = key {
899
421878
            match key {
900
200
                QueryKey::Range(range) => {
901
200
                    let range = range
902
200
                        .as_big_endian_bytes()
903
200
                        .map_err(view::Error::key_serialization)?;
904
200
                    view_entries.scan::<Infallible, _, _, _, _>(
905
400
                        &range.map_ref(|bytes| &bytes[..]),
906
200
                        forwards,
907
464
                        |_, _, _| true,
908
200
                        |_, _| {
909
1148
                            if let Some(limit) = limit {
910
134
                                if values_read >= limit {
911
67
                                    return KeyEvaluation::Stop;
912
67
                                }
913
67
                                values_read += 1;
914
1014
                            }
915
1081
                            KeyEvaluation::ReadData
916
1148
                        },
917
1081
                        |_key, _index, value| {
918
1081
                            values.push(value);
919
1081
                            Ok(())
920
1081
                        },
921
200
                    )?;
922
                }
923
421522
                QueryKey::Matches(key) => {
924
421522
                    let key = key
925
421522
                        .as_big_endian_bytes()
926
421522
                        .map_err(view::Error::key_serialization)?
927
421522
                        .to_vec();
928
421522

            
929
421522
                    values.extend(view_entries.get(&key)?);
930
                }
931
156
                QueryKey::Multiple(list) => {
932
156
                    let mut list = list
933
156
                        .into_iter()
934
312
                        .map(|key| {
935
312
                            key.as_big_endian_bytes()
936
312
                                .map(|bytes| bytes.to_vec())
937
312
                                .map_err(view::Error::key_serialization)
938
312
                        })
939
156
                        .collect::<Result<Vec<_>, _>>()?;
940

            
941
156
                    list.sort();
942
156

            
943
156
                    values.extend(
944
156
                        view_entries
945
156
                            .get_multiple(&list.iter().map(Vec::as_slice).collect::<Vec<_>>())?
946
156
                            .into_iter()
947
312
                            .map(|(_, value)| value),
948
                    );
949
                }
950
            }
951
        } else {
952
2666
            view_entries.scan::<Infallible, _, _, _, _>(
953
2666
                &(..),
954
2666
                forwards,
955
2666
                |_, _, _| true,
956
2666
                |_, _| {
957
3781
                    if let Some(limit) = limit {
958
                        if values_read >= limit {
959
                            return KeyEvaluation::Stop;
960
                        }
961
                        values_read += 1;
962
3781
                    }
963
3781
                    KeyEvaluation::ReadData
964
3810
                },
965
3810
                |_, _, value| {
966
3781
                    values.push(value);
967
3781
                    Ok(())
968
3810
                },
969
2666
            )?;
970
        }
971

            
972
424544
        values
973
424544
            .into_iter()
974
424544
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
975
424544
            .collect::<Result<Vec<_>, Error>>()
976
424544
    }
977

            
978
1786302
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
979
1786302
        self.schematic()
980
1786302
            .encryption_key_for_collection(collection)
981
1786302
            .or_else(|| self.storage().default_encryption_key())
982
1786302
    }
983

            
984
    #[cfg_attr(
985
        not(feature = "encryption"),
986
        allow(unused_mut, unused_variables, clippy::let_and_return)
987
    )]
988
    #[cfg_attr(feature = "encryption", allow(clippy::unnecessary_wraps))]
989
1286343
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
990
1286343
        &self,
991
1286343
        collection: &CollectionName,
992
1286343
        name: S,
993
1286343
    ) -> Result<TreeRoot<R, StdFile>, Error> {
994
1286343
        let mut tree = R::tree(name);
995

            
996
1286343
        if let Some(key) = self.collection_encryption_key(collection) {
997
            #[cfg(feature = "encryption")]
998
36903
            {
999
36903
                tree = tree.with_vault(TreeVault {
36903
                    key: key.clone(),
36903
                    vault: self.storage().vault().clone(),
36903
                });
36903
            }

            
            #[cfg(not(feature = "encryption"))]
            {
                return Err(Error::EncryptionDisabled);
            }
1249440
        }

            
1286343
        Ok(tree)
1286343
    }

            
1
    pub(crate) async fn update_key_expiration_async<'key>(
1
        &self,
1
        tree_key: impl Into<Cow<'key, str>>,
1
        expiration: Option<Timestamp>,
1
    ) {
1
        self.data
1
            .context
1
            .update_key_expiration_async(tree_key, expiration)
            .await;
1
    }
}

            
574662
pub(crate) fn deserialize_document(
574662
    bytes: &[u8],
574662
) -> Result<BorrowedDocument<'_>, bonsaidb_core::Error> {
574662
    let document = bincode::deserialize::<BorrowedDocument<'_>>(bytes).map_err(Error::from)?;
574662
    Ok(document)
574662
}

            
470822
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
470822
    bincode::serialize(document)
470822
        .map_err(Error::from)
470822
        .map_err(bonsaidb_core::Error::from)
470822
}

            
#[async_trait]
impl Connection for Database {
901172
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(transaction)))]
    async fn apply_transaction(
        &self,
        transaction: Transaction,
300398
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
300420
        let task_self = self.clone();
300420
        let mut unique_view_tasks = Vec::new();
301454
        for collection_name in transaction
300420
            .operations
300420
            .iter()
500070
            .map(|op| &op.collection)
300420
            .collect::<HashSet<_>>()
        {
301453
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
872865
                for view in views {
674000
                    if view.unique() {
31635
                        if let Some(task) = self
31635
                            .data
31635
                            .storage
31635
                            .tasks()
31635
                            .spawn_integrity_check(view, self)
                            .await?
621
                        {
621
                            unique_view_tasks.push(task);
30970
                        }
642365
                    }
                }
102544
            }
        }
301041
        for task in unique_view_tasks {
643
            task.receive()
555
                .await
621
                .map_err(Error::from)?
621
                .map_err(Error::from)?;
        }

            
300420
        tokio::task::spawn_blocking(move || task_self.apply_transaction_to_roots(&transaction))
259542
            .await
300398
            .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?
300420
            .map_err(bonsaidb_core::Error::from)
600818
    }

            
8916
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(id)))]
    async fn get<C: schema::Collection>(
        &self,
        id: u64,
2972
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
2972
        self.get_from_collection_id(id, &C::collection_name()).await
5944
    }

            
6423
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids)))]
    async fn get_multiple<C: schema::Collection>(
        &self,
        ids: &[u64],
2141
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
2141
        self.get_multiple_from_collection_id(ids, &C::collection_name())
1075
            .await
4282
    }

            
18
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids, order, limit)))]
    async fn list<C: schema::Collection, R: Into<Range<u64>> + Send>(
        &self,
        ids: R,
        order: Sort,
        limit: Option<usize>,
6
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
6
        self.list(ids.into(), order, limit, &C::collection_name())
6
            .await
12
    }

            
    #[cfg_attr(
        feature = "tracing",
69750
        tracing::instrument(skip(starting_id, result_limit))
    )]
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<usize>,
23250
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
23250
        let result_limit = result_limit
23250
            .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
23250
            .min(LIST_TRANSACTIONS_MAX_RESULTS);
23250
        if result_limit > 0 {
23183
            let task_self = self.clone();
23183
            tokio::task::spawn_blocking::<_, Result<Vec<transaction::Executed>, Error>>(move || {
23183
                let range = if let Some(starting_id) = starting_id {
11826
                    Range::from(starting_id..)
                } else {
11357
                    Range::from(..)
                };

            
23183
                let mut entries = Vec::new();
174503
                task_self.roots().transactions().scan(range, |entry| {
174503
                    entries.push(entry);
174503
                    entries.len() < result_limit
174503
                })?;

            
23183
                entries
23183
                    .into_iter()
23183
                    .map(|entry| {
174503
                        if let Some(data) = entry.data() {
174503
                            let changes = match pot::from_slice(data) {
174503
                                Ok(changes) => changes,
                                Err(pot::Error::NotAPot) => {
                                    Changes::Documents(bincode::deserialize(entry.data().unwrap())?)
                                }
                                other => other?,
                            };
174503
                            Ok(Some(transaction::Executed {
174503
                                id: entry.id,
174503
                                changes,
174503
                            }))
                        } else {
                            Ok(None)
                        }
174503
                    })
23183
                    .filter_map(Result::transpose)
23183
                    .collect::<Result<Vec<_>, Error>>()
23183
            })
18848
            .await
23183
            .unwrap()
23183
            .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
67
            Ok(Vec::default())
        }
46500
    }

            
    #[cfg_attr(
        feature = "tracing",
100654
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    #[must_use]
    async fn query<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
33530
    {
33530
        let mut results = Vec::new();
33530
        self.for_each_view_entry::<V, _>(key, order, limit, access_policy, |collection| {
14167
            let entry = ViewEntry::from(collection);
14167
            let key = <V::Key as Key>::from_big_endian_bytes(&entry.key)
14167
                .map_err(view::Error::key_serialization)
14167
                .map_err(Error::from)?;
28358
            for entry in entry.mappings {
                results.push(Map::new(
14191
                    entry.source,
14191
                    key.clone(),
14191
                    V::deserialize(&entry.value)?,
                ));
            }
14167
            Ok(())
33530
        })
30492
        .await?;

            
33530
        Ok(results)
67060
    }

            
    #[cfg_attr(
        feature = "tracing",
6394
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    async fn query_with_docs<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
    where
        Self: Sized,
2132
    {
2134
        let results = Connection::query::<V>(self, key, order, limit, access_policy).await?;

            
2132
        let documents = self
2136
            .get_multiple::<V::Collection>(&results.iter().map(|m| m.source.id).collect::<Vec<_>>())
1065
            .await?
2132
            .into_iter()
2136
            .map(|doc| (doc.header.id, doc))
2132
            .collect::<BTreeMap<u64, _>>();
2132

            
2132
        Ok(MappedDocuments {
2132
            mappings: results,
2132
            documents,
2132
        })
4264
    }

            
11688
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<V::Value, bonsaidb_core::Error>
    where
        Self: Sized,
3896
    {
3896
        let view = self
3896
            .data
3896
            .schema
3896
            .view::<V>()
3896
            .expect("query made with view that isn't registered with this database");

            
3894
        let result = self
            .reduce_in_view(
3896
                &view.view_name(),
3896
                key.map(|key| key.serialized()).transpose()?,
3896
                access_policy,
87
            )
87
            .await?;
3894
        let value = V::deserialize(&result)?;

            
3893
        Ok(value)
7791
    }

            
12
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce_grouped<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
4
    {
4
        let view = self
4
            .data
4
            .schema
4
            .view::<V>()
4
            .expect("query made with view that isn't registered with this database");

            
4
        let results = self
            .grouped_reduce_in_view(
4
                &view.view_name(),
4
                key.map(|key| key.serialized()).transpose()?,
4
                access_policy,
2
            )
2
            .await?;
4
        results
4
            .into_iter()
6
            .map(|map| {
6
                Ok(MappedValue::new(
6
                    V::Key::from_big_endian_bytes(&map.key)
6
                        .map_err(view::Error::key_serialization)?,
6
                    V::deserialize(&map.value)?,
                ))
6
            })
4
            .collect::<Result<Vec<_>, bonsaidb_core::Error>>()
8
    }

            
2
    async fn delete_docs<V: schema::SerializedView>(
2
        &self,
2
        key: Option<QueryKey<V::Key>>,
2
        access_policy: AccessPolicy,
2
    ) -> Result<u64, bonsaidb_core::Error>
2
    where
2
        Self: Sized,
2
    {
2
        let collection = <V::Collection as Collection>::collection_name();
2
        let mut transaction = Transaction::default();
2
        self.for_each_view_entry::<V, _>(
2
            key,
2
            Sort::Ascending,
2
            None,
2
            access_policy,
2
            |entry_collection| {
2
                let entry = ViewEntry::from(entry_collection);

            
6
                for mapping in entry.mappings {
4
                    transaction.push(Operation::delete(collection.clone(), mapping.source));
4
                }

            
2
                Ok(())
2
            },
2
        )
2
        .await?;

            
2
        let results = Connection::apply_transaction(self, transaction).await?;

            
2
        Ok(results.len() as u64)
4
    }

            
797727
    #[cfg_attr(feature = "tracing", tracing::instrument)]
265953
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
265953
        Ok(self.roots().transactions().current_transaction_id())
265953
    }

            
6
    #[cfg_attr(feature = "tracing", tracing::instrument)]
2
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
2
        self.storage()
2
            .tasks()
2
            .compact_collection(self.clone(), C::collection_name())
2
            .await?;
2
        Ok(())
4
    }

            
201
    #[cfg_attr(feature = "tracing", tracing::instrument)]
67
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
67
        self.storage()
67
            .tasks()
67
            .compact_database(self.clone())
67
            .await?;
67
        Ok(())
134
    }

            
201
    #[cfg_attr(feature = "tracing", tracing::instrument)]
67
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
67
        self.storage()
67
            .tasks()
67
            .compact_key_value_store(self.clone())
67
            .await?;
67
        Ok(())
134
    }
}

            
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
struct ViewEntryCollectionIterator<'a> {
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntryCollection, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
1147522
#[derive(Debug, Clone)]
pub(crate) struct Context {
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

            
1900527
    fn deref(&self) -> &Self::Target {
1900527
        &self.data
1900527
    }
}

            
#[derive(Debug)]
pub(crate) struct ContextData {
    pub(crate) roots: Roots<StdFile>,
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
    runtime: tokio::runtime::Handle,
}

            
impl Borrow<Roots<StdFile>> for Context {
    fn borrow(&self) -> &Roots<StdFile> {
        &self.data.roots
    }
}

            
impl Context {
17751
    pub(crate) fn new(roots: Roots<StdFile>, key_value_persistence: KeyValuePersistence) -> Self {
17751
        let (background_sender, background_receiver) = watch::channel(None);
17751
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
17751
            key_value_persistence,
17751
            roots.clone(),
17751
            background_sender,
17751
        )));
17751
        let context = Self {
17751
            data: Arc::new(ContextData {
17751
                roots,
17751
                key_value_state: key_value_state.clone(),
17751
                runtime: tokio::runtime::Handle::current(),
17751
            }),
17751
        };
17751
        tokio::task::spawn(keyvalue::background_worker(
17751
            key_value_state,
17751
            background_receiver,
17751
        ));
17751
        context
17751
    }

            
677639
    pub(crate) async fn perform_kv_operation(
677639
        &self,
677639
        op: KeyOperation,
677639
    ) -> Result<Output, bonsaidb_core::Error> {
677639
        let mut state = fast_async_lock!(self.data.key_value_state);
677639
        state
677639
            .perform_kv_operation(op, &self.data.key_value_state)
            .await
677639
    }

            
11
    pub(crate) async fn update_key_expiration_async<'key>(
11
        &self,
11
        tree_key: impl Into<Cow<'key, str>>,
11
        expiration: Option<Timestamp>,
11
    ) {
11
        let mut state = fast_async_lock!(self.data.key_value_state);
11
        state.update_key_expiration(tree_key, expiration);
11
    }
}

            
impl Drop for ContextData {
15979
    fn drop(&mut self) {
15979
        let key_value_state = self.key_value_state.clone();
15979
        self.runtime.spawn(async move {
12325
            let mut state = fast_async_lock!(key_value_state);
12325
            state.shutdown(&key_value_state).await
15979
        });
15979
    }
}

            
1534300
pub fn document_tree_name(collection: &CollectionName) -> String {
1534300
    format!("collection.{:#}", collection)
1534300
}