1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::Deref,
6
    sync::Arc,
7
    u8,
8
};
9

            
10
use async_lock::Mutex;
11
use async_trait::async_trait;
12
use bonsaidb_core::{
13
    arc_bytes::{serde::Bytes, ArcBytes},
14
    connection::{AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
15
    document::{BorrowedDocument, Document, Header, KeyId, OwnedDocument},
16
    keyvalue::{KeyOperation, Output, Timestamp},
17
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
18
    permissions::Permissions,
19
    schema::{
20
        self,
21
        view::{
22
            self,
23
            map::{MappedDocuments, MappedSerializedValue},
24
        },
25
        Collection, CollectionName, Key, Map, MappedValue, Schema, Schematic, ViewName,
26
    },
27
    transaction::{
28
        self, ChangedDocument, Changes, Command, Operation, OperationResult, Transaction,
29
    },
30
};
31
use bonsaidb_utils::fast_async_lock;
32
use byteorder::{BigEndian, ByteOrder};
33
use itertools::Itertools;
34
use nebari::{
35
    io::any::AnyFile,
36
    tree::{
37
        AnyTreeRoot, BorrowByteRange, KeyEvaluation, Root, TreeRoot, U64Range, Unversioned,
38
        Versioned,
39
    },
40
    AbortError, ExecutingTransaction, Roots, Tree,
41
};
42
use tokio::sync::watch;
43

            
44
#[cfg(feature = "encryption")]
45
use crate::vault::TreeVault;
46
use crate::{
47
    config::{Builder, KeyValuePersistence, StorageConfiguration},
48
    error::Error,
49
    open_trees::OpenTrees,
50
    views::{
51
        mapper::{self, ViewEntryCollection},
52
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
53
        view_omitted_docs_tree_name, ViewEntry,
54
    },
55
    Storage,
56
};
57

            
58
pub mod keyvalue;
59

            
60
pub mod pubsub;
61

            
62
/// A local, file-based database.
63
///
64
/// ## Using `Database` to create a single database
65
///
66
/// `Database`provides an easy mechanism to open and access a single database:
67
///
68
/// ```rust
69
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
70
/// use bonsaidb_core::schema::Collection;
71
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
72
/// use bonsaidb_local::{
73
///     config::{Builder, StorageConfiguration},
74
///     Database,
75
/// };
76
/// use serde::{Deserialize, Serialize};
77
///
78
/// #[derive(Debug, Serialize, Deserialize, Collection)]
79
/// #[collection(name = "blog-posts")]
80
/// # #[collection(core = bonsaidb_core)]
81
/// struct BlogPost {
82
///     pub title: String,
83
///     pub contents: String,
84
/// }
85
///
86
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
87
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
88
/// #     Ok(())
89
/// # }
90
/// ```
91
///
92
/// Under the hood, this initializes a [`Storage`] instance pointing at
93
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
94
/// with the schema `BlogPost`.
95
///
96
/// In this example, `BlogPost` implements the [`Collection`] trait, and all
97
/// collections can be used as a [`Schema`].
98
#[derive(Debug)]
99
pub struct Database {
100
    pub(crate) data: Arc<Data>,
101
}
102

            
103
#[derive(Debug)]
104
pub struct Data {
105
    pub name: Arc<Cow<'static, str>>,
106
    context: Context,
107
    pub(crate) storage: Storage,
108
    pub(crate) schema: Arc<Schematic>,
109
    #[allow(dead_code)] // This code was previously used, it works, but is currently unused.
110
    pub(crate) effective_permissions: Option<Permissions>,
111
}
112
impl Clone for Database {
113
1042954
    fn clone(&self) -> Self {
114
1042954
        Self {
115
1042954
            data: self.data.clone(),
116
1042954
        }
117
1042954
    }
118
}
119

            
120
impl Database {
121
    /// Opens a local file as a bonsaidb.
122
87464
    pub(crate) async fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
123
87464
        name: S,
124
87464
        context: Context,
125
87464
        storage: Storage,
126
87464
    ) -> Result<Self, Error> {
127
87464
        let name = name.into();
128
87464
        let schema = Arc::new(DB::schematic()?);
129
87464
        let db = Self {
130
87464
            data: Arc::new(Data {
131
87464
                name: Arc::new(name),
132
87464
                context,
133
87464
                storage: storage.clone(),
134
87464
                schema,
135
87464
                effective_permissions: None,
136
87464
            }),
137
87464
        };
138
87464

            
139
87464
        if db.data.storage.check_view_integrity_on_database_open() {
140
16
            for view in db.data.schema.views() {
141
16
                db.data
142
16
                    .storage
143
16
                    .tasks()
144
16
                    .spawn_integrity_check(view, &db)
145
                    .await?;
146
            }
147
87460
        }
148

            
149
87464
        storage.tasks().spawn_key_value_expiration_loader(&db).await;
150

            
151
87464
        Ok(db)
152
87464
    }
153

            
154
    /// Returns a clone with `effective_permissions`. Replaces any previously applied permissions.
155
    ///
156
    /// # Unstable
157
    ///
158
    /// See [this issue](https://github.com/khonsulabs/bonsaidb/issues/68).
159
    #[doc(hidden)]
160
    #[must_use]
161
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Self {
162
        Self {
163
            data: Arc::new(Data {
164
                name: self.data.name.clone(),
165
                context: self.data.context.clone(),
166
                storage: self.data.storage.clone(),
167
                schema: self.data.schema.clone(),
168
                effective_permissions: Some(effective_permissions),
169
            }),
170
        }
171
    }
172

            
173
    /// Returns the name of the database.
174
    #[must_use]
175
1352
    pub fn name(&self) -> &str {
176
1352
        self.data.name.as_ref()
177
1352
    }
178

            
179
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
180
19
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
181
309
        let storage = Storage::open(configuration.with_schema::<DB>()?).await?;
182

            
183
32
        storage.create_database::<DB>("default", true).await?;
184

            
185
19
        Ok(storage.database::<DB>("default").await?)
186
19
    }
187

            
188
    /// Returns the [`Storage`] that this database belongs to.
189
    #[must_use]
190
2473089
    pub fn storage(&self) -> &'_ Storage {
191
2473089
        &self.data.storage
192
2473089
    }
193

            
194
    /// Returns the [`Schematic`] for the schema for this database.
195
    #[must_use]
196
2156736
    pub fn schematic(&self) -> &'_ Schematic {
197
2156736
        &self.data.schema
198
2156736
    }
199

            
200
1382292
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
201
1382292
        &self.data.context.roots
202
1382292
    }
203

            
204
52804
    async fn for_each_in_view<
205
52804
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
206
52804
    >(
207
52804
        &self,
208
52804
        view: &dyn view::Serialized,
209
52804
        key: Option<QueryKey<Bytes>>,
210
52804
        order: Sort,
211
52804
        limit: Option<usize>,
212
52804
        access_policy: AccessPolicy,
213
52804
        mut callback: F,
214
52804
    ) -> Result<(), bonsaidb_core::Error> {
215
52804
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
216
40892
            self.data
217
40892
                .storage
218
40892
                .tasks()
219
40892
                .update_view_if_needed(view, self)
220
35516
                .await?;
221
11912
        }
222

            
223
52804
        let view_entries = self
224
52804
            .roots()
225
52804
            .tree(self.collection_tree(
226
52804
                &view.collection(),
227
52804
                view_entries_tree_name(&view.view_name()),
228
52804
            )?)
229
52804
            .map_err(Error::from)?;
230

            
231
        {
232
52804
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
233
26977
                callback(entry)?;
234
            }
235
        }
236

            
237
52804
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
238
5
            let db = self.clone();
239
5
            let view_name = view.view_name();
240
5
            tokio::task::spawn(async move {
241
5
                let view = db
242
5
                    .data
243
5
                    .schema
244
5
                    .view_by_name(&view_name)
245
5
                    .expect("query made with view that isn't registered with this database");
246
5
                db.data
247
5
                    .storage
248
5
                    .tasks()
249
10
                    .update_view_if_needed(view, &db)
250
10
                    .await
251
5
            });
252
52799
        }
253

            
254
52804
        Ok(())
255
52804
    }
256

            
257
36586
    async fn for_each_view_entry<
258
36586
        V: schema::View,
259
36586
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
260
36586
    >(
261
36586
        &self,
262
36586
        key: Option<QueryKey<V::Key>>,
263
36586
        order: Sort,
264
36586
        limit: Option<usize>,
265
36586
        access_policy: AccessPolicy,
266
36586
        callback: F,
267
36586
    ) -> Result<(), bonsaidb_core::Error> {
268
36586
        let view = self
269
36586
            .data
270
36586
            .schema
271
36586
            .view::<V>()
272
36586
            .expect("query made with view that isn't registered with this database");
273
36586

            
274
36586
        self.for_each_in_view(
275
36586
            view,
276
36586
            key.map(|key| key.serialized()).transpose()?,
277
36586
            order,
278
36586
            limit,
279
36586
            access_policy,
280
36586
            callback,
281
33232
        )
282
33232
        .await
283
36586
    }
284

            
285
    #[cfg(feature = "internal-apis")]
286
    #[doc(hidden)]
287
198864
    pub async fn internal_get_from_collection_id(
288
198864
        &self,
289
198864
        id: u64,
290
198864
        collection: &CollectionName,
291
198864
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
292
8286
        self.get_from_collection_id(id, collection).await
293
8286
    }
294

            
295
    #[cfg(feature = "internal-apis")]
296
    #[doc(hidden)]
297
192
    pub async fn list_from_collection(
298
192
        &self,
299
192
        ids: Range<u64>,
300
192
        order: Sort,
301
192
        limit: Option<usize>,
302
192
        collection: &CollectionName,
303
192
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
304
8
        self.list(ids, order, limit, collection).await
305
8
    }
306

            
307
    #[cfg(feature = "internal-apis")]
308
    #[doc(hidden)]
309
96
    pub async fn internal_get_multiple_from_collection_id(
310
96
        &self,
311
96
        ids: &[u64],
312
96
        collection: &CollectionName,
313
96
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
314
4
        self.get_multiple_from_collection_id(ids, collection).await
315
4
    }
316

            
317
    #[cfg(feature = "internal-apis")]
318
    #[doc(hidden)]
319
48
    pub async fn compact_collection_by_name(
320
48
        &self,
321
48
        collection: CollectionName,
322
48
    ) -> Result<(), bonsaidb_core::Error> {
323
2
        self.storage()
324
2
            .tasks()
325
2
            .compact_collection(self.clone(), collection)
326
2
            .await?;
327
2
        Ok(())
328
2
    }
329

            
330
    #[cfg(feature = "internal-apis")]
331
    #[doc(hidden)]
332
96912
    pub async fn query_by_name(
333
96912
        &self,
334
96912
        view: &ViewName,
335
96912
        key: Option<QueryKey<Bytes>>,
336
96912
        order: Sort,
337
96912
        limit: Option<usize>,
338
96912
        access_policy: AccessPolicy,
339
96912
    ) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
340
4038
        if let Some(view) = self.schematic().view_by_name(view) {
341
4038
            let mut results = Vec::new();
342
4038
            self.for_each_in_view(view, key, order, limit, access_policy, |collection| {
343
4036
                let entry = ViewEntry::from(collection);
344
8090
                for mapping in entry.mappings {
345
4054
                    results.push(bonsaidb_core::schema::view::map::Serialized {
346
4054
                        source: mapping.source,
347
4054
                        key: entry.key.clone(),
348
4054
                        value: mapping.value,
349
4054
                    });
350
4054
                }
351
4036
                Ok(())
352
4038
            })
353
1992
            .await?;
354

            
355
4038
            Ok(results)
356
        } else {
357
            Err(bonsaidb_core::Error::CollectionNotFound)
358
        }
359
4038
    }
360

            
361
    #[cfg(feature = "internal-apis")]
362
    #[doc(hidden)]
363
95712
    pub async fn query_by_name_with_docs(
364
95712
        &self,
365
95712
        view: &ViewName,
366
95712
        key: Option<QueryKey<Bytes>>,
367
95712
        order: Sort,
368
95712
        limit: Option<usize>,
369
95712
        access_policy: AccessPolicy,
370
95712
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
371
95712
    {
372
3988
        let results = self
373
3988
            .query_by_name(view, key, order, limit, access_policy)
374
1954
            .await?;
375
3988
        let view = self.schematic().view_by_name(view).unwrap(); // query() will fail if it's not present
376

            
377
3988
        let documents = self
378
3988
            .get_multiple_from_collection_id(
379
3988
                &results.iter().map(|m| m.source.id).collect::<Vec<_>>(),
380
3988
                &view.collection(),
381
3988
            )
382
2080
            .await?
383
3988
            .into_iter()
384
3988
            .map(|doc| (doc.header.id, doc))
385
3988
            .collect::<BTreeMap<_, _>>();
386
3988

            
387
3988
        Ok(
388
3988
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
389
3988
                mappings: results,
390
3988
                documents,
391
3988
            },
392
3988
        )
393
3988
    }
394

            
395
    #[cfg(feature = "internal-apis")]
396
    #[doc(hidden)]
397
194328
    pub async fn reduce_by_name(
398
194328
        &self,
399
194328
        view: &ViewName,
400
194328
        key: Option<QueryKey<Bytes>>,
401
194328
        access_policy: AccessPolicy,
402
194328
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
403
8098
        self.reduce_in_view(view, key, access_policy).await
404
8097
    }
405

            
406
    #[cfg(feature = "internal-apis")]
407
    #[doc(hidden)]
408
144
    pub async fn reduce_grouped_by_name(
409
144
        &self,
410
144
        view: &ViewName,
411
144
        key: Option<QueryKey<Bytes>>,
412
144
        access_policy: AccessPolicy,
413
144
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
414
7
        self.grouped_reduce_in_view(view, key, access_policy).await
415
6
    }
416

            
417
    #[cfg(feature = "internal-apis")]
418
    #[doc(hidden)]
419
48
    pub async fn delete_docs_by_name(
420
48
        &self,
421
48
        view: &ViewName,
422
48
        key: Option<QueryKey<Bytes>>,
423
48
        access_policy: AccessPolicy,
424
48
    ) -> Result<u64, bonsaidb_core::Error> {
425
2
        let view = self
426
2
            .data
427
2
            .schema
428
2
            .view_by_name(view)
429
2
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
430
2
        let collection = view.collection();
431
2
        let mut transaction = Transaction::default();
432
2
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
433
2
            let entry = ViewEntry::from(entry);
434

            
435
6
            for mapping in entry.mappings {
436
4
                transaction.push(Operation::delete(collection.clone(), mapping.source));
437
4
            }
438

            
439
2
            Ok(())
440
2
        })
441
2
        .await?;
442

            
443
2
        let results = Connection::apply_transaction(self, transaction).await?;
444

            
445
2
        Ok(results.len() as u64)
446
2
    }
447

            
448
274747
    async fn get_from_collection_id(
449
274747
        &self,
450
274747
        id: u64,
451
274747
        collection: &CollectionName,
452
274747
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
453
11489
        let task_self = self.clone();
454
11489
        let collection = collection.clone();
455
11489
        tokio::task::spawn_blocking(move || {
456
11489
            let tree = task_self
457
11489
                .data
458
11489
                .context
459
11489
                .roots
460
11489
                .tree(task_self.collection_tree::<Versioned, _>(
461
11489
                    &collection,
462
11489
                    document_tree_name(&collection),
463
11489
                )?)
464
11489
                .map_err(Error::from)?;
465
10977
            if let Some(vec) = tree
466
                .get(
467
11489
                    &id.as_big_endian_bytes()
468
11489
                        .map_err(view::Error::key_serialization)?,
469
                )
470
11489
                .map_err(Error::from)?
471
            {
472
10977
                Ok(Some(deserialize_document(&vec)?.into_owned()))
473
            } else {
474
511
                Ok(None)
475
            }
476
11489
        })
477
6467
        .await
478
11489
        .unwrap()
479
11489
    }
480

            
481
148124
    async fn get_multiple_from_collection_id(
482
148124
        &self,
483
148124
        ids: &[u64],
484
148124
        collection: &CollectionName,
485
148124
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
486
6191
        let task_self = self.clone();
487
6197
        let ids = ids.iter().map(|id| id.to_be_bytes()).collect::<Vec<_>>();
488
6191
        let collection = collection.clone();
489
6191
        tokio::task::spawn_blocking(move || {
490
6191
            let tree = task_self
491
6191
                .data
492
6191
                .context
493
6191
                .roots
494
6191
                .tree(task_self.collection_tree::<Versioned, _>(
495
6191
                    &collection,
496
6191
                    document_tree_name(&collection),
497
6191
                )?)
498
6191
                .map_err(Error::from)?;
499
6197
            let mut ids = ids.iter().map(|id| &id[..]).collect::<Vec<_>>();
500
6191
            ids.sort();
501
6191
            let keys_and_values = tree.get_multiple(&ids).map_err(Error::from)?;
502

            
503
6191
            keys_and_values
504
6191
                .into_iter()
505
6197
                .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
506
6191
                .collect()
507
6191
        })
508
3327
        .await
509
6191
        .unwrap()
510
6191
    }
511

            
512
469
    pub(crate) async fn list(
513
469
        &self,
514
469
        ids: Range<u64>,
515
469
        sort: Sort,
516
469
        limit: Option<usize>,
517
469
        collection: &CollectionName,
518
469
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
519
32
        let task_self = self.clone();
520
32
        let collection = collection.clone();
521
32
        tokio::task::spawn_blocking(move || {
522
32
            let tree = task_self
523
32
                .data
524
32
                .context
525
32
                .roots
526
32
                .tree(task_self.collection_tree::<Versioned, _>(
527
32
                    &collection,
528
32
                    document_tree_name(&collection),
529
32
                )?)
530
32
                .map_err(Error::from)?;
531
32
            let mut found_docs = Vec::new();
532
32
            let mut keys_read = 0;
533
32
            let ids = U64Range::new(ids);
534
32
            tree.scan(
535
32
                &ids.borrow_as_bytes(),
536
32
                match sort {
537
27
                    Sort::Ascending => true,
538
5
                    Sort::Descending => false,
539
                },
540
                |_, _, _| true,
541
32
                |_, _| {
542
44
                    if let Some(limit) = limit {
543
10
                        if keys_read >= limit {
544
5
                            return KeyEvaluation::Stop;
545
5
                        }
546
5

            
547
5
                        keys_read += 1;
548
34
                    }
549
39
                    KeyEvaluation::ReadData
550
44
                },
551
32
                |_, _, doc| {
552
                    found_docs.push(
553
39
                        deserialize_document(&doc)
554
39
                            .map(BorrowedDocument::into_owned)
555
39
                            .map_err(AbortError::Other)?,
556
                    );
557
39
                    Ok(())
558
39
                },
559
            )
560
32
            .map_err(|err| match err {
561
                AbortError::Other(err) => err,
562
                AbortError::Nebari(err) => bonsaidb_core::Error::from(crate::Error::from(err)),
563
32
            })
564
32
            .unwrap();
565
32

            
566
32
            Ok(found_docs)
567
32
        })
568
32
        .await
569
32
        .unwrap()
570
32
    }
571

            
572
291754
    async fn reduce_in_view(
573
291754
        &self,
574
291754
        view_name: &ViewName,
575
291754
        key: Option<QueryKey<Bytes>>,
576
291754
        access_policy: AccessPolicy,
577
291754
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
578
12166
        let view = self
579
12166
            .data
580
12166
            .schema
581
12166
            .view_by_name(view_name)
582
12166
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
583
12166
        let mut mappings = self
584
12168
            .grouped_reduce_in_view(view_name, key, access_policy)
585
282
            .await?;
586

            
587
12166
        let result = if mappings.len() == 1 {
588
7520
            mappings.pop().unwrap().value.into_vec()
589
        } else {
590
4646
            view.reduce(
591
4646
                &mappings
592
4646
                    .iter()
593
4783
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
594
4646
                    .collect::<Vec<_>>(),
595
4646
                true,
596
4646
            )
597
4646
            .map_err(Error::from)?
598
        };
599

            
600
12161
        Ok(result)
601
12166
    }
602

            
603
291950
    async fn grouped_reduce_in_view(
604
291950
        &self,
605
291950
        view_name: &ViewName,
606
291950
        key: Option<QueryKey<Bytes>>,
607
291950
        access_policy: AccessPolicy,
608
291950
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
609
12178
        let view = self
610
12178
            .data
611
12178
            .schema
612
12178
            .view_by_name(view_name)
613
12178
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
614
12178
        let mut mappings = Vec::new();
615
12329
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
616
7694
            let entry = ViewEntry::from(entry);
617
7694
            mappings.push(MappedSerializedValue {
618
7694
                key: entry.key,
619
7694
                value: entry.reduced_value,
620
7694
            });
621
7694
            Ok(())
622
12331
        })
623
290
        .await?;
624

            
625
12178
        Ok(mappings)
626
12178
    }
627

            
628
339184
    fn apply_transaction_to_roots(
629
339184
        &self,
630
339184
        transaction: &Transaction,
631
339184
    ) -> Result<Vec<OperationResult>, Error> {
632
339184
        let mut open_trees = OpenTrees::default();
633
842167
        for op in &transaction.operations {
634
503057
            if !self.data.schema.contains_collection_id(&op.collection) {
635
74
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
636
502983
            }
637
502983

            
638
502983
            open_trees.open_trees_for_document_change(
639
502983
                &op.collection,
640
502983
                &self.data.schema,
641
502983
                self.collection_encryption_key(&op.collection),
642
502983
                #[cfg(feature = "encryption")]
643
502983
                self.storage().vault(),
644
502983
            )?;
645
        }
646

            
647
339110
        let mut roots_transaction = self
648
339110
            .data
649
339110
            .context
650
339110
            .roots
651
339110
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
652

            
653
339110
        let mut results = Vec::new();
654
339110
        let mut changed_documents = Vec::new();
655
829502
        for op in &transaction.operations {
656
502983
            let result = self.execute_operation(
657
502983
                op,
658
502983
                &mut roots_transaction,
659
502983
                &open_trees.trees_index_by_name,
660
502983
            )?;
661

            
662
490392
            match &result {
663
458366
                OperationResult::DocumentUpdated { header, collection } => {
664
458366
                    changed_documents.push(ChangedDocument {
665
458366
                        collection: collection.clone(),
666
458366
                        id: header.id,
667
458366
                        deleted: false,
668
458366
                    });
669
458366
                }
670
32026
                OperationResult::DocumentDeleted { id, collection } => {
671
32026
                    changed_documents.push(ChangedDocument {
672
32026
                        collection: collection.clone(),
673
32026
                        id: *id,
674
32026
                        deleted: true,
675
32026
                    });
676
32026
                }
677
                OperationResult::Success => {}
678
            }
679
490392
            results.push(result);
680
        }
681

            
682
        // Insert invalidations for each record changed
683
327647
        for (collection, changed_documents) in &changed_documents
684
326519
            .iter()
685
490392
            .group_by(|doc| doc.collection.clone())
686
        {
687
327646
            if let Some(views) = self.data.schema.views_in_collection(&collection) {
688
206421
                let changed_documents = changed_documents.collect::<Vec<_>>();
689
896966
                for view in views {
690
690545
                    if !view.unique() {
691
656211
                        let view_name = view.view_name();
692
1377326
                        for changed_document in &changed_documents {
693
721115
                            let invalidated_docs = roots_transaction
694
721115
                                .tree::<Unversioned>(
695
721115
                                    open_trees.trees_index_by_name
696
721115
                                        [&view_invalidated_docs_tree_name(&view_name)],
697
721115
                                )
698
721115
                                .unwrap();
699
721115
                            invalidated_docs.set(
700
721115
                                changed_document.id.as_big_endian_bytes().unwrap().to_vec(),
701
721115
                                b"",
702
721115
                            )?;
703
                        }
704
34334
                    }
705
                }
706
121225
            }
707
        }
708

            
709
326519
        roots_transaction
710
326519
            .entry_mut()
711
326519
            .set_data(pot::to_vec(&Changes::Documents(changed_documents))?)?;
712

            
713
326519
        roots_transaction.commit()?;
714

            
715
326519
        Ok(results)
716
339184
    }
717

            
718
502983
    fn execute_operation(
719
502983
        &self,
720
502983
        operation: &Operation,
721
502983
        transaction: &mut ExecutingTransaction<AnyFile>,
722
502983
        tree_index_map: &HashMap<String, usize>,
723
502983
    ) -> Result<OperationResult, Error> {
724
502983
        match &operation.command {
725
370241
            Command::Insert { id, contents } => self.execute_insert(
726
370241
                operation,
727
370241
                transaction,
728
370241
                tree_index_map,
729
370241
                *id,
730
370241
                contents.to_vec(),
731
370241
            ),
732
100716
            Command::Update { header, contents } => self.execute_update(
733
100716
                operation,
734
100716
                transaction,
735
100716
                tree_index_map,
736
100716
                header,
737
100716
                contents.to_vec(),
738
100716
            ),
739
32026
            Command::Delete { header } => {
740
32026
                self.execute_delete(operation, transaction, tree_index_map, header)
741
            }
742
        }
743
502983
    }
744

            
745
100716
    fn execute_update(
746
100716
        &self,
747
100716
        operation: &Operation,
748
100716
        transaction: &mut ExecutingTransaction<AnyFile>,
749
100716
        tree_index_map: &HashMap<String, usize>,
750
100716
        header: &Header,
751
100716
        contents: Vec<u8>,
752
100716
    ) -> Result<OperationResult, crate::Error> {
753
100716
        let documents = transaction
754
100716
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
755
100716
            .unwrap();
756
100716
        let document_id = header.id.as_big_endian_bytes().unwrap();
757
        // TODO switch to compare_swap
758

            
759
100716
        if let Some(vec) = documents.get(document_id.as_ref())? {
760
100642
            let doc = deserialize_document(&vec)?;
761
100642
            if doc.header.revision == header.revision {
762
100494
                if let Some(updated_doc) = doc.create_new_revision(contents) {
763
                    documents.set(
764
100420
                        updated_doc
765
100420
                            .header
766
100420
                            .id
767
100420
                            .as_big_endian_bytes()
768
100420
                            .unwrap()
769
100420
                            .as_ref()
770
100420
                            .to_vec(),
771
100420
                        serialize_document(&updated_doc)?,
772
                    )?;
773

            
774
100420
                    self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
775

            
776
100272
                    Ok(OperationResult::DocumentUpdated {
777
100272
                        collection: operation.collection.clone(),
778
100272
                        header: updated_doc.header,
779
100272
                    })
780
                } else {
781
                    // If no new revision was made, it means an attempt to
782
                    // save a document with the same contents was made.
783
                    // We'll return a success but not actually give a new
784
                    // version
785
74
                    Ok(OperationResult::DocumentUpdated {
786
74
                        collection: operation.collection.clone(),
787
74
                        header: doc.header,
788
74
                    })
789
                }
790
            } else {
791
148
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
792
148
                    operation.collection.clone(),
793
148
                    header.id,
794
148
                )))
795
            }
796
        } else {
797
74
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
798
74
                operation.collection.clone(),
799
74
                header.id,
800
74
            )))
801
        }
802
100716
    }
803

            
804
370241
    fn execute_insert(
805
370241
        &self,
806
370241
        operation: &Operation,
807
370241
        transaction: &mut ExecutingTransaction<AnyFile>,
808
370241
        tree_index_map: &HashMap<String, usize>,
809
370241
        id: Option<u64>,
810
370241
        contents: Vec<u8>,
811
370241
    ) -> Result<OperationResult, Error> {
812
370241
        let documents = transaction
813
370241
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
814
370241
            .unwrap();
815
370241
        let id = if let Some(id) = id {
816
168560
            id
817
        } else {
818
201681
            let last_key = documents
819
201681
                .last_key()?
820
201681
                .map(|bytes| BigEndian::read_u64(&bytes))
821
201681
                .unwrap_or_default();
822
201681
            last_key + 1
823
        };
824

            
825
370241
        let doc = BorrowedDocument::new(id, contents);
826
370241
        let serialized: Vec<u8> = serialize_document(&doc)?;
827
370241
        let document_id = ArcBytes::from(doc.header.id.as_big_endian_bytes().unwrap().to_vec());
828
370241
        if documents
829
370241
            .replace(document_id.clone(), serialized)?
830
370241
            .is_some()
831
        {
832
12099
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
833
12099
                operation.collection.clone(),
834
12099
                id,
835
12099
            )))
836
        } else {
837
358142
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
838

            
839
358020
            Ok(OperationResult::DocumentUpdated {
840
358020
                collection: operation.collection.clone(),
841
358020
                header: doc.header,
842
358020
            })
843
        }
844
370241
    }
845

            
846
32026
    fn execute_delete(
847
32026
        &self,
848
32026
        operation: &Operation,
849
32026
        transaction: &mut ExecutingTransaction<AnyFile>,
850
32026
        tree_index_map: &HashMap<String, usize>,
851
32026
        header: &Header,
852
32026
    ) -> Result<OperationResult, Error> {
853
32026
        let documents = transaction
854
32026
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
855
32026
            .unwrap();
856
32026
        let document_id = header.id.as_big_endian_bytes().unwrap();
857
32026
        if let Some(vec) = documents.remove(&document_id)? {
858
32026
            let doc = deserialize_document(&vec)?;
859
32026
            if &doc.header == header {
860
32026
                self.update_unique_views(
861
32026
                    document_id.as_ref(),
862
32026
                    operation,
863
32026
                    transaction,
864
32026
                    tree_index_map,
865
32026
                )?;
866

            
867
32026
                Ok(OperationResult::DocumentDeleted {
868
32026
                    collection: operation.collection.clone(),
869
32026
                    id: header.id,
870
32026
                })
871
            } else {
872
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
873
                    operation.collection.clone(),
874
                    header.id,
875
                )))
876
            }
877
        } else {
878
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
879
                operation.collection.clone(),
880
                header.id,
881
            )))
882
        }
883
32026
    }
884

            
885
    fn update_unique_views(
886
        &self,
887
        document_id: &[u8],
888
        operation: &Operation,
889
        transaction: &mut ExecutingTransaction<AnyFile>,
890
        tree_index_map: &HashMap<String, usize>,
891
    ) -> Result<(), Error> {
892
490588
        if let Some(unique_views) = self
893
490588
            .data
894
490588
            .schema
895
490588
            .unique_views_in_collection(&operation.collection)
896
        {
897
134890
            for view in unique_views {
898
67580
                let name = view.view_name();
899
67580
                mapper::DocumentRequest {
900
67580
                    database: self,
901
67580
                    document_id,
902
67580
                    map_request: &mapper::Map {
903
67580
                        database: self.data.name.clone(),
904
67580
                        collection: operation.collection.clone(),
905
67580
                        view_name: name.clone(),
906
67580
                    },
907
67580
                    transaction,
908
67580
                    document_map_index: tree_index_map[&view_document_map_tree_name(&name)],
909
67580
                    documents_index: tree_index_map[&document_tree_name(&operation.collection)],
910
67580
                    omitted_entries_index: tree_index_map[&view_omitted_docs_tree_name(&name)],
911
67580
                    view_entries_index: tree_index_map[&view_entries_tree_name(&name)],
912
67580
                    view,
913
67580
                }
914
67580
                .map()?;
915
            }
916
423008
        }
917

            
918
490318
        Ok(())
919
490588
    }
920

            
921
478764
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
922
478764
        view_entries: &'a Tree<Unversioned, AnyFile>,
923
478764
        key: Option<QueryKey<K>>,
924
478764
        order: Sort,
925
478764
        limit: Option<usize>,
926
478764
    ) -> Result<Vec<ViewEntryCollection>, Error> {
927
478764
        let mut values = Vec::new();
928
478764
        let forwards = match order {
929
478690
            Sort::Ascending => true,
930
74
            Sort::Descending => false,
931
        };
932
478764
        let mut values_read = 0;
933
478764
        if let Some(key) = key {
934
475823
            match key {
935
220
                QueryKey::Range(range) => {
936
220
                    let range = range
937
220
                        .as_big_endian_bytes()
938
220
                        .map_err(view::Error::key_serialization)?;
939
220
                    view_entries.scan::<Infallible, _, _, _, _>(
940
440
                        &range.map_ref(|bytes| &bytes[..]),
941
220
                        forwards,
942
508
                        |_, _, _| true,
943
220
                        |_, _| {
944
1256
                            if let Some(limit) = limit {
945
148
                                if values_read >= limit {
946
74
                                    return KeyEvaluation::Stop;
947
74
                                }
948
74
                                values_read += 1;
949
1108
                            }
950
1182
                            KeyEvaluation::ReadData
951
1256
                        },
952
1182
                        |_key, _index, value| {
953
1182
                            values.push(value);
954
1182
                            Ok(())
955
1182
                        },
956
220
                    )?;
957
                }
958
475431
                QueryKey::Matches(key) => {
959
475431
                    let key = key
960
475431
                        .as_big_endian_bytes()
961
475431
                        .map_err(view::Error::key_serialization)?
962
475431
                        .to_vec();
963
475431

            
964
475431
                    values.extend(view_entries.get(&key)?);
965
                }
966
172
                QueryKey::Multiple(list) => {
967
172
                    let mut list = list
968
172
                        .into_iter()
969
344
                        .map(|key| {
970
344
                            key.as_big_endian_bytes()
971
344
                                .map(|bytes| bytes.to_vec())
972
344
                                .map_err(view::Error::key_serialization)
973
344
                        })
974
172
                        .collect::<Result<Vec<_>, _>>()?;
975

            
976
172
                    list.sort();
977
172

            
978
172
                    values.extend(
979
172
                        view_entries
980
172
                            .get_multiple(&list.iter().map(Vec::as_slice).collect::<Vec<_>>())?
981
172
                            .into_iter()
982
344
                            .map(|(_, value)| value),
983
                    );
984
                }
985
            }
986
        } else {
987
2941
            view_entries.scan::<Infallible, _, _, _, _>(
988
2941
                &(..),
989
2941
                forwards,
990
2941
                |_, _, _| true,
991
2941
                |_, _| {
992
4061
                    if let Some(limit) = limit {
993
                        if values_read >= limit {
994
                            return KeyEvaluation::Stop;
995
                        }
996
                        values_read += 1;
997
4061
                    }
998
4061
                    KeyEvaluation::ReadData
999
4117
                },
4117
                |_, _, value| {
4061
                    values.push(value);
4061
                    Ok(())
4117
                },
2941
            )?;
        }

            
478764
        values
478764
            .into_iter()
478764
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
478764
            .collect::<Result<Vec<_>, Error>>()
478764
    }

            
1963740
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
1963740
        self.schematic()
1963740
            .encryption_key_for_collection(collection)
1963740
            .or_else(|| self.storage().default_encryption_key())
1963740
    }

            
    #[cfg_attr(
        not(feature = "encryption"),
        allow(unused_mut, unused_variables, clippy::let_and_return)
    )]
    #[cfg_attr(feature = "encryption", allow(clippy::unnecessary_wraps))]
1460781
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
1460781
        &self,
1460781
        collection: &CollectionName,
1460781
        name: S,
1460781
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
1460781
        let mut tree = R::tree(name);

            
1460781
        if let Some(key) = self.collection_encryption_key(collection) {
            #[cfg(feature = "encryption")]
39439
            {
39439
                tree = tree.with_vault(TreeVault {
39439
                    key: key.clone(),
39439
                    vault: self.storage().vault().clone(),
39439
                });
39439
            }

            
            #[cfg(not(feature = "encryption"))]
            {
                return Err(Error::EncryptionDisabled);
            }
1421342
        }

            
1460781
        Ok(tree)
1460781
    }

            
1
    pub(crate) async fn update_key_expiration_async<'key>(
1
        &self,
1
        tree_key: impl Into<Cow<'key, str>>,
1
        expiration: Option<Timestamp>,
1
    ) {
1
        self.data
1
            .context
1
            .update_key_expiration_async(tree_key, expiration)
            .await;
1
    }
}

            
644346
pub(crate) fn deserialize_document(
644346
    bytes: &[u8],
644346
) -> Result<BorrowedDocument<'_>, bonsaidb_core::Error> {
644346
    let document = bincode::deserialize::<BorrowedDocument<'_>>(bytes).map_err(Error::from)?;
644346
    Ok(document)
644346
}

            
470661
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
470661
    bincode::serialize(document)
470661
        .map_err(Error::from)
470661
        .map_err(bonsaidb_core::Error::from)
470661
}

            
#[async_trait]
impl Connection for Database {
1017552
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(transaction)))]
    async fn apply_transaction(
        &self,
        transaction: Transaction,
339184
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
339184
        let task_self = self.clone();
339184
        let mut unique_view_tasks = Vec::new();
340312
        for collection_name in transaction
339184
            .operations
339184
            .iter()
503057
            .map(|op| &op.collection)
339184
            .collect::<HashSet<_>>()
        {
340311
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
959111
                for view in views {
740099
                    if view.unique() {
34604
                        if let Some(task) = self
34604
                            .data
34604
                            .storage
34604
                            .tasks()
34604
                            .spawn_integrity_check(view, self)
                            .await?
682
                        {
682
                            unique_view_tasks.push(task);
33922
                        }
705495
                    }
                }
121299
            }
        }
339866
        for task in unique_view_tasks {
682
            task.receive()
634
                .await
682
                .map_err(Error::from)?
682
                .map_err(Error::from)?;
        }

            
339184
        tokio::task::spawn_blocking(move || task_self.apply_transaction_to_roots(&transaction))
285472
            .await
339184
            .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?
339184
            .map_err(bonsaidb_core::Error::from)
678368
    }

            
9609
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(id)))]
    async fn get<C: schema::Collection>(
        &self,
        id: u64,
3203
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
3203
        self.get_from_collection_id(id, &C::collection_name()).await
6406
    }

            
6597
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids)))]
    async fn get_multiple<C: schema::Collection>(
        &self,
        ids: &[u64],
2199
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
2199
        self.get_multiple_from_collection_id(ids, &C::collection_name())
1243
            .await
4398
    }

            
36
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids, order, limit)))]
    async fn list<C: schema::Collection, R: Into<Range<u64>> + Send>(
        &self,
        ids: R,
        order: Sort,
        limit: Option<usize>,
12
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
12
        self.list(ids.into(), order, limit, &C::collection_name())
12
            .await
24
    }

            
    #[cfg_attr(
        feature = "tracing",
76140
        tracing::instrument(skip(starting_id, result_limit))
    )]
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<usize>,
25380
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
25380
        let result_limit = result_limit
25380
            .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
25380
            .min(LIST_TRANSACTIONS_MAX_RESULTS);
25380
        if result_limit > 0 {
25306
            let task_self = self.clone();
25306
            tokio::task::spawn_blocking::<_, Result<Vec<transaction::Executed>, Error>>(move || {
25306
                let range = if let Some(starting_id) = starting_id {
12912
                    Range::from(starting_id..)
                } else {
12394
                    Range::from(..)
                };

            
25306
                let mut entries = Vec::new();
192286
                task_self.roots().transactions().scan(range, |entry| {
192286
                    entries.push(entry);
192286
                    entries.len() < result_limit
192286
                })?;

            
25306
                entries
25306
                    .into_iter()
25306
                    .map(|entry| {
192286
                        if let Some(data) = entry.data() {
192286
                            let changes = match pot::from_slice(data) {
192286
                                Ok(changes) => changes,
                                Err(pot::Error::NotAPot) => {
                                    Changes::Documents(bincode::deserialize(entry.data().unwrap())?)
                                }
                                other => other?,
                            };
192286
                            Ok(Some(transaction::Executed {
192286
                                id: entry.id,
192286
                                changes,
192286
                            }))
                        } else {
                            Ok(None)
                        }
192286
                    })
25306
                    .filter_map(Result::transpose)
25306
                    .collect::<Result<Vec<_>, Error>>()
25306
            })
19546
            .await
25306
            .unwrap()
25306
            .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
74
            Ok(Vec::default())
        }
50760
    }

            
    #[cfg_attr(
        feature = "tracing",
109749
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    #[must_use]
    async fn query<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
36583
    {
36583
        let mut results = Vec::new();
36583
        self.for_each_view_entry::<V, _>(key, order, limit, access_policy, |collection| {
15242
            let entry = ViewEntry::from(collection);
15242
            let key = <V::Key as Key>::from_big_endian_bytes(&entry.key)
15242
                .map_err(view::Error::key_serialization)
15242
                .map_err(Error::from)?;
30517
            for entry in entry.mappings {
                results.push(Map::new(
15275
                    entry.source,
15275
                    key.clone(),
15275
                    V::deserialize(&entry.value)?,
                ));
            }
15242
            Ok(())
36583
        })
33229
        .await?;

            
36583
        Ok(results)
73166
    }

            
    #[cfg_attr(
        feature = "tracing",
6564
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    async fn query_with_docs<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
    where
        Self: Sized,
2188
    {
2190
        let results = Connection::query::<V>(self, key, order, limit, access_policy).await?;

            
2188
        let documents = self
2192
            .get_multiple::<V::Collection>(&results.iter().map(|m| m.source.id).collect::<Vec<_>>())
1232
            .await?
2188
            .into_iter()
2192
            .map(|doc| (doc.header.id, doc))
2188
            .collect::<BTreeMap<u64, _>>();
2188

            
2188
        Ok(MappedDocuments {
2188
            mappings: results,
2188
            documents,
2188
        })
4376
    }

            
12207
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<V::Value, bonsaidb_core::Error>
    where
        Self: Sized,
4069
    {
4069
        let view = self
4069
            .data
4069
            .schema
4069
            .view::<V>()
4069
            .expect("query made with view that isn't registered with this database");

            
4066
        let result = self
            .reduce_in_view(
4069
                &view.view_name(),
4069
                key.map(|key| key.serialized()).transpose()?,
4069
                access_policy,
103
            )
103
            .await?;
4066
        let value = V::deserialize(&result)?;

            
4066
        Ok(value)
8138
    }

            
18
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce_grouped<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
6
    {
6
        let view = self
6
            .data
6
            .schema
6
            .view::<V>()
6
            .expect("query made with view that isn't registered with this database");

            
6
        let results = self
            .grouped_reduce_in_view(
6
                &view.view_name(),
6
                key.map(|key| key.serialized()).transpose()?,
6
                access_policy,
3
            )
3
            .await?;
6
        results
6
            .into_iter()
9
            .map(|map| {
9
                Ok(MappedValue::new(
9
                    V::Key::from_big_endian_bytes(&map.key)
9
                        .map_err(view::Error::key_serialization)?,
9
                    V::deserialize(&map.value)?,
                ))
9
            })
6
            .collect::<Result<Vec<_>, bonsaidb_core::Error>>()
12
    }

            
3
    async fn delete_docs<V: schema::SerializedView>(
3
        &self,
3
        key: Option<QueryKey<V::Key>>,
3
        access_policy: AccessPolicy,
3
    ) -> Result<u64, bonsaidb_core::Error>
3
    where
3
        Self: Sized,
3
    {
3
        let collection = <V::Collection as Collection>::collection_name();
3
        let mut transaction = Transaction::default();
3
        self.for_each_view_entry::<V, _>(
3
            key,
3
            Sort::Ascending,
3
            None,
3
            access_policy,
3
            |entry_collection| {
3
                let entry = ViewEntry::from(entry_collection);

            
9
                for mapping in entry.mappings {
6
                    transaction.push(Operation::delete(collection.clone(), mapping.source));
6
                }

            
3
                Ok(())
3
            },
3
        )
3
        .await?;

            
3
        let results = Connection::apply_transaction(self, transaction).await?;

            
3
        Ok(results.len() as u64)
6
    }

            
897543
    #[cfg_attr(feature = "tracing", tracing::instrument)]
299181
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
299181
        Ok(self.roots().transactions().current_transaction_id())
299181
    }

            
9
    #[cfg_attr(feature = "tracing", tracing::instrument)]
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
3
        self.storage()
3
            .tasks()
3
            .compact_collection(self.clone(), C::collection_name())
3
            .await?;
3
        Ok(())
6
    }

            
222
    #[cfg_attr(feature = "tracing", tracing::instrument)]
74
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
74
        self.storage()
74
            .tasks()
74
            .compact_database(self.clone())
74
            .await?;
74
        Ok(())
148
    }

            
222
    #[cfg_attr(feature = "tracing", tracing::instrument)]
74
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
74
        self.storage()
74
            .tasks()
74
            .compact_key_value_store(self.clone())
74
            .await?;
74
        Ok(())
148
    }
}

            
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
struct ViewEntryCollectionIterator<'a> {
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntryCollection, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
1279393
#[derive(Debug, Clone)]
pub(crate) struct Context {
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

            
2152457
    fn deref(&self) -> &Self::Target {
2152457
        &self.data
2152457
    }
}

            
#[derive(Debug)]
pub(crate) struct ContextData {
    pub(crate) roots: Roots<AnyFile>,
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
    runtime: tokio::runtime::Handle,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
19441
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
19441
        let (background_sender, background_receiver) = watch::channel(None);
19441
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
19441
            key_value_persistence,
19441
            roots.clone(),
19441
            background_sender,
19441
        )));
19441
        let context = Self {
19441
            data: Arc::new(ContextData {
19441
                roots,
19441
                key_value_state: key_value_state.clone(),
19441
                runtime: tokio::runtime::Handle::current(),
19441
            }),
19441
        };
19441
        tokio::task::spawn(keyvalue::background_worker(
19441
            key_value_state,
19441
            background_receiver,
19441
        ));
19441
        context
19441
    }

            
748422
    pub(crate) async fn perform_kv_operation(
748422
        &self,
748422
        op: KeyOperation,
748422
    ) -> Result<Output, bonsaidb_core::Error> {
748421
        let mut state = fast_async_lock!(self.data.key_value_state);
748421
        state
748421
            .perform_kv_operation(op, &self.data.key_value_state)
            .await
748422
    }

            
11
    pub(crate) async fn update_key_expiration_async<'key>(
11
        &self,
11
        tree_key: impl Into<Cow<'key, str>>,
11
        expiration: Option<Timestamp>,
11
    ) {
11
        let mut state = fast_async_lock!(self.data.key_value_state);
11
        state.update_key_expiration(tree_key, expiration);
11
    }
}

            
impl Drop for ContextData {
17545
    fn drop(&mut self) {
17545
        let key_value_state = self.key_value_state.clone();
17545
        self.runtime.spawn(async move {
13543
            let mut state = fast_async_lock!(key_value_state);
13543
            state.shutdown(&key_value_state).await
17545
        });
17545
    }
}

            
1610358
pub fn document_tree_name(collection: &CollectionName) -> String {
1610358
    format!("collection.{:#}", collection)
1610358
}