1
use std::{
2
    collections::BTreeMap,
3
    marker::PhantomData,
4
    ops::{Deref, DerefMut},
5
};
6

            
7
use arc_bytes::serde::Bytes;
8
use async_trait::async_trait;
9
use futures::{future::BoxFuture, Future, FutureExt};
10
use serde::{Deserialize, Serialize};
11
#[cfg(feature = "multiuser")]
12
use zeroize::Zeroize;
13

            
14
#[cfg(feature = "multiuser")]
15
use crate::schema::NamedReference;
16
use crate::{
17
    document::{CollectionDocument, Document, Header, OwnedDocument},
18
    permissions::Permissions,
19
    schema::{
20
        self,
21
        view::{self, map::MappedDocuments},
22
        Key, Map, MappedValue, Schema, SchemaName, SerializedCollection,
23
    },
24
    transaction::{self, OperationResult, Transaction},
25
    Error,
26
};
27

            
28
/// Defines all interactions with a [`schema::Schema`], regardless of whether it
29
/// is local or remote.
30
///
31
/// ## Interacting with [`Collection`s](schema::Collection)
32
///
33
/// At its core, each document is just a unique ID and an array of bytes. The
34
/// low-level interface works with [`OwnedDocument`], which leaves you in charge
35
/// of deserializing data.
36
///
37
/// For most standard use cases, you will be happy to leverage
38
/// [Serde](https://serde.rs/) / [Transmog](https://github.com/khonsulabs/transmog) and
39
/// [`CollectionDocument<T>`][cd]/[`SerializedCollection`].
40
///
41
/// These examples all use this basic collection type definition:
42
///
43
/// ```rust
44
/// use bonsaidb_core::{
45
///     schema::{Collection, CollectionName, DefaultSerialization, Schematic},
46
///     Error,
47
/// };
48
/// use serde::{Deserialize, Serialize};
49
///
50
/// #[derive(Debug, Serialize, Deserialize, Default)]
51
/// pub struct MyCollection {
52
///     pub rank: u32,
53
///     pub score: f32,
54
/// }
55
///
56
/// impl Collection for MyCollection {
57
///     fn collection_name() -> CollectionName {
58
///         CollectionName::new("MyAuthority", "MyCollection")
59
///     }
60
///
61
///     fn define_views(schema: &mut Schematic) -> Result<(), Error> {
62
///         // ...
63
///         Ok(())
64
///     }
65
/// }
66
///
67
/// impl DefaultSerialization for MyCollection {}
68
/// ```
69
///
70
/// ### Using `Connection` with `OwnedDocument`
71
///
72
/// #### Inserting a document with an automatically assigned ID
73
///
74
/// ```rust
75
/// # bonsaidb_core::__doctest_prelude!();
76
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
77
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
78
/// let inserted_header = db.collection::<MyCollection>().push_bytes(vec![]).await?;
79
/// println!(
80
///     "Inserted id {} with revision {}",
81
///     inserted_header.id, inserted_header.revision
82
/// );
83
/// # Ok(())
84
/// # })
85
/// # }
86
/// ```
87
///
88
/// #### Inserting a document with a specific ID
89
///
90
/// ```rust
91
/// # bonsaidb_core::__doctest_prelude!();
92
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
93
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
94
/// let inserted_header = db
95
///     .collection::<MyCollection>()
96
///     .insert_bytes(42, vec![])
97
///     .await?;
98
/// println!(
99
///     "Inserted id {} with revision {}",
100
///     inserted_header.id, inserted_header.revision
101
/// );
102
/// # Ok(())
103
/// # })
104
/// # }
105
/// ```
106
///
107
/// #### Retrieving a document by ID
108
///
109
/// ```rust
110
/// # bonsaidb_core::__doctest_prelude!();
111
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
112
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
113
/// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
114
///     println!(
115
///         "Retrieved bytes {:?} with revision {}",
116
///         doc.contents, doc.header.revision
117
///     );
118
///     let deserialized = doc.contents::<MyCollection>()?;
119
///     println!("Deserialized contents: {:?}", deserialized);
120
/// }
121
/// # Ok(())
122
/// # })
123
/// # }
124
/// ```
125
///
126
/// #### Retrieving multiple documents by IDs
127
///
128
/// ```rust
129
/// # bonsaidb_core::__doctest_prelude!();
130
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
131
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
132
/// for doc in db
133
///     .collection::<MyCollection>()
134
///     .get_multiple(&[42, 43])
135
///     .await?
136
/// {
137
///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
138
///     let deserialized = doc.contents::<MyCollection>()?;
139
///     println!("Deserialized contents: {:?}", deserialized);
140
/// }
141
/// # Ok(())
142
/// # })
143
/// # }
144
/// ```
145
///
146
/// #### Retrieving all documents
147
///
148
/// ```rust
149
/// # bonsaidb_core::__doctest_prelude!();
150
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
151
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
152
/// for doc in db.collection::<MyCollection>().list(..).await? {
153
///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
154
///     let deserialized = doc.contents::<MyCollection>()?;
155
///     println!("Deserialized contents: {:?}", deserialized);
156
/// }
157
/// # Ok(())
158
/// # })
159
/// # }
160
/// ```
161
///
162
/// #### Listing a limited amount of documents in reverse order
163
///
164
/// ```rust
165
/// # bonsaidb_core::__doctest_prelude!();
166
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
167
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
168
/// for doc in db
169
///     .collection::<MyCollection>()
170
///     .list(..)
171
///     .descending()
172
///     .limit(20)
173
///     .await?
174
/// {
175
///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
176
///     let deserialized = doc.contents::<MyCollection>()?;
177
///     println!("Deserialized contents: {:?}", deserialized);
178
/// }
179
/// # Ok(())
180
/// # })
181
/// # }
182
/// ```
183
///
184
/// ### Using `Connection` with `CollectionDocument<T>`
185
///
186
/// #### Inserting a document with an automatically assigned ID
187
///
188
/// ```rust
189
/// # bonsaidb_core::__doctest_prelude!();
190
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
191
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
192
/// let document = MyCollection::default().push_into(&db).await?;
193
/// println!(
194
///     "Inserted {:?} with id {} with revision {}",
195
///     document.contents, document.header.id, document.header.revision
196
/// );
197
/// # Ok(())
198
/// # })
199
/// # }
200
/// ```
201
///
202
/// #### Inserting a document with a specific ID
203
///
204
/// ```rust
205
/// # bonsaidb_core::__doctest_prelude!();
206
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
207
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
208
/// let document = MyCollection::default().insert_into(42, &db).await?;
209
/// println!(
210
///     "Inserted {:?} with id {} with revision {}",
211
///     document.contents, document.header.id, document.header.revision
212
/// );
213
/// # Ok(())
214
/// # })
215
/// # }
216
/// ```
217
///
218
/// #### Retrieving a document by ID
219
///
220
/// ```rust
221
/// # bonsaidb_core::__doctest_prelude!();
222
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
223
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
224
/// if let Some(doc) = MyCollection::get(42, &db).await? {
225
///     println!(
226
///         "Retrieved revision {} with deserialized contents: {:?}",
227
///         doc.header.revision, doc.contents
228
///     );
229
/// }
230
/// # Ok(())
231
/// # })
232
/// # }
233
/// ```
234
///
235
/// #### Retrieving multiple documents by IDs
236
///
237
/// ```rust
238
/// # bonsaidb_core::__doctest_prelude!();
239
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
240
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
241
/// for doc in MyCollection::get_multiple(&[42, 43], &db).await? {
242
///     println!(
243
///         "Retrieved #{} with deserialized contents: {:?}",
244
///         doc.header.id, doc.contents
245
///     );
246
/// }
247
/// # Ok(())
248
/// # })
249
/// # }
250
/// ```
251
///
252
/// #### Retrieving all documents
253
///
254
/// ```rust
255
/// # bonsaidb_core::__doctest_prelude!();
256
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
257
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
258
/// for doc in MyCollection::list(.., &db).await? {
259
///     println!(
260
///         "Retrieved #{} with deserialized contents: {:?}",
261
///         doc.header.id, doc.contents
262
///     );
263
/// }
264
/// # Ok(())
265
/// # })
266
/// # }
267
/// ```
268
///
269
/// #### Listing a limited amount of documents in reverse order
270
///
271
/// ```rust
272
/// # bonsaidb_core::__doctest_prelude!();
273
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
274
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
275
/// for doc in MyCollection::list(.., &db).descending().limit(20).await? {
276
///     println!(
277
///         "Retrieved #{} with deserialized contents: {:?}",
278
///         doc.header.id, doc.contents
279
///     );
280
/// }
281
/// # Ok(())
282
/// # })
283
/// # }
284
/// ```
285
///
286
/// ## Querying Views
287
///
288
/// The examples use this view definition:
289
///
290
/// ```rust
291
/// # mod collection {
292
/// # bonsaidb_core::__doctest_prelude!();
293
/// # }
294
/// # use collection::MyCollection;
295
/// use bonsaidb_core::{
296
///     define_basic_unique_mapped_view,
297
///     document::CollectionDocument,
298
///     schema::{
299
///         CollectionViewSchema, DefaultViewSerialization, Name, ReduceResult, View,
300
///         ViewMapResult, ViewMappedValue,
301
///     },
302
/// };
303
///
304
/// #[derive(Debug)]
305
/// pub struct ScoresByRank;
306
///
307
/// impl View for ScoresByRank {
308
///     type Collection = MyCollection;
309
///     type Key = u32;
310
///     type Value = f32;
311
///
312
///     fn name(&self) -> Name {
313
///         Name::new("scores-by-rank")
314
///     }
315
/// }
316
///
317
/// impl CollectionViewSchema for ScoresByRank {
318
///     type View = Self;
319
///     fn map(
320
///         &self,
321
///         document: CollectionDocument<<Self::View as View>::Collection>,
322
///     ) -> ViewMapResult<Self::View> {
323
///         Ok(document
324
///             .header
325
///             .emit_key_and_value(document.contents.rank, document.contents.score))
326
///     }
327
///
328
///     fn reduce(
329
///         &self,
330
///         mappings: &[ViewMappedValue<Self::View>],
331
///         rereduce: bool,
332
///     ) -> ReduceResult<Self::View> {
333
///         if mappings.is_empty() {
334
///             Ok(0.)
335
///         } else {
336
///             Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
337
///         }
338
///     }
339
/// }
340
///
341
/// impl DefaultViewSerialization for ScoresByRank {}
342
/// ```
343
///
344
/// ### Retrieving all view entries
345
///
346
/// ```rust
347
/// # bonsaidb_core::__doctest_prelude!();
348
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
349
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
350
/// for mapping in db.view::<ScoresByRank>().query().await? {
351
///     println!(
352
///         "Mapping from #{} with rank: {} and score: {}",
353
///         mapping.source.id, mapping.key, mapping.value
354
///     );
355
/// }
356
/// # Ok(())
357
/// # })
358
/// # }
359
/// ```
360
///
361
/// ### Retrieving all mappings with the same key
362
///
363
/// ```rust
364
/// # bonsaidb_core::__doctest_prelude!();
365
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
366
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
367
/// for mapping in db.view::<ScoresByRank>().with_key(42).query().await? {
368
///     println!(
369
///         "Mapping from #{} with rank: {} and score: {}",
370
///         mapping.source.id, mapping.key, mapping.value
371
///     );
372
/// }
373
/// # Ok(())
374
/// # })
375
/// # }
376
/// ```
377
///
378
/// ### Retrieving all mappings with a range of keys
379
///
380
/// ```rust
381
/// # bonsaidb_core::__doctest_prelude!();
382
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
383
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
384
/// for mapping in db
385
///     .view::<ScoresByRank>()
386
///     .with_key_range(42..=44)
387
///     .query()
388
///     .await?
389
/// {
390
///     println!(
391
///         "Mapping from #{} with rank: {} and score: {}",
392
///         mapping.source.id, mapping.key, mapping.value
393
///     );
394
/// }
395
/// # Ok(())
396
/// # })
397
/// # }
398
/// ```
399
///
400
/// ### Retrieving the associated documents with a view query
401
///
402
/// With [`OwnedDocument`]:
403
///
404
/// ```rust
405
/// # bonsaidb_core::__doctest_prelude!();
406
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
407
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
408
/// for mapping in &db
409
///     .view::<ScoresByRank>()
410
///     .with_key_range(42..=44)
411
///     .query_with_docs()
412
///     .await?
413
/// {
414
///     println!(
415
///         "Mapping from #{} with rank: {} and score: {}. Document bytes: {:?}",
416
///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
417
///     );
418
/// }
419
/// # Ok(())
420
/// # })
421
/// # }
422
/// ```
423
///
424
/// With [`CollectionDocument<T>`][cd]:
425
///
426
/// ```rust
427
/// # bonsaidb_core::__doctest_prelude!();
428
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
429
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
430
/// for mapping in &db
431
///     .view::<ScoresByRank>()
432
///     .with_key_range(42..=44)
433
///     .query_with_collection_docs()
434
///     .await?
435
/// {
436
///     println!(
437
///         "Mapping from #{} with rank: {} and score: {}. Deserialized Contents: {:?}",
438
///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
439
///     );
440
/// }
441
/// # Ok(())
442
/// # })
443
/// # }
444
/// ```
445
///
446
/// ### Customizing view query parameters
447
///
448
/// ```rust
449
/// # bonsaidb_core::__doctest_prelude!();
450
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
451
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
452
/// for mapping in db
453
///     .view::<ScoresByRank>()
454
///     .with_key_range(42..=44)
455
///     .descending()
456
///     .limit(10)
457
///     .query()
458
///     .await?
459
/// {
460
///     println!(
461
///         "Mapping from #{} with rank: {} and score: {}",
462
///         mapping.source.id, mapping.key, mapping.value
463
///     );
464
/// }
465
/// # Ok(())
466
/// # })
467
/// # }
468
/// ```
469
///
470
/// ### Reducing a view to its value type
471
///
472
/// All of the ways of filtering a view can be used in conjunction with [`reduce()`](View::reduce()).
473
///
474
/// ```rust
475
/// # bonsaidb_core::__doctest_prelude!();
476
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
477
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
478
/// // score is an f32 in this example
479
/// let score = db.view::<ScoresByRank>().reduce().await?;
480
/// println!("Average score: {:3}", score);
481
/// # Ok(())
482
/// # })
483
/// # }
484
/// ```
485
///
486
/// ### Reducing a view to its value type, grouping by key
487
///
488
/// All of the ways of filtering a view can be used in conjunction with [`reduce()`](View::reduce()).
489
///
490
/// ```rust
491
/// # bonsaidb_core::__doctest_prelude!();
492
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
493
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
494
/// // score is an f32 in this example
495
/// for mapping in db.view::<ScoresByRank>().reduce_grouped().await? {
496
///     println!(
497
///         "Rank {} has an average score of {:3}",
498
///         mapping.key, mapping.value
499
///     );
500
/// }
501
/// # Ok(())
502
/// # })
503
/// # }
504
/// ```
505
///
506
/// [cd]: crate::document::CollectionDocument
507
#[async_trait]
508
pub trait Connection: Send + Sync {
509
    /// Accesses a collection for the connected [`schema::Schema`].
510
22682
    fn collection<C: schema::Collection>(&self) -> Collection<'_, Self, C>
511
22682
    where
512
22682
        Self: Sized,
513
22682
    {
514
22682
        Collection::new(self)
515
22682
    }
516

            
517
    /// Inserts a newly created document into the connected [`schema::Schema`]
518
    /// for the [`Collection`] `C`. If `id` is `None` a unique id will be
519
    /// generated. If an id is provided and a document already exists with that
520
    /// id, a conflict error will be returned.
521
26361
    async fn insert<C: schema::Collection, B: Into<Bytes> + Send>(
522
26361
        &self,
523
26361
        id: Option<u64>,
524
26361
        contents: B,
525
26361
    ) -> Result<Header, Error> {
526
26383
        let contents = contents.into();
527
26383
        let results = self
528
32542
            .apply_transaction(Transaction::insert(C::collection_name(), id, contents))
529
31777
            .await?;
530
25845
        if let OperationResult::DocumentUpdated { header, .. } = &results[0] {
531
25845
            Ok(header.clone())
532
        } else {
533
            unreachable!(
534
                "apply_transaction on a single insert should yield a single DocumentUpdated entry"
535
            )
536
        }
537
52744
    }
538

            
539
    /// Updates an existing document in the connected [`schema::Schema`] for the
540
    /// [`Collection`] `C`. Upon success, `doc.revision` will be updated with
541
    /// the new revision.
542
3928
    async fn update<'a, C: schema::Collection, D: Document<'a> + Send + Sync>(
543
3928
        &self,
544
3928
        doc: &mut D,
545
3928
    ) -> Result<(), Error> {
546
3928
        let results = self
547
3928
            .apply_transaction(Transaction::update(
548
3928
                C::collection_name(),
549
3928
                <D as Deref>::deref(doc).clone(),
550
3928
                <D as AsRef<[u8]>>::as_ref(doc).to_vec(),
551
6304
            ))
552
5898
            .await?;
553
3908
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
554
3908
            *<D as DerefMut>::deref_mut(doc) = header;
555
3908
            Ok(())
556
        } else {
557
            unreachable!(
558
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
559
            )
560
        }
561
7856
    }
562

            
563
    /// Retrieves a stored document from [`Collection`] `C` identified by `id`.
    ///
    /// This is a required method: the trait provides no default body.
    // NOTE(review): `Ok(None)` presumably means no document with `id` exists
    // in `C` — confirm against the implementations.
564
    async fn get<C: schema::Collection>(&self, id: u64) -> Result<Option<OwnedDocument>, Error>;
565

            
566
    /// Retrieves all documents matching `ids`. Documents that are not found
567
    /// are not returned, but no error will be generated.
    ///
    /// This is a required method: the trait provides no default body.
    // NOTE(review): the ordering of the returned documents relative to `ids`
    // is not specified here — confirm against the implementations.
568
    async fn get_multiple<C: schema::Collection>(
569
        &self,
570
        ids: &[u64],
571
    ) -> Result<Vec<OwnedDocument>, Error>;
572

            
573
    /// Retrieves all documents within the range of `ids`. Documents that are
574
    /// not found are not returned, but no error will be generated. To retrieve
575
    /// all documents, pass in `..` for `ids`.
    ///
    /// This is a required method: the trait provides no default body. `order`
    /// is the [`Sort`] direction and `limit`, when `Some`, is the maximum
    /// number of results; the [`List`] builder in this file collects the same
    /// three options.
576
    async fn list<C: schema::Collection, R: Into<Range<u64>> + Send>(
577
        &self,
578
        ids: R,
579
        order: Sort,
580
        limit: Option<usize>,
581
    ) -> Result<Vec<OwnedDocument>, Error>;
582

            
583
    /// Removes a `Document` from the database.
584
11873
    async fn delete<C: schema::Collection, H: AsRef<Header> + Send + Sync>(
585
11873
        &self,
586
11873
        doc: &H,
587
11873
    ) -> Result<(), Error> {
588
11873
        let results = self
589
11873
            .apply_transaction(Transaction::delete(
590
11873
                C::collection_name(),
591
11873
                doc.as_ref().clone(),
592
13018
            ))
593
12531
            .await?;
594
11873
        if let OperationResult::DocumentDeleted { .. } = &results[0] {
595
11873
            Ok(())
596
        } else {
597
            unreachable!(
598
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
599
            )
600
        }
601
23746
    }
602

            
603
    /// Initializes [`View`] for [`schema::View`] `V`.
    ///
    /// The returned value borrows this connection; it is built by passing
    /// `self` to `View::new`.
604
    #[must_use]
605
49168
    fn view<V: schema::SerializedView>(&'_ self) -> View<'_, Self, V>
606
49168
    where
607
49168
        Self: Sized,
608
49168
    {
609
49168
        View::new(self)
610
49168
    }
611

            
612
    /// Queries for view entries matching [`View`].
    ///
    /// This is a required method: the trait provides no default body.
    // NOTE(review): presumably `key` restricts which view keys are matched
    // (`None` meaning all), `order` is the sort direction and `limit` caps the
    // number of results; `access_policy` semantics are defined by
    // `AccessPolicy`, which is declared outside this section — confirm.
613
    #[must_use]
614
    async fn query<V: schema::SerializedView>(
615
        &self,
616
        key: Option<QueryKey<V::Key>>,
617
        order: Sort,
618
        limit: Option<usize>,
619
        access_policy: AccessPolicy,
620
    ) -> Result<Vec<Map<V::Key, V::Value>>, Error>
621
    where
622
        Self: Sized;
623

            
624
    /// Queries for view entries matching [`View`] with their source documents.
    ///
    /// This is a required method: the trait provides no default body. The
    /// result pairs the mappings with the matching [`OwnedDocument`]s; the
    /// default `query_with_collection_docs` in this trait is built on top of
    /// this method.
    // NOTE(review): parameter semantics presumably match `query` — confirm.
625
    #[must_use]
626
    async fn query_with_docs<V: schema::SerializedView>(
627
        &self,
628
        key: Option<QueryKey<V::Key>>,
629
        order: Sort,
630
        limit: Option<usize>,
631
        access_policy: AccessPolicy,
632
    ) -> Result<MappedDocuments<OwnedDocument, V>, Error>
633
    where
634
        Self: Sized;
635

            
636
    /// Queries for view entries matching [`View`] with their source documents, deserialized.
637
    #[must_use]
638
98
    async fn query_with_collection_docs<V>(
639
98
        &self,
640
98
        key: Option<QueryKey<V::Key>>,
641
98
        order: Sort,
642
98
        limit: Option<usize>,
643
98
        access_policy: AccessPolicy,
644
98
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
645
98
    where
646
98
        V: schema::SerializedView,
647
98
        V::Collection: SerializedCollection,
648
98
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
649
98
        Self: Sized,
650
98
    {
651
98
        let mapped_docs = self
652
174
            .query_with_docs::<V>(key, order, limit, access_policy)
653
174
            .await?;
654
98
        let mut collection_docs = BTreeMap::new();
655
199
        for (id, doc) in mapped_docs.documents {
656
101
            collection_docs.insert(id, CollectionDocument::<V::Collection>::try_from(&doc)?);
657
        }
658
98
        Ok(MappedDocuments {
659
98
            mappings: mapped_docs.mappings,
660
98
            documents: collection_docs,
661
98
        })
662
196
    }
663

            
664
    /// Reduces the view entries matching [`View`].
    ///
    /// This is a required method: the trait provides no default body. A
    /// successful call yields a single `V::Value`.
    // NOTE(review): presumably `key` restricts which entries take part in the
    // reduction (`None` meaning all) — confirm against the implementations.
665
    #[must_use]
666
    async fn reduce<V: schema::SerializedView>(
667
        &self,
668
        key: Option<QueryKey<V::Key>>,
669
        access_policy: AccessPolicy,
670
    ) -> Result<V::Value, Error>
671
    where
672
        Self: Sized;
673

            
674
    /// Reduces the view entries matching [`View`], reducing the values by each
675
    /// unique key.
    ///
    /// This is a required method: the trait provides no default body. A
    /// successful call yields one [`MappedValue`] (key plus reduced value)
    /// per group.
676
    #[must_use]
677
    async fn reduce_grouped<V: schema::SerializedView>(
678
        &self,
679
        key: Option<QueryKey<V::Key>>,
680
        access_policy: AccessPolicy,
681
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error>
682
    where
683
        Self: Sized;
684

            
685
    /// Deletes all of the documents associated with this view.
    ///
    /// This is a required method: the trait provides no default body.
    // NOTE(review): the returned `u64` is presumably the number of documents
    // deleted, and `key` presumably narrows which view entries are affected —
    // confirm against the implementations.
686
    #[must_use]
687
    async fn delete_docs<V: schema::SerializedView>(
688
        &self,
689
        key: Option<QueryKey<V::Key>>,
690
        access_policy: AccessPolicy,
691
    ) -> Result<u64, Error>
692
    where
693
        Self: Sized;
694

            
695
    /// Applies a [`Transaction`] to the [`schema::Schema`]. If any operation in the
696
    /// [`Transaction`] fails, none of the operations will be applied to the
697
    /// [`schema::Schema`].
    ///
    /// This is a required method: the trait provides no default body. The
    /// default `insert`, `update` and `delete` methods of this trait each
    /// submit a single-operation transaction through it and inspect the first
    /// returned [`OperationResult`].
698
    async fn apply_transaction(
699
        &self,
700
        transaction: Transaction,
701
    ) -> Result<Vec<OperationResult>, Error>;
702

            
703
    /// Lists executed [`Transaction`]s from this [`schema::Schema`]. By default, a maximum of
704
    /// 1000 entries will be returned, but that limit can be overridden by
705
    /// setting `result_limit`. No more than 100,000 results will ever be
706
    /// returned (hard limit). To begin listing after another known `transaction_id`, pass
707
    /// `transaction_id + 1` into `starting_id`.
    ///
    /// This is a required method: the trait provides no default body, so the
    /// limits described above are enforced by each implementation.
708
    async fn list_executed_transactions(
709
        &self,
710
        starting_id: Option<u64>,
711
        result_limit: Option<usize>,
712
    ) -> Result<Vec<transaction::Executed>, Error>;
713

            
714
    /// Fetches the last transaction id that has been committed, if any.
    ///
    /// This is a required method: the trait provides no default body.
715
    async fn last_transaction_id(&self) -> Result<Option<u64>, Error>;
716

            
717
    /// Compacts the entire database to reclaim unused disk space.
718
    ///
719
    /// This process is done by writing data to a new file and swapping the file
720
    /// once the process completes. This ensures that if a hardware failure,
721
    /// power outage, or crash occurs that the original collection data is left
722
    /// untouched.
723
    ///
    /// This is a required method: the trait provides no default body, so the
    /// process described above is carried out by each implementation.
    ///
724
    /// ## Errors
725
    ///
726
    /// * [`Error::Io`]: an error occurred while compacting the database.
727
    async fn compact(&self) -> Result<(), crate::Error>;
728

            
729
    /// Compacts the collection to reclaim unused disk space.
730
    ///
731
    /// This process is done by writing data to a new file and swapping the file
732
    /// once the process completes. This ensures that if a hardware failure,
733
    /// power outage, or crash occurs that the original collection data is left
734
    /// untouched.
735
    ///
    /// This is a required method: the trait provides no default body. The
    /// collection to compact is selected by the type parameter `C`.
    ///
736
    /// ## Errors
737
    ///
738
    /// * [`Error::CollectionNotFound`]: database `name` does not exist.
    // NOTE(review): this method takes no `name` argument; the bullet above
    // presumably means the collection `C` was not found — confirm and reword.
739
    /// * [`Error::Io`]: an error occurred while compacting the database.
740
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), crate::Error>;
741

            
742
    /// Compacts the key value store to reclaim unused disk space.
743
    ///
744
    /// This process is done by writing data to a new file and swapping the file
745
    /// once the process completes. This ensures that if a hardware failure,
746
    /// power outage, or crash occurs that the original collection data is left
747
    /// untouched.
748
    ///
    /// This is a required method: the trait provides no default body, so the
    /// process described above is carried out by each implementation.
    ///
749
    /// ## Errors
750
    ///
751
    /// * [`Error::Io`]: an error occurred while compacting the database.
752
    async fn compact_key_value_store(&self) -> Result<(), crate::Error>;
753
}
754

            
755
/// A handle for working with one collection through a borrowed `Connection`.
pub struct Collection<'a, Cn, Cl> {
    /// The connection every operation is forwarded to.
    connection: &'a Cn,
    // Zero-sized marker for the collection type; it allows extension traits
    // to be written for collections of specific types.
    _phantom: PhantomData<Cl>,
}

impl<'a, Cn, Cl> Clone for Collection<'a, Cn, Cl> {
    /// Produces another handle to the same connection. Implemented by hand so
    /// that neither `Cn` nor `Cl` has to be `Clone`: both fields are `Copy`.
    fn clone(&self) -> Self {
        Collection {
            connection: self.connection,
            _phantom: self._phantom,
        }
    }
}
769

            
770
impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
771
where
772
    Cn: Connection,
773
    Cl: schema::Collection,
774
{
775
    /// Creates a new instance using `connection`.
776
22682
    pub fn new(connection: &'a Cn) -> Self {
777
22682
        Self {
778
22682
            connection,
779
22682
            _phantom: PhantomData::default(),
780
22682
        }
781
22682
    }
782

            
783
    /// Adds a new `Document<Cl>` with the contents `item`.
784
24842
    pub async fn push(
785
24842
        &self,
786
24842
        item: &<Cl as SerializedCollection>::Contents,
787
24842
    ) -> Result<Header, crate::Error>
788
24842
    where
789
24842
        Cl: schema::SerializedCollection,
790
24842
    {
791
24864
        let contents = Cl::serialize(item)?;
792
27637
        Ok(self.push_bytes(contents).await?)
793
24864
    }
794

            
795
    /// Adds a new `Document<Cl>` with the `contents`.
796
24820
    pub async fn push_bytes<B: Into<Bytes> + Send>(
797
24820
        &self,
798
24820
        contents: B,
799
24820
    ) -> Result<Header, crate::Error>
800
24820
    where
801
24820
        Cl: schema::SerializedCollection,
802
24820
    {
803
27637
        Ok(self.connection.insert::<Cl, B>(None, contents).await?)
804
24864
    }
805

            
806
    /// Adds a new `Document<Cl>` with the given `id` and contents `item`.
807
1010
    pub async fn insert(
808
1010
        &self,
809
1010
        id: u64,
810
1010
        item: &<Cl as SerializedCollection>::Contents,
811
1010
    ) -> Result<Header, crate::Error>
812
1010
    where
813
1010
        Cl: schema::SerializedCollection,
814
1010
    {
815
1010
        let contents = Cl::serialize(item)?;
816
3225
        Ok(self.connection.insert::<Cl, _>(Some(id), contents).await?)
817
1010
    }
818

            
819
    /// Adds a new `Document<Cl>` with the the given `id` and `contents`.
820
    pub async fn insert_bytes<B: Into<Bytes> + Send>(
821
        &self,
822
        id: u64,
823
        contents: B,
824
    ) -> Result<Header, crate::Error>
825
    where
826
        Cl: schema::SerializedCollection,
827
    {
828
        Ok(self.connection.insert::<Cl, B>(Some(id), contents).await?)
829
    }
830

            
831
    /// Retrieves a `Document<Cl>` with `id` from the connection.
832
1537
    pub async fn get(&self, id: u64) -> Result<Option<OwnedDocument>, Error> {
833
4524
        self.connection.get::<Cl>(id).await
834
1537
    }
835

            
836
    /// Retrieves all documents matching `ids`. Documents that are not found
837
    /// are not returned, but no error will be generated.
838
8
    pub async fn get_multiple(&self, ids: &[u64]) -> Result<Vec<OwnedDocument>, Error> {
839
8
        self.connection.get_multiple::<Cl>(ids).await
840
8
    }
841

            
842
    /// Retrieves all documents matching `ids`. Documents that are not found
843
    /// are not returned, but no error will be generated.
844
    pub fn list<R: Into<Range<u64>>>(&'a self, ids: R) -> List<'a, Cn, Cl> {
845
        List::new(PossiblyOwned::Borrowed(self), ids.into())
846
    }
847

            
848
    /// Removes a `Document` from the database.
849
806
    pub async fn delete<H: AsRef<Header> + Send + Sync>(&self, doc: &H) -> Result<(), Error> {
850
1972
        self.connection.delete::<Cl, H>(doc).await
851
806
    }
852
}
853

            
854
/// The options gathered by a [`List`] before it starts executing.
pub(crate) struct ListBuilder<'a, Cn, Cl> {
855
    /// The collection handle to list from, either owned or borrowed.
    collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
856
    /// The range of document ids to list.
    range: Range<u64>,
857
    /// The sort direction; `List::new` starts with `Sort::Ascending`.
    sort: Sort,
858
    /// The maximum number of results; `List::new` starts with `None`.
    limit: Option<usize>,
859
}
860

            
861
/// Holds a `Cl` either by value or as a shared reference, so that code using
/// it can go through [`Deref`] without caring which one it has.
pub(crate) enum PossiblyOwned<'a, Cl> {
    /// The value is stored inline.
    Owned(Cl),
    /// The value lives elsewhere and is borrowed for `'a`.
    Borrowed(&'a Cl),
}

impl<'a, Cl> Deref for PossiblyOwned<'a, Cl> {
    type Target = Cl;

    /// Returns a reference to the wrapped value for either variant.
    fn deref(&self) -> &Cl {
        match *self {
            Self::Owned(ref owned) => owned,
            Self::Borrowed(borrowed) => borrowed,
        }
    }
}
876

            
877
/// The two phases a [`List`] can be in.
pub(crate) enum ListState<'a, Cn, Cl> {
878
    /// Options are still being collected; `List::new` creates this variant
    /// with `Some(builder)`.
    // NOTE(review): the `Option` presumably lets the builder be taken out
    // when execution starts — the code doing that is outside this section.
    Pending(Option<ListBuilder<'a, Cn, Cl>>),
879
    /// Holds a boxed future that resolves to the listed documents or an error.
    Executing(BoxFuture<'a, Result<Vec<OwnedDocument>, Error>>),
880
}
881

            
882
/// Executes [`Connection::list()`] when awaited. Also offers methods to
883
/// customize the options for the operation.
884
#[must_use]
885
pub struct List<'a, Cn, Cl> {
886
    state: ListState<'a, Cn, Cl>,
887
}
888

            
889
impl<'a, Cn, Cl> List<'a, Cn, Cl> {
890
12
    pub(crate) const fn new(
891
12
        collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
892
12
        range: Range<u64>,
893
12
    ) -> Self {
894
12
        Self {
895
12
            state: ListState::Pending(Some(ListBuilder {
896
12
                collection,
897
12
                range,
898
12
                sort: Sort::Ascending,
899
12
                limit: None,
900
12
            })),
901
12
        }
902
12
    }
903

            
904
8
    fn builder(&mut self) -> &mut ListBuilder<'a, Cn, Cl> {
905
8
        if let ListState::Pending(Some(builder)) = &mut self.state {
906
8
            builder
907
        } else {
908
            unreachable!("Attempted to use after retrieving the result")
909
        }
910
8
    }
911

            
912
    /// Lists documents by id in ascending order.
913
    pub fn ascending(mut self) -> Self {
914
        self.builder().sort = Sort::Ascending;
915
        self
916
    }
917

            
918
    /// Lists documents by id in descending order.
919
4
    pub fn descending(mut self) -> Self {
920
4
        self.builder().sort = Sort::Descending;
921
4
        self
922
4
    }
923

            
924
    /// Sets the maximum number of results to return.
925
4
    pub fn limit(mut self, maximum_results: usize) -> Self {
926
4
        self.builder().limit = Some(maximum_results);
927
4
        self
928
4
    }
929
}
930

            
931
impl<'a, Cn, Cl> Future for List<'a, Cn, Cl>
932
where
933
    Cn: Connection,
934
    Cl: schema::Collection + Unpin,
935
{
936
    type Output = Result<Vec<OwnedDocument>, Error>;
937

            
938
36
    fn poll(
939
36
        mut self: std::pin::Pin<&mut Self>,
940
36
        cx: &mut std::task::Context<'_>,
941
36
    ) -> std::task::Poll<Self::Output> {
942
36
        match &mut self.state {
943
24
            ListState::Executing(future) => future.as_mut().poll(cx),
944
12
            ListState::Pending(builder) => {
945
12
                let ListBuilder {
946
12
                    collection,
947
12
                    range,
948
12
                    sort,
949
12
                    limit,
950
12
                } = builder.take().unwrap();
951
12

            
952
12
                let future = async move {
953
12
                    collection
954
12
                        .connection
955
12
                        .list::<Cl, _>(range, sort, limit)
956
12
                        .await
957
12
                }
958
12
                .boxed();
959
12

            
960
12
                self.state = ListState::Executing(future);
961
12
                self.poll(cx)
962
            }
963
        }
964
36
    }
965
}
966

            
967
/// Parameters to query a `schema::View`.
968
pub struct View<'a, Cn, V: schema::SerializedView> {
969
    connection: &'a Cn,
970

            
971
    /// Key filtering criteria.
972
    pub key: Option<QueryKey<V::Key>>,
973

            
974
    /// The view's data access policy. The default value is [`AccessPolicy::UpdateBefore`].
975
    pub access_policy: AccessPolicy,
976

            
977
    /// The sort order of the query.
978
    pub sort: Sort,
979

            
980
    /// The maximum number of results to return.
981
    pub limit: Option<usize>,
982
}
983

            
984
impl<'a, Cn, V> View<'a, Cn, V>
985
where
986
    V: schema::SerializedView,
987
    Cn: Connection,
988
{
989
49169
    fn new(connection: &'a Cn) -> Self {
990
49169
        Self {
991
49169
            connection,
992
49169
            key: None,
993
49169
            access_policy: AccessPolicy::UpdateBefore,
994
49169
            sort: Sort::Ascending,
995
49169
            limit: None,
996
49169
        }
997
49169
    }
998

            
999
    /// Filters for entries in the view with `key`.
    #[must_use]
47071
    pub fn with_key(mut self, key: V::Key) -> Self {
47071
        self.key = Some(QueryKey::Matches(key));
47071
        self
47071
    }

            
    /// Filters for entries in the view with `keys`.
    #[must_use]
9
    pub fn with_keys<IntoIter: IntoIterator<Item = V::Key>>(mut self, keys: IntoIter) -> Self {
9
        self.key = Some(QueryKey::Multiple(keys.into_iter().collect()));
9
        self
9
    }

            
    /// Filters for entries in the view with the range `keys`.
    #[must_use]
11
    pub fn with_key_range<R: Into<Range<V::Key>>>(mut self, range: R) -> Self {
11
        self.key = Some(QueryKey::Range(range.into()));
11
        self
11
    }

            
    /// Sets the access policy for queries.
11444
    pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
11444
        self.access_policy = policy;
11444
        self
11444
    }

            
    /// Queries the view in ascending order.
    pub fn ascending(mut self) -> Self {
        self.sort = Sort::Ascending;
        self
    }

            
    /// Queries the view in descending order.
4
    pub fn descending(mut self) -> Self {
4
        self.sort = Sort::Descending;
4
        self
4
    }

            
    /// Sets the maximum number of results to return.
4
    pub fn limit(mut self, maximum_results: usize) -> Self {
4
        self.limit = Some(maximum_results);
4
        self
4
    }

            
    /// Executes the query and retrieves the results.
31470
    pub async fn query(self) -> Result<Vec<Map<V::Key, V::Value>>, Error> {
31448
        self.connection
31448
            .query::<V>(self.key, self.sort, self.limit, self.access_policy)
29376
            .await
31470
    }

            
    /// Executes the query and retrieves the results with the associated [`Document`s](crate::document::OwnedDocument).
5930
    pub async fn query_with_docs(self) -> Result<MappedDocuments<OwnedDocument, V>, Error> {
5930
        self.connection
5991
            .query_with_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
5954
            .await
5929
    }

            
    /// Executes the query and retrieves the results with the associated [`CollectionDocument`s](crate::document::CollectionDocument).
98
    pub async fn query_with_collection_docs(
98
        self,
98
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
98
    where
98
        V::Collection: SerializedCollection,
98
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
98
    {
98
        self.connection
174
            .query_with_collection_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
174
            .await
98
    }

            
    /// Executes a reduce over the results of the query
11657
    pub async fn reduce(self) -> Result<V::Value, Error> {
11657
        self.connection
11658
            .reduce::<V>(self.key, self.access_policy)
7848
            .await
11657
    }

            
    /// Executes a reduce over the results of the query
10
    pub async fn reduce_grouped(self) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error> {
10
        self.connection
13
            .reduce_grouped::<V>(self.key, self.access_policy)
11
            .await
10
    }

            
    /// Deletes all of the associated documents that match this view query.
4
    pub async fn delete_docs(self) -> Result<u64, Error> {
4
        self.connection
6
            .delete_docs::<V>(self.key, self.access_policy)
6
            .await
4
    }
}

            
/// A sort order.
7904
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub enum Sort {
    /// Sort ascending (A -> Z).
    Ascending,
    /// Sort descending (Z -> A).
    Descending,
}

            
/// Filters a [`View`] by key.
23396
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum QueryKey<K> {
    /// Matches all entries with the key provided.
    Matches(K),

            
    /// Matches all entires with keys in the range provided.
    Range(Range<K>),

            
    /// Matches all entries that have keys that are included in the set provided.
    Multiple(Vec<K>),
}

            
#[allow(clippy::use_self)] // clippy is wrong, Self is different because of generic parameters
impl<K: for<'a> Key<'a>> QueryKey<K> {
    /// Converts this key to a serialized format using the [`Key`] trait.
176345
    pub fn serialized(&self) -> Result<QueryKey<Bytes>, Error> {
176345
        match self {
176325
            Self::Matches(key) => key
176325
                .as_big_endian_bytes()
176325
                .map_err(|err| Error::Database(view::Error::key_serialization(err).to_string()))
176325
                .map(|v| QueryKey::Matches(Bytes::from(v.to_vec()))),
11
            Self::Range(range) => Ok(QueryKey::Range(range.as_big_endian_bytes().map_err(
11
                |err| Error::Database(view::Error::key_serialization(err).to_string()),
11
            )?)),
9
            Self::Multiple(keys) => {
9
                let keys = keys
9
                    .iter()
18
                    .map(|key| {
18
                        key.as_big_endian_bytes()
18
                            .map(|key| Bytes::from(key.to_vec()))
18
                            .map_err(|err| {
                                Error::Database(view::Error::key_serialization(err).to_string())
18
                            })
18
                    })
9
                    .collect::<Result<Vec<_>, Error>>()?;

            
9
                Ok(QueryKey::Multiple(keys))
            }
        }
176345
    }
}

            
#[allow(clippy::use_self)] // clippy is wrong, Self is different because of generic parameters
impl<'a, T> QueryKey<T>
where
    T: AsRef<[u8]>,
{
    /// Deserializes the bytes into `K` via the [`Key`] trait.
    pub fn deserialized<K: for<'k> Key<'k>>(&self) -> Result<QueryKey<K>, Error> {
        match self {
            Self::Matches(key) => K::from_big_endian_bytes(key.as_ref())
                .map_err(|err| Error::Database(view::Error::key_serialization(err).to_string()))
                .map(QueryKey::Matches),
            Self::Range(range) => Ok(QueryKey::Range(range.deserialize().map_err(|err| {
                Error::Database(view::Error::key_serialization(err).to_string())
            })?)),
            Self::Multiple(keys) => {
                let keys = keys
                    .iter()
                    .map(|key| {
                        K::from_big_endian_bytes(key.as_ref()).map_err(|err| {
                            Error::Database(view::Error::key_serialization(err).to_string())
                        })
                    })
                    .collect::<Result<Vec<_>, Error>>()?;

            
                Ok(QueryKey::Multiple(keys))
            }
        }
    }
}

            
/// A range type that can represent all std range types and be serialized.
20
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub struct Range<T> {
    /// The start of the range.
    pub start: Bound<T>,
    /// The end of the range.
    pub end: Bound<T>,
}

            
/// A range bound that can be serialized.
40
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub enum Bound<T> {
    /// No bound.
    Unbounded,
    /// Bounded by the contained value (inclusive).
    Included(T),
    /// Bounded by the contained value (exclusive).
    Excluded(T),
}

            
impl<T> Range<T> {
    /// Consumes the range and applies `map` to the value held by each bound.
    pub fn map<U, F: Fn(T) -> U>(self, map: F) -> Range<U> {
        let Self { start, end } = self;
        let start = start.map(&map);
        let end = end.map(&map);
        Range { start, end }
    }

    /// Applies `map` to a reference of the value held by each bound, producing
    /// a range of references.
    pub fn map_ref<U: ?Sized, F: Fn(&T) -> &U>(&self, map: F) -> Range<&U> {
        let start = self.start.map_ref(&map);
        let end = self.end.map_ref(&map);
        Range { start, end }
    }
}

            
impl<'a, T: Key<'a>> Range<T> {
    /// Serializes the range's contained values to big-endian bytes.
211
    pub fn as_big_endian_bytes(&'a self) -> Result<Range<Bytes>, T::Error> {
211
        Ok(Range {
211
            start: self.start.as_big_endian_bytes()?,
211
            end: self.end.as_big_endian_bytes()?,
        })
211
    }
}

            
impl<'a, B> Range<B>
where
    B: AsRef<[u8]>,
{
    /// Decodes the value held by each bound from big-endian bytes into `T`.
    ///
    /// The start bound is decoded first; the first decoding error is returned.
    pub fn deserialize<T: for<'k> Key<'k>>(&'a self) -> Result<Range<T>, <T as Key<'_>>::Error> {
        Ok(Range {
            start: self.start.deserialize()?,
            // The end bound must come from `self.end`; decoding `self.start`
            // twice would collapse the range onto its start bound.
            end: self.end.deserialize()?,
        })
    }
}

            
impl<T> Bound<T> {
    /// Consumes the bound and applies `map` to its value, if it has one,
    /// keeping the kind of bound unchanged.
    pub fn map<U, F: Fn(T) -> U>(self, map: F) -> Bound<U> {
        match self {
            Self::Included(inner) => Bound::Included(map(inner)),
            Self::Excluded(inner) => Bound::Excluded(map(inner)),
            Self::Unbounded => Bound::Unbounded,
        }
    }

    /// Applies `map` to a reference of the bound's value, if it has one,
    /// keeping the kind of bound unchanged.
    pub fn map_ref<U: ?Sized, F: Fn(&T) -> &U>(&self, map: F) -> Bound<&U> {
        match self {
            Self::Included(inner) => Bound::Included(map(inner)),
            Self::Excluded(inner) => Bound::Excluded(map(inner)),
            Self::Unbounded => Bound::Unbounded,
        }
    }
}

            
impl<'a, T: Key<'a>> Bound<T> {
    /// Serializes the contained value to big-endian bytes.
422
    pub fn as_big_endian_bytes(&'a self) -> Result<Bound<Bytes>, T::Error> {
422
        match self {
            Bound::Unbounded => Ok(Bound::Unbounded),
353
            Bound::Included(value) => Ok(Bound::Included(Bytes::from(
353
                value.as_big_endian_bytes()?.to_vec(),
            ))),
69
            Bound::Excluded(value) => Ok(Bound::Excluded(Bytes::from(
69
                value.as_big_endian_bytes()?.to_vec(),
            ))),
        }
422
    }
}

            
impl<'a, B> Bound<B>
where
    B: AsRef<[u8]>,
{
    /// Decodes the bound's value, if it has one, from big-endian bytes into
    /// `T`, keeping the kind of bound unchanged.
    pub fn deserialize<T: for<'k> Key<'k>>(&'a self) -> Result<Bound<T>, <T as Key<'_>>::Error> {
        let decoded = match self {
            Bound::Unbounded => Bound::Unbounded,
            Bound::Included(bytes) => Bound::Included(T::from_big_endian_bytes(bytes.as_ref())?),
            Bound::Excluded(bytes) => Bound::Excluded(T::from_big_endian_bytes(bytes.as_ref())?),
        };
        Ok(decoded)
    }
}

            
impl<T> std::ops::RangeBounds<T> for Range<T> {
200479
    fn start_bound(&self) -> std::ops::Bound<&T> {
200479
        std::ops::Bound::from(&self.start)
200479
    }

            
177185
    fn end_bound(&self) -> std::ops::Bound<&T> {
177185
        std::ops::Bound::from(&self.end)
177185
    }
}

            
impl<'a, T> From<&'a Bound<T>> for std::ops::Bound<&'a T> {
377664
    fn from(bound: &'a Bound<T>) -> Self {
377664
        match bound {
289543
            Bound::Unbounded => std::ops::Bound::Unbounded,
86293
            Bound::Included(value) => std::ops::Bound::Included(value),
1828
            Bound::Excluded(value) => std::ops::Bound::Excluded(value),
        }
377664
    }
}

            
impl<T> From<std::ops::Range<T>> for Range<T> {
7
    fn from(range: std::ops::Range<T>) -> Self {
7
        Self {
7
            start: Bound::Included(range.start),
7
            end: Bound::Excluded(range.end),
7
        }
7
    }
}

            
impl<T> From<std::ops::RangeFrom<T>> for Range<T> {
11826
    fn from(range: std::ops::RangeFrom<T>) -> Self {
11826
        Self {
11826
            start: Bound::Included(range.start),
11826
            end: Bound::Unbounded,
11826
        }
11826
    }
}

            
/// `..end`: there is no start and the end is excluded.
impl<T> From<std::ops::RangeTo<T>> for Range<T> {
    fn from(range: std::ops::RangeTo<T>) -> Self {
        let std::ops::RangeTo { end } = range;
        Self {
            start: Bound::Unbounded,
            end: Bound::Excluded(end),
        }
    }
}

            
impl<T: Clone> From<std::ops::RangeInclusive<T>> for Range<T> {
16
    fn from(range: std::ops::RangeInclusive<T>) -> Self {
16
        Self {
16
            start: Bound::Included(range.start().clone()),
16
            end: Bound::Included(range.end().clone()),
16
        }
16
    }
}

            
/// `..=end`: there is no start and the end is included.
impl<T> From<std::ops::RangeToInclusive<T>> for Range<T> {
    fn from(range: std::ops::RangeToInclusive<T>) -> Self {
        let std::ops::RangeToInclusive { end } = range;
        Self {
            start: Bound::Unbounded,
            end: Bound::Included(end),
        }
    }
}

            
impl<T> From<std::ops::RangeFull> for Range<T> {
11516
    fn from(_: std::ops::RangeFull) -> Self {
11516
        Self {
11516
            start: Bound::Unbounded,
11516
            end: Bound::Unbounded,
11516
        }
11516
    }
}

            
/// Changes how the view's outdated data will be treated.
23430
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum AccessPolicy {
    /// Update any changed documents before returning a response.
    UpdateBefore,

            
    /// Return the results, which may be out-of-date, and start an update job in
    /// the background. This pattern is useful when you want to ensure you
    /// provide consistent response times while ensuring the database is
    /// updating in the background.
    UpdateAfter,

            
    /// Returns the results, which may be out-of-date, and do not start any
    /// background jobs. This mode is useful if you're using a view as a cache
    /// and have a background process that is responsible for controlling when
    /// data is refreshed and updated. While the default `UpdateBefore`
    /// shouldn't have much overhead, this option removes all overhead related
    /// to view updating from the query.
    NoUpdate,
}

            
/// Functions for interacting with a multi-database `BonsaiDb` instance.
#[async_trait]
pub trait StorageConnection: Send + Sync {
    /// The type that represents a database for this implementation.
    type Database: Connection;

            
    /// Creates a database named `name` with the `Schema` provided.
    ///
    /// ## Errors
    ///
    /// * [`Error::InvalidDatabaseName`]: `name` must begin with an alphanumeric
    ///   character (`[a-zA-Z0-9]`), and all remaining characters must be
    ///   alphanumeric, a period (`.`), or a hyphen (`-`).
    /// * [`Error::DatabaseNameAlreadyTaken`]: `name` was already used for a
    ///   previous database name. Database names are case insensitive. Returned
    ///   if `only_if_needed` is false.
899
    async fn create_database<DB: Schema>(
899
        &self,
899
        name: &str,
899
        only_if_needed: bool,
899
    ) -> Result<(), crate::Error> {
2371
        self.create_database_with_schema(name, DB::schema_name(), only_if_needed)
2369
            .await
1797
    }

            
    /// Returns a reference to database `name` with schema `DB`.
    async fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, crate::Error>;

            
    /// Creates a database named `name` using the [`SchemaName`] `schema`.
    ///
    /// ## Errors
    ///
    /// * [`Error::InvalidDatabaseName`]: `name` must begin with an alphanumeric
    ///   character (`[a-zA-Z0-9]`), and all remaining characters must be
    ///   alphanumeric, a period (`.`), or a hyphen (`-`).
    /// * [`Error::DatabaseNameAlreadyTaken`]: `name` was already used for a
    ///   previous database name. Database names are case insensitive. Returned
    ///   if `only_if_needed` is false.
    async fn create_database_with_schema(
        &self,
        name: &str,
        schema: SchemaName,
        only_if_needed: bool,
    ) -> Result<(), crate::Error>;

            
    /// Deletes a database named `name`.
    ///
    /// ## Errors
    ///
    /// * [`Error::DatabaseNotFound`]: database `name` does not exist.
    /// * [`Error::Io`]: an error occurred while deleting files.
    async fn delete_database(&self, name: &str) -> Result<(), crate::Error>;

            
    /// Lists the databases in this storage.
    async fn list_databases(&self) -> Result<Vec<Database>, crate::Error>;

            
    /// Lists the [`SchemaName`]s registered with this storage.
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, crate::Error>;

            
    /// Creates a user.
    #[cfg(feature = "multiuser")]
    async fn create_user(&self, username: &str) -> Result<u64, crate::Error>;

            
    /// Sets a user's password.
    #[cfg(feature = "password-hashing")]
    async fn set_user_password<'user, U: Into<NamedReference<'user>> + Send + Sync>(
        &self,
        user: U,
        password: SensitiveString,
    ) -> Result<(), crate::Error>;

            
    /// Authenticates as a user with a authentication method.
    #[cfg(all(feature = "multiuser", feature = "password-hashing"))]
    async fn authenticate<'user, U: Into<NamedReference<'user>> + Send + Sync>(
        &self,
        user: U,
        authentication: Authentication,
    ) -> Result<Authenticated, crate::Error>;

            
    /// Adds a user to a permission group.
    #[cfg(feature = "multiuser")]
    async fn add_permission_group_to_user<
        'user,
        'group,
        U: Into<NamedReference<'user>> + Send + Sync,
        G: Into<NamedReference<'group>> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), crate::Error>;

            
    /// Removes a user from a permission group.
    #[cfg(feature = "multiuser")]
    async fn remove_permission_group_from_user<
        'user,
        'group,
        U: Into<NamedReference<'user>> + Send + Sync,
        G: Into<NamedReference<'group>> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), crate::Error>;

            
    /// Adds a user to a permission group.
    #[cfg(feature = "multiuser")]
    async fn add_role_to_user<
        'user,
        'role,
        U: Into<NamedReference<'user>> + Send + Sync,
        R: Into<NamedReference<'role>> + Send + Sync,
    >(
        &self,
        user: U,
        role: R,
    ) -> Result<(), crate::Error>;

            
    /// Removes a user from a permission group.
    #[cfg(feature = "multiuser")]
    async fn remove_role_from_user<
        'user,
        'role,
        U: Into<NamedReference<'user>> + Send + Sync,
        R: Into<NamedReference<'role>> + Send + Sync,
    >(
        &self,
        user: U,
        role: R,
    ) -> Result<(), crate::Error>;
}

            
/// A database stored in `BonsaiDb`.
1288
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Database {
    /// The name of the database.
    pub name: String,
    /// The schema defining the database.
    pub schema: SchemaName,
}

            
/// A plain-text password.
///
/// The contained string is overwritten with zeroes when the value is dropped
/// (`#[zeroize(drop)]`), and it serializes transparently as a plain string.
#[cfg(feature = "multiuser")]
#[derive(Clone, Serialize, Deserialize, Zeroize)]
#[zeroize(drop)]
#[serde(transparent)]
pub struct SensitiveString(pub String);

            
/// Debug output never reveals the password; a fixed placeholder is printed
/// instead.
#[cfg(feature = "multiuser")]
impl std::fmt::Debug for SensitiveString {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "SensitiveString(...)";
        formatter.write_str(REDACTED)
    }
}

            
/// Gives read-only access to the wrapped `String`.
#[cfg(feature = "multiuser")]
impl Deref for SensitiveString {
    type Target = String;

    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}

            
/// User authentication methods.
10
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Authentication {
    /// Authenticate using a password.
    #[cfg(feature = "password-hashing")]
    Password(crate::connection::SensitiveString),
}

            
/// Information about the authenticated session.
10
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Authenticated {
    /// The user id logged in as.
    pub user_id: u64,
    /// The effective permissions granted.
    pub permissions: Permissions,
}

            
// Shared setup for this crate's documentation examples. Expanding the macro
// brings the commonly used `bonsaidb_core` items into scope and defines a
// small example schema: `MySchema`, the `MyCollection` collection, and the
// `ScoresByRank` and `MyCollectionByName` views.
#[doc(hidden)]
#[macro_export]
macro_rules! __doctest_prelude {
    () => {
        use bonsaidb_core::{
            connection::Connection,
            define_basic_unique_mapped_view,
            document::{CollectionDocument, Document, OwnedDocument},
            schema::{
                Collection, CollectionName, CollectionViewSchema, DefaultSerialization,
                DefaultViewSerialization, Name, NamedCollection, ReduceResult, Schema, SchemaName,
                Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
            },
            Error,
        };
        use serde::{Deserialize, Serialize};

        // Example schema; `define_collections` registers nothing.
        #[derive(Debug)]
        pub struct MySchema;

        impl Schema for MySchema {
            fn schema_name() -> SchemaName {
                SchemaName::new("MyAuthority", "MySchema")
            }

            fn define_collections(schema: &mut Schematic) -> Result<(), Error> {
                Ok(())
            }
        }

        // Example collection contents: a name plus a rank and a score.
        #[derive(Debug, Serialize, Deserialize, Default)]
        pub struct MyCollection {
            pub name: String,
            pub rank: u32,
            pub score: f32,
        }

        impl MyCollection {
            // Builds an entry with the given name, rank 0 and score 0.
            pub fn named(s: impl Into<String>) -> Self {
                Self::new(s, 0, 0.)
            }

            // Builds an entry from all three fields.
            pub fn new(s: impl Into<String>, rank: u32, score: f32) -> Self {
                Self {
                    name: s.into(),
                    rank,
                    score,
                }
            }
        }

        impl Collection for MyCollection {
            fn collection_name() -> CollectionName {
                CollectionName::new("MyAuthority", "MyCollection")
            }

            // Only the by-name view is registered here.
            fn define_views(schema: &mut Schematic) -> Result<(), bonsaidb_core::Error> {
                schema.define_view(MyCollectionByName)?;
                Ok(())
            }
        }

        impl NamedCollection for MyCollection {
            type ByNameView = MyCollectionByName;
        }

        impl DefaultSerialization for MyCollection {}

        // Example view keyed by `rank` with `score` as the value.
        #[derive(Debug)]
        pub struct ScoresByRank;

        impl View for ScoresByRank {
            type Collection = MyCollection;
            type Key = u32;
            type Value = f32;

            fn name(&self) -> Name {
                Name::new("scores-by-rank")
            }
        }

        impl CollectionViewSchema for ScoresByRank {
            type View = Self;
            // Emits one entry per document: key = rank, value = score.
            fn map(
                &self,
                document: CollectionDocument<<Self::View as View>::Collection>,
            ) -> ViewMapResult<Self::View> {
                Ok(document
                    .header
                    .emit_key_and_value(document.contents.rank, document.contents.score))
            }

            // Reduces to the arithmetic mean of the mapped values, or 0 when
            // there are none. `rereduce` is not consulted.
            fn reduce(
                &self,
                mappings: &[ViewMappedValue<Self::View>],
                rereduce: bool,
            ) -> ReduceResult<Self::View> {
                if mappings.is_empty() {
                    Ok(0.)
                } else {
                    Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
                }
            }
        }

        impl DefaultViewSerialization for ScoresByRank {}

        // Defines `MyCollectionByName`, the view used as
        // `NamedCollection::ByNameView`, emitting each document's name as key.
        define_basic_unique_mapped_view!(
            MyCollectionByName,
            MyCollection,
            1,
            "by-name",
            String,
            (),
            |document: CollectionDocument<MyCollection>| {
                document.header.emit_key(document.contents.name.clone())
            },
        );
    };
}