1
use std::{
2
    borrow::{Borrow, Cow},
3
    collections::{BTreeMap, HashMap, HashSet},
4
    convert::Infallible,
5
    ops::Deref,
6
    sync::Arc,
7
    u8,
8
};
9

            
10
use async_lock::Mutex;
11
use async_trait::async_trait;
12
use bonsaidb_core::{
13
    arc_bytes::{serde::Bytes, ArcBytes},
14
    connection::{AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
15
    document::{BorrowedDocument, Document, Header, KeyId, OwnedDocument},
16
    keyvalue::{KeyOperation, Output, Timestamp},
17
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
18
    permissions::Permissions,
19
    schema::{
20
        self,
21
        view::{
22
            self,
23
            map::{MappedDocuments, MappedSerializedValue},
24
        },
25
        Collection, CollectionName, Key, Map, MappedValue, Schema, Schematic, ViewName,
26
    },
27
    transaction::{
28
        self, ChangedDocument, Changes, Command, Operation, OperationResult, Transaction,
29
    },
30
};
31
use bonsaidb_utils::fast_async_lock;
32
use byteorder::{BigEndian, ByteOrder};
33
use itertools::Itertools;
34
use nebari::{
35
    io::any::AnyFile,
36
    tree::{
37
        AnyTreeRoot, BorrowByteRange, KeyEvaluation, Root, TreeRoot, U64Range, Unversioned,
38
        Versioned,
39
    },
40
    AbortError, ExecutingTransaction, Roots, Tree,
41
};
42
use tokio::sync::watch;
43

            
44
#[cfg(feature = "encryption")]
45
use crate::vault::TreeVault;
46
use crate::{
47
    config::{Builder, KeyValuePersistence, StorageConfiguration},
48
    database::keyvalue::BackgroundWorkerProcessTarget,
49
    error::Error,
50
    open_trees::OpenTrees,
51
    views::{
52
        mapper::{self, ViewEntryCollection},
53
        view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
54
        view_omitted_docs_tree_name, ViewEntry,
55
    },
56
    Storage,
57
};
58

            
59
pub mod keyvalue;
60

            
61
pub mod pubsub;
62

            
63
/// A local, file-based database.
64
///
65
/// ## Using `Database` to create a single database
66
///
67
/// `Database` provides an easy mechanism to open and access a single database:
68
///
69
/// ```rust
70
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
71
/// use bonsaidb_core::schema::Collection;
72
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
73
/// use bonsaidb_local::{
74
///     config::{Builder, StorageConfiguration},
75
///     Database,
76
/// };
77
/// use serde::{Deserialize, Serialize};
78
///
79
/// #[derive(Debug, Serialize, Deserialize, Collection)]
80
/// #[collection(name = "blog-posts")]
81
/// # #[collection(core = bonsaidb_core)]
82
/// struct BlogPost {
83
///     pub title: String,
84
///     pub contents: String,
85
/// }
86
///
87
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
88
/// let db = Database::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
89
/// #     Ok(())
90
/// # }
91
/// ```
92
///
93
/// Under the hood, this initializes a [`Storage`] instance pointing at
94
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
95
/// with the schema `BlogPost`.
96
///
97
/// In this example, `BlogPost` implements the [`Collection`] trait, and all
98
/// collections can be used as a [`Schema`].
99
#[derive(Debug)]
100
pub struct Database {
101
    pub(crate) data: Arc<Data>,
102
}
103

            
104
#[derive(Debug)]
105
pub struct Data {
106
    pub name: Arc<Cow<'static, str>>,
107
    context: Context,
108
    pub(crate) storage: Storage,
109
    pub(crate) schema: Arc<Schematic>,
110
    #[allow(dead_code)] // This code was previously used, it works, but is currently unused.
111
    pub(crate) effective_permissions: Option<Permissions>,
112
}
113
impl Clone for Database {
114
1027902
    fn clone(&self) -> Self {
115
1027902
        Self {
116
1027902
            data: self.data.clone(),
117
1027902
        }
118
1027902
    }
119
}
120

            
121
impl Database {
122
    /// Opens a local file as a bonsaidb.
123
87042
    pub(crate) async fn new<DB: Schema, S: Into<Cow<'static, str>> + Send>(
124
87042
        name: S,
125
87042
        context: Context,
126
87042
        storage: Storage,
127
87042
    ) -> Result<Self, Error> {
128
86970
        let name = name.into();
129
86970
        let schema = Arc::new(DB::schematic()?);
130
86970
        let db = Self {
131
86970
            data: Arc::new(Data {
132
86970
                name: Arc::new(name),
133
86970
                context,
134
86970
                storage: storage.clone(),
135
86970
                schema,
136
86970
                effective_permissions: None,
137
86970
            }),
138
86970
        };
139
86970

            
140
86970
        if db.data.storage.check_view_integrity_on_database_open() {
141
16
            for view in db.data.schema.views() {
142
16
                db.data
143
16
                    .storage
144
16
                    .tasks()
145
16
                    .spawn_integrity_check(view, &db)
146
                    .await?;
147
            }
148
86966
        }
149

            
150
86970
        storage.tasks().spawn_key_value_expiration_loader(&db).await;
151

            
152
87042
        Ok(db)
153
87042
    }
154

            
155
    /// Returns a clone with `effective_permissions`. Replaces any previously applied permissions.
156
    ///
157
    /// # Unstable
158
    ///
159
    /// See [this issue](https://github.com/khonsulabs/bonsaidb/issues/68).
160
    #[doc(hidden)]
161
    #[must_use]
162
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Self {
163
        Self {
164
            data: Arc::new(Data {
165
                name: self.data.name.clone(),
166
                context: self.data.context.clone(),
167
                storage: self.data.storage.clone(),
168
                schema: self.data.schema.clone(),
169
                effective_permissions: Some(effective_permissions),
170
            }),
171
        }
172
    }
173

            
174
    /// Returns the name of the database.
175
    #[must_use]
176
1352
    pub fn name(&self) -> &str {
177
1352
        self.data.name.as_ref()
178
1352
    }
179

            
180
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
181
19
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
182
297
        let storage = Storage::open(configuration.with_schema::<DB>()?).await?;
183

            
184
32
        storage.create_database::<DB>("default", true).await?;
185

            
186
19
        Ok(storage.database::<DB>("default").await?)
187
19
    }
188

            
189
    /// Returns the [`Storage`] that this database belongs to.
190
    #[must_use]
191
2402874
    pub fn storage(&self) -> &'_ Storage {
192
2402874
        &self.data.storage
193
2402874
    }
194

            
195
    /// Returns the [`Schematic`] for the schema for this database.
196
    #[must_use]
197
2116090
    pub fn schematic(&self) -> &'_ Schematic {
198
2116090
        &self.data.schema
199
2116090
    }
200

            
201
1371889
    pub(crate) fn roots(&self) -> &'_ nebari::Roots<AnyFile> {
202
1371889
        &self.data.context.roots
203
1371889
    }
204

            
205
52729
    async fn for_each_in_view<
206
52729
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
207
52729
    >(
208
52729
        &self,
209
52729
        view: &dyn view::Serialized,
210
52729
        key: Option<QueryKey<Bytes>>,
211
52729
        order: Sort,
212
52729
        limit: Option<usize>,
213
52729
        access_policy: AccessPolicy,
214
52729
        mut callback: F,
215
52729
    ) -> Result<(), bonsaidb_core::Error> {
216
52729
        if matches!(access_policy, AccessPolicy::UpdateBefore) {
217
40922
            self.data
218
40922
                .storage
219
40922
                .tasks()
220
40922
                .update_view_if_needed(view, self)
221
35432
                .await?;
222
11807
        }
223

            
224
52729
        let view_entries = self
225
52729
            .roots()
226
52729
            .tree(self.collection_tree(
227
52729
                &view.collection(),
228
52729
                view_entries_tree_name(&view.view_name()),
229
52729
            )?)
230
52729
            .map_err(Error::from)?;
231

            
232
        {
233
52729
            for entry in Self::create_view_iterator(&view_entries, key, order, limit)? {
234
25483
                callback(entry)?;
235
            }
236
        }
237

            
238
52729
        if matches!(access_policy, AccessPolicy::UpdateAfter) {
239
5
            let db = self.clone();
240
5
            let view_name = view.view_name();
241
5
            tokio::task::spawn(async move {
242
5
                let view = db
243
5
                    .data
244
5
                    .schema
245
5
                    .view_by_name(&view_name)
246
5
                    .expect("query made with view that isn't registered with this database");
247
5
                db.data
248
5
                    .storage
249
5
                    .tasks()
250
10
                    .update_view_if_needed(view, &db)
251
10
                    .await
252
5
            });
253
52724
        }
254

            
255
52729
        Ok(())
256
52729
    }
257

            
258
36608
    async fn for_each_view_entry<
259
36608
        V: schema::View,
260
36608
        F: FnMut(ViewEntryCollection) -> Result<(), bonsaidb_core::Error> + Send + Sync,
261
36608
    >(
262
36608
        &self,
263
36608
        key: Option<QueryKey<V::Key>>,
264
36608
        order: Sort,
265
36608
        limit: Option<usize>,
266
36608
        access_policy: AccessPolicy,
267
36608
        callback: F,
268
36608
    ) -> Result<(), bonsaidb_core::Error> {
269
36608
        let view = self
270
36608
            .data
271
36608
            .schema
272
36608
            .view::<V>()
273
36608
            .expect("query made with view that isn't registered with this database");
274
36608

            
275
36608
        self.for_each_in_view(
276
36608
            view,
277
36608
            key.map(|key| key.serialized()).transpose()?,
278
36608
            order,
279
36608
            limit,
280
36608
            access_policy,
281
36608
            callback,
282
33213
        )
283
33213
        .await
284
36608
    }
285

            
286
    /// Internal-API entry point that forwards to `get_from_collection_id`.
    ///
    /// Returns the document stored under `id` in `collection`, or `None` when
    /// no such document exists.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn internal_get_from_collection_id(
        &self,
        id: u64,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
        self.get_from_collection_id(id, collection).await
    }
295

            
296
    /// Internal-API entry point that forwards to `list`.
    ///
    /// Note that `collection` is the last parameter here, matching the
    /// argument order of `list`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn list_from_collection(
        &self,
        ids: Range<u64>,
        order: Sort,
        limit: Option<usize>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
        self.list(ids, order, limit, collection).await
    }
307

            
308
    /// Internal-API entry point that forwards to
    /// `get_multiple_from_collection_id`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn internal_get_multiple_from_collection_id(
        &self,
        ids: &[u64],
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
        self.get_multiple_from_collection_id(ids, collection).await
    }
317

            
318
    /// Asks the storage's task manager to compact `collection`, passing it a
    /// clone of this database handle, and waits for the request to finish.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `compact_collection`, if any.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn compact_collection_by_name(
        &self,
        collection: CollectionName,
    ) -> Result<(), bonsaidb_core::Error> {
        let database = self.clone();
        self.storage()
            .tasks()
            .compact_collection(database, collection)
            .await?;
        Ok(())
    }
330

            
331
    /// Queries the view named `view` and returns one serialized mapping per
    /// source document of every matching view entry.
    ///
    /// Each returned mapping carries a clone of its entry's key.
    ///
    /// # Errors
    ///
    /// Returns [`bonsaidb_core::Error::CollectionNotFound`] if no view with
    /// that name is registered, or any error from `for_each_in_view`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn query_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
        let view = self
            .schematic()
            .view_by_name(view)
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;

        let mut results = Vec::new();
        self.for_each_in_view(view, key, order, limit, access_policy, |collection| {
            let ViewEntry {
                key: entry_key,
                mappings,
                ..
            } = ViewEntry::from(collection);
            results.extend(mappings.into_iter().map(|mapping| {
                bonsaidb_core::schema::view::map::Serialized {
                    source: mapping.source,
                    key: entry_key.clone(),
                    value: mapping.value,
                }
            }));
            Ok(())
        })
        .await?;

        Ok(results)
    }
361

            
362
    /// Runs `query_by_name` and additionally fetches the source documents of
    /// the returned mappings, keyed by document id.
    ///
    /// # Errors
    ///
    /// Returns any error from `query_by_name` or from fetching the documents.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
    {
        let mappings = self
            .query_by_name(view, key, order, limit, access_policy)
            .await?;
        // `query_by_name` has already failed if the view isn't registered, so
        // this lookup cannot return `None`.
        let collection = self.schematic().view_by_name(view).unwrap().collection();

        let source_ids = mappings
            .iter()
            .map(|mapping| mapping.source.id)
            .collect::<Vec<_>>();
        let documents = self
            .get_multiple_from_collection_id(&source_ids, &collection)
            .await?
            .into_iter()
            .map(|doc| (doc.header.id, doc))
            .collect::<BTreeMap<_, _>>();

        Ok(
            bonsaidb_core::schema::view::map::MappedSerializedDocuments {
                mappings,
                documents,
            },
        )
    }
395

            
396
    /// Internal-API entry point that forwards to `reduce_in_view`, returning
    /// the serialized reduced value for the view named `view`.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn reduce_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
        self.reduce_in_view(view, key, access_policy).await
    }
406

            
407
    /// Internal-API entry point that forwards to `grouped_reduce_in_view`,
    /// returning one serialized key/value pair per matching view entry.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn reduce_grouped_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
        self.grouped_reduce_in_view(view, key, access_policy).await
    }
417

            
418
    /// Deletes every document that is the source of a mapping matched by
    /// `key` in the view named `view`.
    ///
    /// All deletions are collected into a single transaction. The returned
    /// count is the number of results that transaction produced.
    ///
    /// # Errors
    ///
    /// Returns [`bonsaidb_core::Error::CollectionNotFound`] if no view with
    /// that name is registered, or any error from reading the view or
    /// applying the transaction.
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub async fn delete_docs_by_name(
        &self,
        view: &ViewName,
        key: Option<QueryKey<Bytes>>,
        access_policy: AccessPolicy,
    ) -> Result<u64, bonsaidb_core::Error> {
        let view = self
            .data
            .schema
            .view_by_name(view)
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
        let collection = view.collection();

        let mut deletions = Transaction::default();
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
            let ViewEntry { mappings, .. } = ViewEntry::from(entry);
            for mapping in mappings {
                deletions.push(Operation::delete(collection.clone(), mapping.source));
            }
            Ok(())
        })
        .await?;

        let results = Connection::apply_transaction(self, deletions).await?;
        Ok(results.len() as u64)
    }
448

            
449
266827
    async fn get_from_collection_id(
450
266827
        &self,
451
266827
        id: u64,
452
266827
        collection: &CollectionName,
453
266827
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
454
11159
        let task_self = self.clone();
455
11159
        let collection = collection.clone();
456
11159
        tokio::task::spawn_blocking(move || {
457
11159
            let tree = task_self
458
11159
                .data
459
11159
                .context
460
11159
                .roots
461
11159
                .tree(task_self.collection_tree::<Versioned, _>(
462
11159
                    &collection,
463
11159
                    document_tree_name(&collection),
464
11159
                )?)
465
11159
                .map_err(Error::from)?;
466
10647
            if let Some(vec) = tree
467
                .get(
468
11159
                    &id.as_big_endian_bytes()
469
11159
                        .map_err(view::Error::key_serialization)?,
470
                )
471
11159
                .map_err(Error::from)?
472
            {
473
10647
                Ok(Some(deserialize_document(&vec)?.into_owned()))
474
            } else {
475
511
                Ok(None)
476
            }
477
11159
        })
478
6324
        .await
479
11159
        .unwrap()
480
11159
    }
481

            
482
149708
    async fn get_multiple_from_collection_id(
483
149708
        &self,
484
149708
        ids: &[u64],
485
149708
        collection: &CollectionName,
486
149708
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
487
6257
        let task_self = self.clone();
488
6263
        let ids = ids.iter().map(|id| id.to_be_bytes()).collect::<Vec<_>>();
489
6257
        let collection = collection.clone();
490
6257
        tokio::task::spawn_blocking(move || {
491
6257
            let tree = task_self
492
6257
                .data
493
6257
                .context
494
6257
                .roots
495
6257
                .tree(task_self.collection_tree::<Versioned, _>(
496
6257
                    &collection,
497
6257
                    document_tree_name(&collection),
498
6257
                )?)
499
6257
                .map_err(Error::from)?;
500
6263
            let mut ids = ids.iter().map(|id| &id[..]).collect::<Vec<_>>();
501
6257
            ids.sort();
502
6257
            let keys_and_values = tree.get_multiple(&ids).map_err(Error::from)?;
503

            
504
6257
            keys_and_values
505
6257
                .into_iter()
506
6263
                .map(|(_, value)| deserialize_document(&value).map(BorrowedDocument::into_owned))
507
6257
                .collect()
508
6257
        })
509
3400
        .await
510
6257
        .unwrap()
511
6257
    }
512

            
513
469
    pub(crate) async fn list(
514
469
        &self,
515
469
        ids: Range<u64>,
516
469
        sort: Sort,
517
469
        limit: Option<usize>,
518
469
        collection: &CollectionName,
519
469
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
520
32
        let task_self = self.clone();
521
32
        let collection = collection.clone();
522
32
        tokio::task::spawn_blocking(move || {
523
32
            let tree = task_self
524
32
                .data
525
32
                .context
526
32
                .roots
527
32
                .tree(task_self.collection_tree::<Versioned, _>(
528
32
                    &collection,
529
32
                    document_tree_name(&collection),
530
32
                )?)
531
32
                .map_err(Error::from)?;
532
32
            let mut found_docs = Vec::new();
533
32
            let mut keys_read = 0;
534
32
            let ids = U64Range::new(ids);
535
32
            tree.scan(
536
32
                &ids.borrow_as_bytes(),
537
32
                match sort {
538
27
                    Sort::Ascending => true,
539
5
                    Sort::Descending => false,
540
                },
541
                |_, _, _| true,
542
32
                |_, _| {
543
44
                    if let Some(limit) = limit {
544
10
                        if keys_read >= limit {
545
5
                            return KeyEvaluation::Stop;
546
5
                        }
547
5

            
548
5
                        keys_read += 1;
549
34
                    }
550
39
                    KeyEvaluation::ReadData
551
44
                },
552
32
                |_, _, doc| {
553
                    found_docs.push(
554
39
                        deserialize_document(&doc)
555
39
                            .map(BorrowedDocument::into_owned)
556
39
                            .map_err(AbortError::Other)?,
557
                    );
558
39
                    Ok(())
559
39
                },
560
            )
561
32
            .map_err(|err| match err {
562
                AbortError::Other(err) => err,
563
                AbortError::Nebari(err) => bonsaidb_core::Error::from(crate::Error::from(err)),
564
32
            })
565
32
            .unwrap();
566
32

            
567
32
            Ok(found_docs)
568
32
        })
569
29
        .await
570
32
        .unwrap()
571
32
    }
572

            
573
288346
    async fn reduce_in_view(
574
288346
        &self,
575
288346
        view_name: &ViewName,
576
288346
        key: Option<QueryKey<Bytes>>,
577
288346
        access_policy: AccessPolicy,
578
288346
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
579
12025
        let view = self
580
12025
            .data
581
12025
            .schema
582
12025
            .view_by_name(view_name)
583
12025
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
584
12025
        let mut mappings = self
585
12027
            .grouped_reduce_in_view(view_name, key, access_policy)
586
247
            .await?;
587

            
588
12025
        let result = if mappings.len() == 1 {
589
5959
            mappings.pop().unwrap().value.into_vec()
590
        } else {
591
6066
            view.reduce(
592
6066
                &mappings
593
6066
                    .iter()
594
6203
                    .map(|map| (map.key.as_ref(), map.value.as_ref()))
595
6066
                    .collect::<Vec<_>>(),
596
6066
                true,
597
6066
            )
598
6066
            .map_err(Error::from)?
599
        };
600

            
601
12019
        Ok(result)
602
12024
    }
603

            
604
288542
    async fn grouped_reduce_in_view(
605
288542
        &self,
606
288542
        view_name: &ViewName,
607
288542
        key: Option<QueryKey<Bytes>>,
608
288542
        access_policy: AccessPolicy,
609
288542
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
610
12037
        let view = self
611
12037
            .data
612
12037
            .schema
613
12037
            .view_by_name(view_name)
614
12037
            .ok_or(bonsaidb_core::Error::CollectionNotFound)?;
615
12037
        let mut mappings = Vec::new();
616
12188
        self.for_each_in_view(view, key, Sort::Ascending, None, access_policy, |entry| {
617
6134
            let entry = ViewEntry::from(entry);
618
6134
            mappings.push(MappedSerializedValue {
619
6134
                key: entry.key,
620
6134
                value: entry.reduced_value,
621
6134
            });
622
6134
            Ok(())
623
12190
        })
624
255
        .await?;
625

            
626
12036
        Ok(mappings)
627
12036
    }
628

            
629
333712
    fn apply_transaction_to_roots(
630
333712
        &self,
631
333712
        transaction: &Transaction,
632
333712
    ) -> Result<Vec<OperationResult>, Error> {
633
333712
        let mut open_trees = OpenTrees::default();
634
809263
        for op in &transaction.operations {
635
475625
            if !self.data.schema.contains_collection_id(&op.collection) {
636
74
                return Err(Error::Core(bonsaidb_core::Error::CollectionNotFound));
637
475551
            }
638
475551

            
639
475551
            open_trees.open_trees_for_document_change(
640
475551
                &op.collection,
641
475551
                &self.data.schema,
642
475551
                self.collection_encryption_key(&op.collection),
643
475551
                #[cfg(feature = "encryption")]
644
475551
                self.storage().vault(),
645
475551
            )?;
646
        }
647

            
648
333638
        let mut roots_transaction = self
649
333638
            .data
650
333638
            .context
651
333638
            .roots
652
333638
            .transaction::<_, dyn AnyTreeRoot<AnyFile>>(&open_trees.trees)?;
653

            
654
333638
        let mut results = Vec::new();
655
333638
        let mut changed_documents = Vec::new();
656
796598
        for op in &transaction.operations {
657
475551
            let result = self.execute_operation(
658
475551
                op,
659
475551
                &mut roots_transaction,
660
475551
                &open_trees.trees_index_by_name,
661
475551
            )?;
662

            
663
462960
            match &result {
664
431366
                OperationResult::DocumentUpdated { header, collection } => {
665
431366
                    changed_documents.push(ChangedDocument {
666
431366
                        collection: collection.clone(),
667
431366
                        id: header.id,
668
431366
                        deleted: false,
669
431366
                    });
670
431366
                }
671
31594
                OperationResult::DocumentDeleted { id, collection } => {
672
31594
                    changed_documents.push(ChangedDocument {
673
31594
                        collection: collection.clone(),
674
31594
                        id: *id,
675
31594
                        deleted: true,
676
31594
                    });
677
31594
                }
678
                OperationResult::Success => {}
679
            }
680
462960
            results.push(result);
681
        }
682

            
683
        // Insert invalidations for each record changed
684
322151
        for (collection, changed_documents) in &changed_documents
685
321047
            .iter()
686
462960
            .group_by(|doc| doc.collection.clone())
687
        {
688
322150
            if let Some(views) = self.data.schema.views_in_collection(&collection) {
689
205533
                let changed_documents = changed_documents.collect::<Vec<_>>();
690
895214
                for view in views {
691
689657
                    if !view.unique() {
692
655323
                        let view_name = view.view_name();
693
1371254
                        for changed_document in &changed_documents {
694
715907
                            let invalidated_docs = roots_transaction
695
715907
                                .tree::<Unversioned>(
696
715907
                                    open_trees.trees_index_by_name
697
715907
                                        [&view_invalidated_docs_tree_name(&view_name)],
698
715907
                                )
699
715907
                                .unwrap();
700
715907
                            invalidated_docs.set(
701
715907
                                changed_document.id.as_big_endian_bytes().unwrap().to_vec(),
702
715907
                                b"",
703
715907
                            )?;
704
                        }
705
34334
                    }
706
                }
707
116617
            }
708
        }
709

            
710
321047
        roots_transaction
711
321047
            .entry_mut()
712
321047
            .set_data(pot::to_vec(&Changes::Documents(changed_documents))?)?;
713

            
714
321047
        roots_transaction.commit()?;
715

            
716
321047
        Ok(results)
717
333712
    }
718

            
719
475551
    fn execute_operation(
720
475551
        &self,
721
475551
        operation: &Operation,
722
475551
        transaction: &mut ExecutingTransaction<AnyFile>,
723
475551
        tree_index_map: &HashMap<String, usize>,
724
475551
    ) -> Result<OperationResult, Error> {
725
475551
        match &operation.command {
726
346625
            Command::Insert { id, contents } => self.execute_insert(
727
346625
                operation,
728
346625
                transaction,
729
346625
                tree_index_map,
730
346625
                *id,
731
346625
                contents.to_vec(),
732
346625
            ),
733
97332
            Command::Update { header, contents } => self.execute_update(
734
97332
                operation,
735
97332
                transaction,
736
97332
                tree_index_map,
737
97332
                header,
738
97332
                contents.to_vec(),
739
97332
            ),
740
31594
            Command::Delete { header } => {
741
31594
                self.execute_delete(operation, transaction, tree_index_map, header)
742
            }
743
        }
744
475551
    }
745

            
746
97332
    fn execute_update(
747
97332
        &self,
748
97332
        operation: &Operation,
749
97332
        transaction: &mut ExecutingTransaction<AnyFile>,
750
97332
        tree_index_map: &HashMap<String, usize>,
751
97332
        header: &Header,
752
97332
        contents: Vec<u8>,
753
97332
    ) -> Result<OperationResult, crate::Error> {
754
97332
        let documents = transaction
755
97332
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
756
97332
            .unwrap();
757
97332
        let document_id = header.id.as_big_endian_bytes().unwrap();
758
        // TODO switch to compare_swap
759

            
760
97332
        if let Some(vec) = documents.get(document_id.as_ref())? {
761
97258
            let doc = deserialize_document(&vec)?;
762
97258
            if doc.header.revision == header.revision {
763
97110
                if let Some(updated_doc) = doc.create_new_revision(contents) {
764
                    documents.set(
765
97036
                        updated_doc
766
97036
                            .header
767
97036
                            .id
768
97036
                            .as_big_endian_bytes()
769
97036
                            .unwrap()
770
97036
                            .as_ref()
771
97036
                            .to_vec(),
772
97036
                        serialize_document(&updated_doc)?,
773
                    )?;
774

            
775
97036
                    self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
776

            
777
96888
                    Ok(OperationResult::DocumentUpdated {
778
96888
                        collection: operation.collection.clone(),
779
96888
                        header: updated_doc.header,
780
96888
                    })
781
                } else {
782
                    // If no new revision was made, it means an attempt to
783
                    // save a document with the same contents was made.
784
                    // We'll return a success but not actually give a new
785
                    // version
786
74
                    Ok(OperationResult::DocumentUpdated {
787
74
                        collection: operation.collection.clone(),
788
74
                        header: doc.header,
789
74
                    })
790
                }
791
            } else {
792
148
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
793
148
                    operation.collection.clone(),
794
148
                    header.id,
795
148
                )))
796
            }
797
        } else {
798
74
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
799
74
                operation.collection.clone(),
800
74
                header.id,
801
74
            )))
802
        }
803
97332
    }
804

            
805
346625
    fn execute_insert(
806
346625
        &self,
807
346625
        operation: &Operation,
808
346625
        transaction: &mut ExecutingTransaction<AnyFile>,
809
346625
        tree_index_map: &HashMap<String, usize>,
810
346625
        id: Option<u64>,
811
346625
        contents: Vec<u8>,
812
346625
    ) -> Result<OperationResult, Error> {
813
346625
        let documents = transaction
814
346625
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
815
346625
            .unwrap();
816
346625
        let id = if let Some(id) = id {
817
153368
            id
818
        } else {
819
193257
            let last_key = documents
820
193257
                .last_key()?
821
193257
                .map(|bytes| BigEndian::read_u64(&bytes))
822
193257
                .unwrap_or_default();
823
193257
            last_key + 1
824
        };
825

            
826
346625
        let doc = BorrowedDocument::new(id, contents);
827
346625
        let serialized: Vec<u8> = serialize_document(&doc)?;
828
346625
        let document_id = ArcBytes::from(doc.header.id.as_big_endian_bytes().unwrap().to_vec());
829
346625
        if documents
830
346625
            .replace(document_id.clone(), serialized)?
831
346625
            .is_some()
832
        {
833
12099
            Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
834
12099
                operation.collection.clone(),
835
12099
                id,
836
12099
            )))
837
        } else {
838
334526
            self.update_unique_views(&document_id, operation, transaction, tree_index_map)?;
839

            
840
334404
            Ok(OperationResult::DocumentUpdated {
841
334404
                collection: operation.collection.clone(),
842
334404
                header: doc.header,
843
334404
            })
844
        }
845
346625
    }
846

            
847
31594
    fn execute_delete(
848
31594
        &self,
849
31594
        operation: &Operation,
850
31594
        transaction: &mut ExecutingTransaction<AnyFile>,
851
31594
        tree_index_map: &HashMap<String, usize>,
852
31594
        header: &Header,
853
31594
    ) -> Result<OperationResult, Error> {
854
31594
        let documents = transaction
855
31594
            .tree::<Versioned>(tree_index_map[&document_tree_name(&operation.collection)])
856
31594
            .unwrap();
857
31594
        let document_id = header.id.as_big_endian_bytes().unwrap();
858
31594
        if let Some(vec) = documents.remove(&document_id)? {
859
31594
            let doc = deserialize_document(&vec)?;
860
31594
            if &doc.header == header {
861
31594
                self.update_unique_views(
862
31594
                    document_id.as_ref(),
863
31594
                    operation,
864
31594
                    transaction,
865
31594
                    tree_index_map,
866
31594
                )?;
867

            
868
31594
                Ok(OperationResult::DocumentDeleted {
869
31594
                    collection: operation.collection.clone(),
870
31594
                    id: header.id,
871
31594
                })
872
            } else {
873
                Err(Error::Core(bonsaidb_core::Error::DocumentConflict(
874
                    operation.collection.clone(),
875
                    header.id,
876
                )))
877
            }
878
        } else {
879
            Err(Error::Core(bonsaidb_core::Error::DocumentNotFound(
880
                operation.collection.clone(),
881
                header.id,
882
            )))
883
        }
884
31594
    }
885

            
886
    fn update_unique_views(
887
        &self,
888
        document_id: &[u8],
889
        operation: &Operation,
890
        transaction: &mut ExecutingTransaction<AnyFile>,
891
        tree_index_map: &HashMap<String, usize>,
892
    ) -> Result<(), Error> {
893
463156
        if let Some(unique_views) = self
894
463156
            .data
895
463156
            .schema
896
463156
            .unique_views_in_collection(&operation.collection)
897
        {
898
139786
            for view in unique_views {
899
70028
                let name = view.view_name();
900
70028
                mapper::DocumentRequest {
901
70028
                    database: self,
902
70028
                    document_id,
903
70028
                    map_request: &mapper::Map {
904
70028
                        database: self.data.name.clone(),
905
70028
                        collection: operation.collection.clone(),
906
70028
                        view_name: name.clone(),
907
70028
                    },
908
70028
                    transaction,
909
70028
                    document_map_index: tree_index_map[&view_document_map_tree_name(&name)],
910
70028
                    documents_index: tree_index_map[&document_tree_name(&operation.collection)],
911
70028
                    omitted_entries_index: tree_index_map[&view_omitted_docs_tree_name(&name)],
912
70028
                    view_entries_index: tree_index_map[&view_entries_tree_name(&name)],
913
70028
                    view,
914
70028
                }
915
70028
                .map()?;
916
            }
917
393128
        }
918

            
919
462886
        Ok(())
920
463156
    }
921

            
922
476964
    fn create_view_iterator<'a, K: for<'k> Key<'k> + 'a>(
923
476964
        view_entries: &'a Tree<Unversioned, AnyFile>,
924
476964
        key: Option<QueryKey<K>>,
925
476964
        order: Sort,
926
476964
        limit: Option<usize>,
927
476964
    ) -> Result<Vec<ViewEntryCollection>, Error> {
928
476964
        let mut values = Vec::new();
929
476964
        let forwards = match order {
930
476890
            Sort::Ascending => true,
931
74
            Sort::Descending => false,
932
        };
933
476964
        let mut values_read = 0;
934
476964
        if let Some(key) = key {
935
474023
            match key {
936
220
                QueryKey::Range(range) => {
937
220
                    let range = range
938
220
                        .as_big_endian_bytes()
939
220
                        .map_err(view::Error::key_serialization)?;
940
220
                    view_entries.scan::<Infallible, _, _, _, _>(
941
440
                        &range.map_ref(|bytes| &bytes[..]),
942
220
                        forwards,
943
508
                        |_, _, _| true,
944
220
                        |_, _| {
945
1256
                            if let Some(limit) = limit {
946
148
                                if values_read >= limit {
947
74
                                    return KeyEvaluation::Stop;
948
74
                                }
949
74
                                values_read += 1;
950
1108
                            }
951
1182
                            KeyEvaluation::ReadData
952
1256
                        },
953
1182
                        |_key, _index, value| {
954
1182
                            values.push(value);
955
1182
                            Ok(())
956
1182
                        },
957
220
                    )?;
958
                }
959
473631
                QueryKey::Matches(key) => {
960
473631
                    let key = key
961
473631
                        .as_big_endian_bytes()
962
473631
                        .map_err(view::Error::key_serialization)?
963
473631
                        .to_vec();
964
473631

            
965
473631
                    values.extend(view_entries.get(&key)?);
966
                }
967
172
                QueryKey::Multiple(list) => {
968
172
                    let mut list = list
969
172
                        .into_iter()
970
344
                        .map(|key| {
971
344
                            key.as_big_endian_bytes()
972
344
                                .map(|bytes| bytes.to_vec())
973
344
                                .map_err(view::Error::key_serialization)
974
344
                        })
975
172
                        .collect::<Result<Vec<_>, _>>()?;
976

            
977
172
                    list.sort();
978
172

            
979
172
                    values.extend(
980
172
                        view_entries
981
172
                            .get_multiple(&list.iter().map(Vec::as_slice).collect::<Vec<_>>())?
982
172
                            .into_iter()
983
344
                            .map(|(_, value)| value),
984
                    );
985
                }
986
            }
987
        } else {
988
2941
            view_entries.scan::<Infallible, _, _, _, _>(
989
2941
                &(..),
990
2941
                forwards,
991
2941
                |_, _, _| true,
992
2941
                |_, _| {
993
4061
                    if let Some(limit) = limit {
994
                        if values_read >= limit {
995
                            return KeyEvaluation::Stop;
996
                        }
997
                        values_read += 1;
998
4061
                    }
999
4061
                    KeyEvaluation::ReadData
4117
                },
4117
                |_, _, value| {
4061
                    values.push(value);
4061
                    Ok(())
4117
                },
2941
            )?;
        }

            
476964
        values
476964
            .into_iter()
476964
            .map(|value| bincode::deserialize(&value).map_err(Error::from))
476964
            .collect::<Result<Vec<_>, Error>>()
476964
    }

            
1921030
    /// Returns the encryption key to use for `collection`: the key the
    /// schematic assigns to the collection if there is one, otherwise the
    /// storage's default encryption key (which may itself be `None`).
    pub(crate) fn collection_encryption_key(&self, collection: &CollectionName) -> Option<&KeyId> {
        let collection_key = self.schematic().encryption_key_for_collection(collection);
        if collection_key.is_some() {
            return collection_key;
        }
        self.storage().default_encryption_key()
    }

            
    #[cfg_attr(
        not(feature = "encryption"),
        allow(unused_mut, unused_variables, clippy::let_and_return)
    )]
    #[cfg_attr(feature = "encryption", allow(clippy::unnecessary_wraps))]
1445575
    pub(crate) fn collection_tree<R: Root, S: Into<Cow<'static, str>>>(
1445575
        &self,
1445575
        collection: &CollectionName,
1445575
        name: S,
1445575
    ) -> Result<TreeRoot<R, AnyFile>, Error> {
1445575
        let mut tree = R::tree(name);

            
1445575
        if let Some(key) = self.collection_encryption_key(collection) {
            #[cfg(feature = "encryption")]
39367
            {
39367
                tree = tree.with_vault(TreeVault {
39367
                    key: key.clone(),
39367
                    vault: self.storage().vault().clone(),
39367
                });
39367
            }

            
            #[cfg(not(feature = "encryption"))]
            {
                return Err(Error::EncryptionDisabled);
            }
1406208
        }

            
1445575
        Ok(tree)
1445575
    }

            
1
    pub(crate) async fn update_key_expiration_async<'key>(
1
        &self,
1
        tree_key: impl Into<Cow<'key, str>>,
1
        expiration: Option<Timestamp>,
1
    ) {
1
        self.data
1
            .context
1
            .update_key_expiration_async(tree_key, expiration)
            .await;
1
    }
}

            
629010
pub(crate) fn deserialize_document(
629010
    bytes: &[u8],
629010
) -> Result<BorrowedDocument<'_>, bonsaidb_core::Error> {
629010
    let document = bincode::deserialize::<BorrowedDocument<'_>>(bytes).map_err(Error::from)?;
629010
    Ok(document)
629010
}

            
443661
fn serialize_document(document: &BorrowedDocument<'_>) -> Result<Vec<u8>, bonsaidb_core::Error> {
443661
    bincode::serialize(document)
443661
        .map_err(Error::from)
443661
        .map_err(bonsaidb_core::Error::from)
443661
}

            
#[async_trait]
impl Connection for Database {
1001136
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(transaction)))]
    async fn apply_transaction(
        &self,
        transaction: Transaction,
333712
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
333712
        let task_self = self.clone();
333712
        let mut unique_view_tasks = Vec::new();
334840
        for collection_name in transaction
333712
            .operations
333712
            .iter()
475625
            .map(|op| &op.collection)
333712
            .collect::<HashSet<_>>()
        {
334839
            if let Some(views) = self.data.schema.views_in_collection(collection_name) {
957383
                for view in views {
739235
                    if view.unique() {
34604
                        if let Some(task) = self
34604
                            .data
34604
                            .storage
34604
                            .tasks()
34604
                            .spawn_integrity_check(view, self)
                            .await?
682
                        {
682
                            unique_view_tasks.push(task);
33922
                        }
704631
                    }
                }
116691
            }
        }
334370
        for task in unique_view_tasks {
658
            task.receive()
610
                .await
682
                .map_err(Error::from)?
682
                .map_err(Error::from)?;
        }

            
333712
        tokio::task::spawn_blocking(move || task_self.apply_transaction_to_roots(&transaction))
286714
            .await
333688
            .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?
333712
            .map_err(bonsaidb_core::Error::from)
667424
    }

            
9279
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(id)))]
    async fn get<C: schema::Collection>(
        &self,
        id: u64,
3093
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
3093
        self.get_from_collection_id(id, &C::collection_name()).await
6186
    }

            
6663
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids)))]
    async fn get_multiple<C: schema::Collection>(
        &self,
        ids: &[u64],
2221
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
2221
        self.get_multiple_from_collection_id(ids, &C::collection_name())
1194
            .await
4442
    }

            
36
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(ids, order, limit)))]
    async fn list<C: schema::Collection, R: Into<Range<u64>> + Send>(
        &self,
        ids: R,
        order: Sort,
        limit: Option<usize>,
12
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
12
        self.list(ids.into(), order, limit, &C::collection_name())
12
            .await
24
    }

            
    #[cfg_attr(
        feature = "tracing",
76140
        tracing::instrument(skip(starting_id, result_limit))
    )]
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<usize>,
25380
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
25380
        let result_limit = result_limit
25380
            .unwrap_or(LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT)
25380
            .min(LIST_TRANSACTIONS_MAX_RESULTS);
25380
        if result_limit > 0 {
25306
            let task_self = self.clone();
25306
            tokio::task::spawn_blocking::<_, Result<Vec<transaction::Executed>, Error>>(move || {
25306
                let range = if let Some(starting_id) = starting_id {
12912
                    Range::from(starting_id..)
                } else {
12394
                    Range::from(..)
                };

            
25306
                let mut entries = Vec::new();
192286
                task_self.roots().transactions().scan(range, |entry| {
192286
                    entries.push(entry);
192286
                    entries.len() < result_limit
192286
                })?;

            
25306
                entries
25306
                    .into_iter()
25306
                    .map(|entry| {
192286
                        if let Some(data) = entry.data() {
192286
                            let changes = match pot::from_slice(data) {
192286
                                Ok(changes) => changes,
                                Err(pot::Error::NotAPot) => {
                                    Changes::Documents(bincode::deserialize(entry.data().unwrap())?)
                                }
                                other => other?,
                            };
192286
                            Ok(Some(transaction::Executed {
192286
                                id: entry.id,
192286
                                changes,
192286
                            }))
                        } else {
                            Ok(None)
                        }
192286
                    })
25306
                    .filter_map(Result::transpose)
25306
                    .collect::<Result<Vec<_>, Error>>()
25306
            })
19474
            .await
25306
            .unwrap()
25306
            .map_err(bonsaidb_core::Error::from)
        } else {
            // A request was made to return an empty result? This should probably be
            // an error, but technically this is a correct response.
74
            Ok(Vec::default())
        }
50760
    }

            
    #[cfg_attr(
        feature = "tracing",
109815
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    #[must_use]
    async fn query<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
36605
    {
36605
        let mut results = Vec::new();
36605
        self.for_each_view_entry::<V, _>(key, order, limit, access_policy, |collection| {
15264
            let entry = ViewEntry::from(collection);
15264
            let key = <V::Key as Key>::from_big_endian_bytes(&entry.key)
15264
                .map_err(view::Error::key_serialization)
15264
                .map_err(Error::from)?;
30561
            for entry in entry.mappings {
                results.push(Map::new(
15297
                    entry.source,
15297
                    key.clone(),
15297
                    V::deserialize(&entry.value)?,
                ));
            }
15264
            Ok(())
36605
        })
33209
        .await?;

            
36605
        Ok(results)
73210
    }

            
    #[cfg_attr(
        feature = "tracing",
6630
        tracing::instrument(skip(key, order, limit, access_policy))
    )]
    async fn query_with_docs<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        order: Sort,
        limit: Option<usize>,
        access_policy: AccessPolicy,
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
    where
        Self: Sized,
2210
    {
2212
        let results = Connection::query::<V>(self, key, order, limit, access_policy).await?;

            
2210
        let documents = self
2214
            .get_multiple::<V::Collection>(&results.iter().map(|m| m.source.id).collect::<Vec<_>>())
1183
            .await?
2210
            .into_iter()
2214
            .map(|doc| (doc.header.id, doc))
2210
            .collect::<BTreeMap<u64, _>>();
2210

            
2210
        Ok(MappedDocuments {
2210
            mappings: results,
2210
            documents,
2210
        })
4420
    }

            
12066
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<V::Value, bonsaidb_core::Error>
    where
        Self: Sized,
4022
    {
4022
        let view = self
4022
            .data
4022
            .schema
4022
            .view::<V>()
4022
            .expect("query made with view that isn't registered with this database");

            
4019
        let result = self
            .reduce_in_view(
4022
                &view.view_name(),
4022
                key.map(|key| key.serialized()).transpose()?,
4022
                access_policy,
92
            )
92
            .await?;
4019
        let value = V::deserialize(&result)?;

            
4019
        Ok(value)
8044
    }

            
18
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(key, access_policy)))]
    async fn reduce_grouped<V: schema::SerializedView>(
        &self,
        key: Option<QueryKey<V::Key>>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
    where
        Self: Sized,
6
    {
6
        let view = self
6
            .data
6
            .schema
6
            .view::<V>()
6
            .expect("query made with view that isn't registered with this database");

            
6
        let results = self
            .grouped_reduce_in_view(
6
                &view.view_name(),
6
                key.map(|key| key.serialized()).transpose()?,
6
                access_policy,
3
            )
3
            .await?;
6
        results
6
            .into_iter()
9
            .map(|map| {
9
                Ok(MappedValue::new(
9
                    V::Key::from_big_endian_bytes(&map.key)
9
                        .map_err(view::Error::key_serialization)?,
9
                    V::deserialize(&map.value)?,
                ))
9
            })
6
            .collect::<Result<Vec<_>, bonsaidb_core::Error>>()
12
    }

            
3
    async fn delete_docs<V: schema::SerializedView>(
3
        &self,
3
        key: Option<QueryKey<V::Key>>,
3
        access_policy: AccessPolicy,
3
    ) -> Result<u64, bonsaidb_core::Error>
3
    where
3
        Self: Sized,
3
    {
3
        let collection = <V::Collection as Collection>::collection_name();
3
        let mut transaction = Transaction::default();
3
        self.for_each_view_entry::<V, _>(
3
            key,
3
            Sort::Ascending,
3
            None,
3
            access_policy,
3
            |entry_collection| {
3
                let entry = ViewEntry::from(entry_collection);

            
9
                for mapping in entry.mappings {
6
                    transaction.push(Operation::delete(collection.clone(), mapping.source));
6
                }

            
3
                Ok(())
3
            },
3
        )
3
        .await?;

            
3
        let results = Connection::apply_transaction(self, transaction).await?;

            
3
        Ok(results.len() as u64)
6
    }

            
895461
    #[cfg_attr(feature = "tracing", tracing::instrument)]
298487
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
298487
        Ok(self.roots().transactions().current_transaction_id())
298487
    }

            
9
    #[cfg_attr(feature = "tracing", tracing::instrument)]
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
3
        self.storage()
3
            .tasks()
3
            .compact_collection(self.clone(), C::collection_name())
3
            .await?;
3
        Ok(())
6
    }

            
222
    #[cfg_attr(feature = "tracing", tracing::instrument)]
74
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
74
        self.storage()
74
            .tasks()
74
            .compact_database(self.clone())
74
            .await?;
74
        Ok(())
148
    }

            
222
    #[cfg_attr(feature = "tracing", tracing::instrument)]
74
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
74
        self.storage()
74
            .tasks()
74
            .compact_key_value_store(self.clone())
74
            .await?;
74
        Ok(())
148
    }
}

            
/// Boxed iterator yielding pairs of byte buffers, or an [`Error`] per item.
// NOTE(review): presumably the pairs are (key, value) entries read from a
// view-entries tree; confirm against the code that constructs this iterator.
type ViewIterator<'a> =
    Box<dyn Iterator<Item = Result<(ArcBytes<'static>, ArcBytes<'static>), Error>> + 'a>;

            
/// Adapter around a [`ViewIterator`] whose `Iterator` implementation yields
/// deserialized [`ViewEntryCollection`]s instead of raw byte pairs.
struct ViewEntryCollectionIterator<'a> {
    /// The underlying iterator of raw byte pairs.
    iterator: ViewIterator<'a>,
}

            
impl<'a> Iterator for ViewEntryCollectionIterator<'a> {
    type Item = Result<ViewEntryCollection, crate::Error>;

            
    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next().map(|item| {
            item.map_err(crate::Error::from)
                .and_then(|(_, value)| bincode::deserialize(&value).map_err(Error::from))
        })
    }
}

            
1269265
/// Shared handle to a [`ContextData`].
///
/// Cloning only clones the inner [`Arc`]; all clones refer to the same
/// [`ContextData`], which this type dereferences to.
#[derive(Debug, Clone)]
pub(crate) struct Context {
    /// The shared state behind this handle.
    data: Arc<ContextData>,
}

            
impl Deref for Context {
    type Target = ContextData;

    /// Gives access to the shared [`ContextData`] behind this handle.
    fn deref(&self) -> &Self::Target {
        &*self.data
    }
}

            
/// The state shared by every clone of a [`Context`].
///
/// Dropping this value spawns a task on `runtime` that shuts down the
/// key-value state.
#[derive(Debug)]
pub(crate) struct ContextData {
    /// The storage roots this context operates on.
    pub(crate) roots: Roots<AnyFile>,
    /// Key-value store state; the same `Arc` is handed to the key-value
    /// background worker when the context is created.
    key_value_state: Arc<Mutex<keyvalue::KeyValueState>>,
    /// Handle of the tokio runtime that was current when the context was
    /// created; used to spawn the shutdown task on drop.
    runtime: tokio::runtime::Handle,
}

            
impl Borrow<Roots<AnyFile>> for Context {
    fn borrow(&self) -> &Roots<AnyFile> {
        &self.data.roots
    }
}

            
impl Context {
19393
    pub(crate) fn new(roots: Roots<AnyFile>, key_value_persistence: KeyValuePersistence) -> Self {
19393
        let (background_sender, background_receiver) =
19393
            watch::channel(BackgroundWorkerProcessTarget::Never);
19393
        let key_value_state = Arc::new(Mutex::new(keyvalue::KeyValueState::new(
19393
            key_value_persistence,
19393
            roots.clone(),
19393
            background_sender,
19393
        )));
19393
        let context = Self {
19393
            data: Arc::new(ContextData {
19393
                roots,
19393
                key_value_state: key_value_state.clone(),
19393
                runtime: tokio::runtime::Handle::current(),
19393
            }),
19393
        };
19393
        tokio::task::spawn(keyvalue::background_worker(
19393
            key_value_state,
19393
            background_receiver,
19393
        ));
19393
        context
19393
    }

            
748422
    pub(crate) async fn perform_kv_operation(
748422
        &self,
748422
        op: KeyOperation,
748422
    ) -> Result<Output, bonsaidb_core::Error> {
748422
        let mut state = fast_async_lock!(self.data.key_value_state);
748422
        state
748422
            .perform_kv_operation(op, &self.data.key_value_state)
            .await
748422
    }

            
11
    pub(crate) async fn update_key_expiration_async<'key>(
11
        &self,
11
        tree_key: impl Into<Cow<'key, str>>,
11
        expiration: Option<Timestamp>,
11
    ) {
11
        let mut state = fast_async_lock!(self.data.key_value_state);
11
        state.update_key_expiration(tree_key, expiration);
11
    }
}

            
impl Drop for ContextData {
17473
    fn drop(&mut self) {
17473
        let key_value_state = self.key_value_state.clone();
17473
        self.runtime.spawn(async move {
13353
            let mut state = fast_async_lock!(key_value_state);
13353
            state.shutdown(&key_value_state).await
17473
        });
17473
    }
}

            
1550168
pub fn document_tree_name(collection: &CollectionName) -> String {
1550168
    format!("collection.{:#}", collection)
1550168
}