1
use std::{marker::PhantomData, ops::Deref, sync::Arc};
2

            
3
use actionable::{Action, Identifier};
4
use arc_bytes::serde::Bytes;
5
use async_trait::async_trait;
6
use futures::{future::BoxFuture, Future, FutureExt};
7
use serde::{Deserialize, Serialize};
8
use zeroize::Zeroize;
9

            
10
use crate::{
11
    document::{CollectionDocument, CollectionHeader, Document, HasHeader, Header, OwnedDocument},
12
    key::{IntoPrefixRange, Key, KeyEncoding},
13
    permissions::Permissions,
14
    schema::{
15
        self,
16
        view::{self, map::MappedDocuments},
17
        Map, MappedValue, Nameable, NamedReference, Schema, SchemaName, SerializedCollection,
18
    },
19
    transaction, Error,
20
};
21

            
22
mod has_session;
23
mod lowlevel;
24

            
25
pub use self::{
26
    has_session::HasSession,
27
    lowlevel::{AsyncLowLevelConnection, LowLevelConnection},
28
};
29

            
30
/// A connection to a database's [`Schema`](schema::Schema), giving access to
31
/// [`Collection`s](crate::schema::Collection) and
32
/// [`Views`s](crate::schema::View). This trait is not safe to use within async
33
/// contexts and will block the current thread. For async access, use
34
/// [`AsyncConnection`].
35
pub trait Connection: LowLevelConnection + Sized + Send + Sync {
36
    /// The [`StorageConnection`] type that is paired with this type.
37
    type Storage: StorageConnection<Database = Self>;
38

            
39
    /// Returns the [`StorageConnection`] implementor that this database belongs to.
40
    fn storage(&self) -> Self::Storage;
41

            
42
    /// Accesses a collection for the connected [`Schema`](schema::Schema).
43
28021
    fn collection<C: schema::Collection>(&self) -> Collection<'_, Self, C> {
44
28021
        Collection::new(self)
45
28021
    }
46

            
47
    /// Accesses a [`schema::View`] from this connection.
48
19373
    fn view<V: schema::SerializedView>(&'_ self) -> View<'_, Self, V, V::Key> {
49
19373
        View::new(self)
50
19373
    }
51

            
52
    /// Lists [executed transactions](transaction::Executed) from this
53
    /// [`Schema`](schema::Schema). By default, a maximum of 1000 entries will
54
    /// be returned, but that limit can be overridden by setting `result_limit`.
55
    /// A hard limit of 100,000 results will be returned. To begin listing after
56
    /// another known `transaction_id`, pass `transaction_id + 1` into
57
    /// `starting_id`.
58
    fn list_executed_transactions(
59
        &self,
60
        starting_id: Option<u64>,
61
        result_limit: Option<u32>,
62
    ) -> Result<Vec<transaction::Executed>, Error>;
63

            
64
    /// Fetches the last transaction id that has been committed, if any.
65
    fn last_transaction_id(&self) -> Result<Option<u64>, Error>;
66

            
67
    /// Compacts the entire database to reclaim unused disk space.
68
    ///
69
    /// This process is done by writing data to a new file and swapping the file
70
    /// once the process completes. This ensures that if a hardware failure,
71
    /// power outage, or crash occurs that the original collection data is left
72
    /// untouched.
73
    ///
74
    /// ## Errors
75
    ///
76
    /// * [`Error::Io`]: an error occurred while compacting the database.
77
    fn compact(&self) -> Result<(), crate::Error>;
78

            
79
    /// Compacts the collection to reclaim unused disk space.
80
    ///
81
    /// This process is done by writing data to a new file and swapping the file
82
    /// once the process completes. This ensures that if a hardware failure,
83
    /// power outage, or crash occurs that the original collection data is left
84
    /// untouched.
85
    ///
86
    /// ## Errors
87
    ///
88
    /// * [`Error::CollectionNotFound`]: database `name` does not exist.
89
    /// * [`Error::Io`]: an error occurred while compacting the database.
90
6
    fn compact_collection<C: schema::Collection>(&self) -> Result<(), crate::Error> {
91
6
        self.compact_collection_by_name(C::collection_name())
92
6
    }
93

            
94
    /// Compacts the key value store to reclaim unused disk space.
95
    ///
96
    /// This process is done by writing data to a new file and swapping the file
97
    /// once the process completes. This ensures that if a hardware failure,
98
    /// power outage, or crash occurs that the original collection data is left
99
    /// untouched.
100
    ///
101
    /// ## Errors
102
    ///
103
    /// * [`Error::Io`]: an error occurred while compacting the database.
104
    fn compact_key_value_store(&self) -> Result<(), crate::Error>;
105
}
106

            
107
/// Interacts with a collection over a `Connection`.
///
/// The examples for this type use this basic collection definition:
///
/// ```rust
/// use bonsaidb_core::{schema::Collection, Error};
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
/// #[collection(name = "MyCollection")]
/// # #[collection(core = bonsaidb_core)]
/// pub struct MyCollection {
///     pub rank: u32,
///     pub score: f32,
/// }
/// ```
pub struct Collection<'a, Cn, Cl> {
    /// The connection that every operation is forwarded to.
    connection: &'a Cn,
    // Allows for extension traits to be written for collections of specific
    // types.
    _phantom: PhantomData<Cl>,
}
127

            
128
impl<'a, Cn, Cl> Clone for Collection<'a, Cn, Cl> {
129
    fn clone(&self) -> Self {
130
        Self {
131
            connection: self.connection,
132
            _phantom: PhantomData,
133
        }
134
    }
135
}
136

            
137
impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
138
where
139
    Cn: Connection,
140
    Cl: schema::Collection,
141
{
142
    /// Creates a new instance using `connection`.
143
28021
    fn new(connection: &'a Cn) -> Self {
144
28021
        Self {
145
28021
            connection,
146
28021
            _phantom: PhantomData::default(),
147
28021
        }
148
28021
    }
149

            
150
    /// Adds a new `Document<Cl>` with the contents `item`.
151
    ///
152
    /// ## Automatic ID Assignment
153
    ///
154
    /// This function calls [`SerializedCollection::natural_id()`] to try to
155
    /// retrieve a primary key value from `item`. If an id is returned, the item
156
    /// is inserted with that id. If an id is not returned, an id will be
157
    /// automatically assigned, if possible, by the storage backend, which uses the [`Key`]
158
    /// trait to assign ids.
159
    ///
160
    /// ```rust
161
    /// # bonsaidb_core::__doctest_prelude!();
162
    /// # use bonsaidb_core::connection::Connection;
163
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
164
    /// let inserted_header = db
165
    ///     .collection::<MyCollection>()
166
    ///     .push(&MyCollection::default())?;
167
    /// println!(
168
    ///     "Inserted id {} with revision {}",
169
    ///     inserted_header.id, inserted_header.revision
170
    /// );
171
    /// # Ok(())
172
    /// # }
173
    /// ```
174
29597
    pub fn push(
175
29597
        &self,
176
29597
        item: &<Cl as SerializedCollection>::Contents,
177
29597
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
178
29597
    where
179
29597
        Cl: schema::SerializedCollection,
180
29597
    {
181
29597
        let contents = Cl::serialize(item)?;
182
29597
        if let Some(natural_id) = Cl::natural_id(item) {
183
1
            self.insert_bytes(natural_id, contents)
184
        } else {
185
29596
            self.push_bytes(contents)
186
        }
187
29597
    }
188

            
189
    /// Adds a new `Document<Cl>` with the `contents`.
190
    ///
191
    /// ## Automatic ID Assignment
192
    ///
193
    /// An id will be automatically assigned, if possible, by the storage backend, which uses
194
    /// the [`Key`] trait to assign ids.
195
    ///
196
    /// ```rust
197
    /// # bonsaidb_core::__doctest_prelude!();
198
    /// # use bonsaidb_core::connection::Connection;
199
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
200
    /// let inserted_header = db.collection::<MyCollection>().push_bytes(vec![])?;
201
    /// println!(
202
    ///     "Inserted id {} with revision {}",
203
    ///     inserted_header.id, inserted_header.revision
204
    /// );
205
    /// # Ok(())
206
    /// # }
207
    /// ```
208
29596
    pub fn push_bytes<B: Into<Bytes> + Send>(
209
29596
        &self,
210
29596
        contents: B,
211
29596
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error> {
212
29596
        self.connection
213
29596
            .insert::<Cl, _, B>(Option::<Cl::PrimaryKey>::None, contents)
214
29596
    }
215

            
216
    /// Adds a new `Document<Cl>` with the given `id` and contents `item`.
217
    ///
218
    /// ```rust
219
    /// # bonsaidb_core::__doctest_prelude!();
220
    /// # use bonsaidb_core::connection::Connection;
221
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
222
    /// let inserted_header = db
223
    ///     .collection::<MyCollection>()
224
    ///     .insert(42, &MyCollection::default())?;
225
    /// println!(
226
    ///     "Inserted id {} with revision {}",
227
    ///     inserted_header.id, inserted_header.revision
228
    /// );
229
    /// # Ok(())
230
    /// # }
231
    /// ```
232
7
    pub fn insert<PrimaryKey>(
233
7
        &self,
234
7
        id: PrimaryKey,
235
7
        item: &<Cl as SerializedCollection>::Contents,
236
7
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
237
7
    where
238
7
        Cl: schema::SerializedCollection,
239
7
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
240
7
    {
241
7
        let contents = Cl::serialize(item)?;
242
7
        self.connection.insert::<Cl, _, _>(Some(id), contents)
243
7
    }
244

            
245
    /// Adds a new `Document<Cl>` with the the given `id` and `contents`.
246
    ///
247
    /// ```rust
248
    /// # bonsaidb_core::__doctest_prelude!();
249
    /// # use bonsaidb_core::connection::Connection;
250
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
251
    /// let inserted_header = db.collection::<MyCollection>().insert_bytes(42, vec![])?;
252
    /// println!(
253
    ///     "Inserted id {} with revision {}",
254
    ///     inserted_header.id, inserted_header.revision
255
    /// );
256
    /// # Ok(())
257
    /// # }
258
    /// ```
259
1
    pub fn insert_bytes<B: Into<Bytes> + Send>(
260
1
        &self,
261
1
        id: Cl::PrimaryKey,
262
1
        contents: B,
263
1
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error> {
264
1
        self.connection.insert::<Cl, _, B>(Some(id), contents)
265
1
    }
266

            
267
    /// Updates an existing document. Upon success, `doc.revision` will be
268
    /// updated with the new revision.
269
    ///
270
    /// ```rust
271
    /// # bonsaidb_core::__doctest_prelude!();
272
    /// # use bonsaidb_core::connection::Connection;
273
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
274
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42)? {
275
    ///     // modify the document
276
    ///     db.collection::<MyCollection>().update(&mut document);
277
    ///     println!("Updated revision: {:?}", document.header.revision);
278
    /// }
279
    /// # Ok(())
280
    /// # }
281
    /// ```
282
    pub fn update<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
283
        self.connection.update::<Cl, D>(doc)
284
    }
285

            
286
    /// Overwrites an existing document, or inserts a new document. Upon success,
287
    /// `doc.revision` will be updated with the new revision information.
288
    ///
289
    /// ```rust
290
    /// # bonsaidb_core::__doctest_prelude!();
291
    /// # use bonsaidb_core::connection::Connection;
292
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
293
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42)? {
294
    ///     // modify the document
295
    ///     db.collection::<MyCollection>().overwrite(&mut document);
296
    ///     println!("Updated revision: {:?}", document.header.revision);
297
    /// }
298
    /// # Ok(())
299
    /// # }
300
    /// ```
301
3
    pub fn overwrite<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
302
3
        let contents = doc.bytes()?;
303
3
        doc.set_collection_header(self.connection.overwrite::<Cl, _>(doc.id(), contents)?)
304
3
    }
305

            
306
    /// Retrieves a `Document<Cl>` with `id` from the connection.
307
    ///
308
    /// ```rust
309
    /// # bonsaidb_core::__doctest_prelude!();
310
    /// # use bonsaidb_core::connection::Connection;
311
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
312
    /// if let Some(doc) = db.collection::<MyCollection>().get(42)? {
313
    ///     println!(
314
    ///         "Retrieved bytes {:?} with revision {}",
315
    ///         doc.contents, doc.header.revision
316
    ///     );
317
    ///     let deserialized = MyCollection::document_contents(&doc)?;
318
    ///     println!("Deserialized contents: {:?}", deserialized);
319
    /// }
320
    /// # Ok(())
321
    /// # }
322
    /// ```
323
1074
    pub fn get<PrimaryKey>(&self, id: PrimaryKey) -> Result<Option<OwnedDocument>, Error>
324
1074
    where
325
1074
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
326
1074
    {
327
1074
        self.connection.get::<Cl, _>(id)
328
1074
    }
329

            
330
    /// Retrieves all documents matching `ids`. Documents that are not found
331
    /// are not returned, but no error will be generated.
332
    ///
333
    /// ```rust
334
    /// # bonsaidb_core::__doctest_prelude!();
335
    /// # use bonsaidb_core::connection::Connection;
336
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
337
    /// for doc in db.collection::<MyCollection>().get_multiple([42, 43])? {
338
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
339
    ///     let deserialized = MyCollection::document_contents(&doc)?;
340
    ///     println!("Deserialized contents: {:?}", deserialized);
341
    /// }
342
    /// # Ok(())
343
    /// # }
344
    /// ```
345
216
    pub fn get_multiple<DocumentIds, PrimaryKey, I>(
346
216
        &self,
347
216
        ids: DocumentIds,
348
216
    ) -> Result<Vec<OwnedDocument>, Error>
349
216
    where
350
216
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
351
216
        I: Iterator<Item = PrimaryKey> + Send + Sync,
352
216
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
353
216
    {
354
216
        self.connection.get_multiple::<Cl, _, _, _>(ids)
355
216
    }
356

            
357
    /// Retrieves all documents matching the range of `ids`.
358
    ///
359
    /// ```rust
360
    /// # bonsaidb_core::__doctest_prelude!();
361
    /// # use bonsaidb_core::connection::Connection;
362
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
363
    /// for doc in db
364
    ///     .collection::<MyCollection>()
365
    ///     .list(42..)
366
    ///     .descending()
367
    ///     .limit(20)
368
    ///     .query()?
369
    /// {
370
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
371
    ///     let deserialized = MyCollection::document_contents(&doc)?;
372
    ///     println!("Deserialized contents: {:?}", deserialized);
373
    /// }
374
    /// # Ok(())
375
    /// # }
376
    /// ```
377
    pub fn list<PrimaryKey, R>(&'a self, ids: R) -> List<'a, Cn, Cl, PrimaryKey>
378
    where
379
        R: Into<Range<PrimaryKey>>,
380
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
381
    {
382
        List::new(PossiblyOwned::Borrowed(self), ids.into())
383
    }
384

            
385
    /// Retrieves all documents with ids that start with `prefix`.
386
    ///
387
    /// ```rust
388
    /// use bonsaidb_core::{
389
    ///     connection::Connection,
390
    ///     document::OwnedDocument,
391
    ///     schema::{Collection, Schematic, SerializedCollection},
392
    ///     Error,
393
    /// };
394
    /// use serde::{Deserialize, Serialize};
395
    ///
396
    /// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
397
    /// #[collection(name = "MyCollection", primary_key = String)]
398
    /// # #[collection(core = bonsaidb_core)]
399
    /// pub struct MyCollection;
400
    ///
401
    /// fn starts_with_a<C: Connection>(db: &C) -> Result<Vec<OwnedDocument>, Error> {
402
    ///     db.collection::<MyCollection>()
403
    ///         .list_with_prefix(String::from("a"))
404
    ///         .query()
405
    /// }
406
    /// ```
407
    pub fn list_with_prefix(&'a self, prefix: Cl::PrimaryKey) -> List<'a, Cn, Cl, Cl::PrimaryKey>
408
    where
409
        Cl::PrimaryKey: IntoPrefixRange,
410
    {
411
        List::new(PossiblyOwned::Borrowed(self), prefix.into_prefix_range())
412
    }
413

            
414
    /// Retrieves all documents.
415
    ///
416
    /// ```rust
417
    /// # bonsaidb_core::__doctest_prelude!();
418
    /// # use bonsaidb_core::connection::Connection;
419
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
420
    /// for doc in db.collection::<MyCollection>().all().query()? {
421
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
422
    ///     let deserialized = MyCollection::document_contents(&doc)?;
423
    ///     println!("Deserialized contents: {:?}", deserialized);
424
    /// }
425
    /// # Ok(())
426
    /// # }
427
    /// ```
428
    pub fn all(&'a self) -> List<'a, Cn, Cl, Cl::PrimaryKey> {
429
        List::new(PossiblyOwned::Borrowed(self), Range::from(..))
430
    }
431

            
432
    /// Removes a `Document` from the database.
433
    ///
434
    /// ```rust
435
    /// # bonsaidb_core::__doctest_prelude!();
436
    /// # use bonsaidb_core::connection::Connection;
437
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
438
    /// if let Some(doc) = db.collection::<MyCollection>().get(42)? {
439
    ///     db.collection::<MyCollection>().delete(&doc)?;
440
    /// }
441
    /// # Ok(())
442
    /// # }
443
    /// ```
444
14
    pub fn delete<H: HasHeader + Send + Sync>(&self, doc: &H) -> Result<(), Error> {
445
14
        self.connection.delete::<Cl, H>(doc)
446
14
    }
447
}
448

            
449
/// Retrieves a list of documents from a collection. This structure also offers
450
/// functions to customize the options for the operation.
451
#[must_use]
452
pub struct List<'a, Cn, Cl, PrimaryKey>
453
where
454
    Cl: schema::Collection,
455
{
456
    collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
457
    range: Range<PrimaryKey>,
458
    sort: Sort,
459
    limit: Option<u32>,
460
}
461

            
462
impl<'a, Cn, Cl, PrimaryKey> List<'a, Cn, Cl, PrimaryKey>
463
where
464
    Cl: schema::Collection,
465
    Cn: Connection,
466
    PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
467
{
468
33
    pub(crate) fn new(
469
33
        collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
470
33
        range: Range<PrimaryKey>,
471
33
    ) -> Self {
472
33
        Self {
473
33
            collection,
474
33
            range,
475
33
            sort: Sort::Ascending,
476
33
            limit: None,
477
33
        }
478
33
    }
479

            
480
    /// Lists documents by id in ascending order.
481
    pub fn ascending(mut self) -> Self {
482
        self.sort = Sort::Ascending;
483
        self
484
    }
485

            
486
    /// Lists documents by id in descending order.
487
3
    pub fn descending(mut self) -> Self {
488
3
        self.sort = Sort::Descending;
489
3
        self
490
3
    }
491

            
492
    /// Sets the maximum number of results to return.
493
3
    pub fn limit(mut self, maximum_results: u32) -> Self {
494
3
        self.limit = Some(maximum_results);
495
3
        self
496
3
    }
497

            
498
    /// Returns the number of documents contained within the range.
499
    ///
500
    /// Order and limit are ignored if they were set.
501
    ///
502
    /// ```rust
503
    /// # bonsaidb_core::__doctest_prelude!();
504
    /// # use bonsaidb_core::connection::Connection;
505
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
506
    /// println!(
507
    ///     "Number of documents with id 42 or larger: {}",
508
    ///     db.collection::<MyCollection>().list(42..).count()?
509
    /// );
510
    /// println!(
511
    ///     "Number of documents in MyCollection: {}",
512
    ///     db.collection::<MyCollection>().all().count()?
513
    /// );
514
    /// # Ok(())
515
    /// # }
516
    /// ```
517
6
    pub fn count(self) -> Result<u64, Error> {
518
6
        let Self {
519
6
            collection, range, ..
520
6
        } = self;
521
6
        collection.connection.count::<Cl, _, _>(range)
522
6
    }
523

            
524
    /// Returns the list of headers for documents contained within the range.
525
    ///
526
    /// ```rust
527
    /// # bonsaidb_core::__doctest_prelude!();
528
    /// # use bonsaidb_core::connection::Connection;
529
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
530
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
531
    /// println!(
532
    ///     "Headers with id 42 or larger: {:?}",
533
    ///     db.collection::<MyCollection>().list(42..).headers()?
534
    /// );
535
    /// println!(
536
    ///     "Headers in MyCollection: {:?}",
537
    ///     db.collection::<MyCollection>().all().headers()?
538
    /// );
539
    /// # Ok(())
540
    /// # })
541
    /// # }
542
    /// ```
543
3
    pub fn headers(self) -> Result<Vec<Header>, Error> {
544
3
        let Self {
545
3
            collection,
546
3
            range,
547
3
            sort,
548
3
            limit,
549
3
            ..
550
3
        } = self;
551
3
        collection
552
3
            .connection
553
3
            .list_headers::<Cl, _, _>(range, sort, limit)
554
3
    }
555

            
556
    /// Retrieves the matching documents.
557
    ///
558
    /// ```rust
559
    /// # bonsaidb_core::__doctest_prelude!();
560
    /// # use bonsaidb_core::connection::Connection;
561
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
562
    /// for doc in db.collection::<MyCollection>().all().query()? {
563
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
564
    ///     let deserialized = MyCollection::document_contents(&doc)?;
565
    ///     println!("Deserialized contents: {:?}", deserialized);
566
    /// }
567
    /// # Ok(())
568
    /// # }
569
    /// ```
570
24
    pub fn query(self) -> Result<Vec<OwnedDocument>, Error> {
571
24
        let Self {
572
24
            collection,
573
24
            range,
574
24
            sort,
575
24
            limit,
576
24
        } = self;
577
24
        collection.connection.list::<Cl, _, _>(range, sort, limit)
578
24
    }
579
}
580

            
581
/// Parameters to query a [`schema::View`].
582
///
583
/// The examples for this type use this view definition:
584
///
585
/// ```rust
586
/// # mod collection {
587
/// # bonsaidb_core::__doctest_prelude!();
588
/// # }
589
/// # use collection::MyCollection;
590
/// use bonsaidb_core::{
591
///     define_basic_unique_mapped_view,
592
///     document::{CollectionDocument, Emit},
593
///     schema::{
594
///         CollectionViewSchema, DefaultViewSerialization, Name, ReduceResult, View,
595
///         ViewMapResult, ViewMappedValue,
596
///     },
597
/// };
598
///
599
/// #[derive(Debug, Clone, View)]
600
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
601
/// # #[view(core = bonsaidb_core)]
602
/// pub struct ScoresByRank;
603
///
604
/// impl CollectionViewSchema for ScoresByRank {
605
///     type View = Self;
606
///     fn map(
607
///         &self,
608
///         document: CollectionDocument<<Self::View as View>::Collection>,
609
///     ) -> ViewMapResult<Self::View> {
610
///         document
611
///             .header
612
///             .emit_key_and_value(document.contents.rank, document.contents.score)
613
///     }
614
///
615
///     fn reduce(
616
///         &self,
617
///         mappings: &[ViewMappedValue<Self::View>],
618
///         rereduce: bool,
619
///     ) -> ReduceResult<Self::View> {
620
///         if mappings.is_empty() {
621
///             Ok(0.)
622
///         } else {
623
///             Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
624
///         }
625
///     }
626
/// }
627
/// ```
628
#[must_use]
629
pub struct View<'a, Cn, V: schema::SerializedView, Key> {
630
    connection: &'a Cn,
631

            
632
    /// Key filtering criteria.
633
    pub key: Option<QueryKey<Key>>,
634

            
635
    /// The view's data access policy. The default value is [`AccessPolicy::UpdateBefore`].
636
    pub access_policy: AccessPolicy,
637

            
638
    /// The sort order of the query.
639
    pub sort: Sort,
640

            
641
    /// The maximum number of results to return.
642
    pub limit: Option<u32>,
643

            
644
    _view: PhantomData<V>,
645
}
646

            
647
impl<'a, Cn, V, Key> View<'a, Cn, V, Key>
648
where
649
    V: schema::SerializedView,
650
    Cn: Connection,
651
    Key: for<'k> KeyEncoding<'k, V::Key>,
652
{
653
19403
    fn new(connection: &'a Cn) -> Self {
654
19403
        Self {
655
19403
            connection,
656
19403
            key: None,
657
19403
            access_policy: AccessPolicy::UpdateBefore,
658
19403
            sort: Sort::Ascending,
659
19403
            limit: None,
660
19403
            _view: PhantomData,
661
19403
        }
662
19403
    }
663

            
664
    /// Filters for entries in the view with `key`.
665
    ///
666
    /// ```rust
667
    /// # bonsaidb_core::__doctest_prelude!();
668
    /// # use bonsaidb_core::connection::Connection;
669
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
670
    /// // score is an f32 in this example
671
    /// for mapping in db.view::<ScoresByRank>().with_key(42).query()? {
672
    ///     assert_eq!(mapping.key, 42);
673
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
674
    /// }
675
    /// # Ok(())
676
    /// # }
677
    /// ```
678
15938
    pub fn with_key<K: for<'k> KeyEncoding<'k, V::Key>>(self, key: K) -> View<'a, Cn, V, K> {
679
15938
        View {
680
15938
            connection: self.connection,
681
15938
            key: Some(QueryKey::Matches(key)),
682
15938
            access_policy: self.access_policy,
683
15938
            sort: self.sort,
684
15938
            limit: self.limit,
685
15938
            _view: PhantomData,
686
15938
        }
687
15938
    }
688

            
689
    /// Filters for entries in the view with `keys`.
690
    ///
691
    /// ```rust
692
    /// # bonsaidb_core::__doctest_prelude!();
693
    /// # use bonsaidb_core::connection::Connection;
694
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
695
    /// // score is an f32 in this example
696
    /// for mapping in db.view::<ScoresByRank>().with_keys([42, 43]).query()? {
697
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
698
    /// }
699
    /// # Ok(())
700
    /// # }
701
    /// ```
702
7
    pub fn with_keys<K, IntoIter: IntoIterator<Item = K>>(
703
7
        self,
704
7
        keys: IntoIter,
705
7
    ) -> View<'a, Cn, V, K> {
706
7
        View {
707
7
            connection: self.connection,
708
7
            key: Some(QueryKey::Multiple(keys.into_iter().collect())),
709
7
            access_policy: self.access_policy,
710
7
            sort: self.sort,
711
7
            limit: self.limit,
712
7
            _view: PhantomData,
713
7
        }
714
7
    }
715

            
716
    /// Filters for entries in the view with the range `keys`.
717
    ///
718
    /// ```rust
719
    /// # bonsaidb_core::__doctest_prelude!();
720
    /// # use bonsaidb_core::connection::Connection;
721
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
722
    /// // score is an f32 in this example
723
    /// for mapping in db.view::<ScoresByRank>().with_key_range(42..).query()? {
724
    ///     assert!(mapping.key >= 42);
725
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
726
    /// }
727
    /// # Ok(())
728
    /// # }
729
    /// ```
730
7
    pub fn with_key_range<K, R: Into<Range<K>>>(self, range: R) -> View<'a, Cn, V, K> {
731
7
        View {
732
7
            connection: self.connection,
733
7
            key: Some(QueryKey::Range(range.into())),
734
7
            access_policy: self.access_policy,
735
7
            sort: self.sort,
736
7
            limit: self.limit,
737
7
            _view: PhantomData,
738
7
        }
739
7
    }
740

            
741
    /// Filters for entries in the view with keys that begin with `prefix`.
742
    ///
743
    /// ```rust
744
    /// # bonsaidb_core::__doctest_prelude!();
745
    /// # use bonsaidb_core::connection::Connection;
746
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
747
    /// #[derive(View, Debug, Clone)]
748
    /// #[view(name = "by-name", key = String, collection = MyCollection)]
749
    /// # #[view(core = bonsaidb_core)]
750
    /// struct ByName;
751
    ///
752
    /// // score is an f32 in this example
753
    /// for mapping in db
754
    ///     .view::<ByName>()
755
    ///     .with_key_prefix(String::from("a"))
756
    ///     .query()?
757
    /// {
758
    ///     assert!(mapping.key.starts_with("a"));
759
    ///     println!("{} in document {:?}", mapping.key, mapping.source);
760
    /// }
761
    /// # Ok(())
762
    /// # }
763
    /// ```
764
1
    pub fn with_key_prefix(self, prefix: V::Key) -> View<'a, Cn, V, V::Key>
765
1
    where
766
1
        V::Key: IntoPrefixRange,
767
1
    {
768
1
        View {
769
1
            connection: self.connection,
770
1
            key: Some(QueryKey::Range(prefix.into_prefix_range())),
771
1
            access_policy: self.access_policy,
772
1
            sort: self.sort,
773
1
            limit: self.limit,
774
1
            _view: PhantomData,
775
1
        }
776
1
    }
777

            
778
    /// Sets the access policy for queries.
779
    ///
780
    /// ```rust
781
    /// # bonsaidb_core::__doctest_prelude!();
782
    /// # use bonsaidb_core::connection::Connection;
783
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
784
    /// // score is an f32 in this example
785
    /// for mapping in db
786
    ///     .view::<ScoresByRank>()
787
    ///     .with_access_policy(AccessPolicy::UpdateAfter)
788
    ///     .query()?
789
    /// {
790
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
791
    /// }
792
    /// # Ok(())
793
    /// # }
794
    /// ```
795
11
    pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
796
11
        self.access_policy = policy;
797
11
        self
798
11
    }
799

            
800
    /// Queries the view in ascending order. This is the default sorting
801
    /// behavior.
802
    ///
803
    /// ```rust
804
    /// # bonsaidb_core::__doctest_prelude!();
805
    /// # use bonsaidb_core::connection::Connection;
806
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
807
    /// // score is an f32 in this example
808
    /// for mapping in db.view::<ScoresByRank>().ascending().query()? {
809
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
810
    /// }
811
    /// # Ok(())
812
    /// # }
813
    /// ```
814
    pub fn ascending(mut self) -> Self {
        // Overwrite whatever ordering was previously selected.
        let order = &mut self.sort;
        *order = Sort::Ascending;
        self
    }
818

            
819
    /// Queries the view in descending order.
820
    ///
821
    /// ```rust
822
    /// # bonsaidb_core::__doctest_prelude!();
823
    /// # use bonsaidb_core::connection::Connection;
824
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
825
    /// // score is an f32 in this example
826
    /// for mapping in db.view::<ScoresByRank>().descending().query()? {
827
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
828
    /// }
829
    /// # Ok(())
830
    /// # }
831
    /// ```
832
3
    pub fn descending(mut self) -> Self {
        // Overwrite whatever ordering was previously selected.
        let order = &mut self.sort;
        *order = Sort::Descending;
        self
    }
836

            
837
    /// Sets the maximum number of results to return.
838
    ///
839
    /// ```rust
840
    /// # bonsaidb_core::__doctest_prelude!();
841
    /// # use bonsaidb_core::connection::Connection;
842
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
843
    /// // score is an f32 in this example
844
    /// let mappings = db.view::<ScoresByRank>().limit(10).query()?;
845
    /// assert!(mappings.len() <= 10);
846
    /// # Ok(())
847
    /// # }
848
    /// ```
849
3
    pub fn limit(mut self, maximum_results: u32) -> Self {
        // Any previously configured limit is replaced.
        let cap = &mut self.limit;
        *cap = Some(maximum_results);
        self
    }
853

            
854
    /// Executes the query and retrieves the results.
855
    ///
856
    /// ```rust
857
    /// # bonsaidb_core::__doctest_prelude!();
858
    /// # use bonsaidb_core::connection::Connection;
859
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
860
    /// // score is an f32 in this example
861
    /// for mapping in db.view::<ScoresByRank>().query()? {
862
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
863
    /// }
864
    /// # Ok(())
865
    /// # }
866
    /// ```
867
18740
    pub fn query(self) -> Result<ViewMappings<V>, Error> {
868
18740
        self.connection
869
18740
            .query::<V, Key>(self.key, self.sort, self.limit, self.access_policy)
870
18740
    }
871

            
872
    /// Executes the query and retrieves the results with the associated [`Document`s](crate::document::OwnedDocument).
873
    ///
874
    /// ```rust
875
    /// # bonsaidb_core::__doctest_prelude!();
876
    /// # use bonsaidb_core::connection::Connection;
877
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
878
    /// for mapping in &db
879
    ///     .view::<ScoresByRank>()
880
    ///     .with_key_range(42..=44)
881
    ///     .query_with_docs()?
882
    /// {
883
    ///     println!(
884
    ///         "Mapping from #{} with rank: {} and score: {}. Document bytes: {:?}",
885
    ///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
886
    ///     );
887
    /// }
888
    /// # Ok(())
889
    /// # }
890
    /// ```
891
505
    pub fn query_with_docs(self) -> Result<MappedDocuments<OwnedDocument, V>, Error> {
892
505
        self.connection.query_with_docs::<V, Key>(
893
505
            self.key,
894
505
            self.sort,
895
505
            self.limit,
896
505
            self.access_policy,
897
505
        )
898
505
    }
899

            
900
    /// Executes the query and retrieves the results with the associated [`CollectionDocument`s](crate::document::CollectionDocument).
901
    ///
902
    /// ```rust
903
    /// # bonsaidb_core::__doctest_prelude!();
904
    /// # use bonsaidb_core::connection::Connection;
905
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
906
    /// for mapping in &db
907
    ///     .view::<ScoresByRank>()
908
    ///     .with_key_range(42..=44)
909
    ///     .query_with_collection_docs()?
910
    /// {
911
    ///     println!(
912
    ///         "Mapping from #{} with rank: {} and score: {}. Deserialized Contents: {:?}",
913
    ///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
914
    ///     );
915
    /// }
916
    /// # Ok(())
917
    /// # }
918
    /// ```
919
25
    pub fn query_with_collection_docs(
920
25
        self,
921
25
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
922
25
    where
923
25
        V::Collection: SerializedCollection,
924
25
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
925
25
    {
926
25
        self.connection.query_with_collection_docs::<V, Key>(
927
25
            self.key,
928
25
            self.sort,
929
25
            self.limit,
930
25
            self.access_policy,
931
25
        )
932
25
    }
933

            
934
    /// Executes a reduce over the results of the query
935
    ///
936
    /// ```rust
937
    /// # bonsaidb_core::__doctest_prelude!();
938
    /// # use bonsaidb_core::connection::Connection;
939
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
940
    /// // score is an f32 in this example
941
    /// let score = db.view::<ScoresByRank>().reduce()?;
942
    /// println!("Average score: {:3}", score);
943
    /// # Ok(())
944
    /// # }
945
    /// ```
946
22
    pub fn reduce(self) -> Result<V::Value, Error> {
947
22
        self.connection
948
22
            .reduce::<V, Key>(self.key, self.access_policy)
949
22
    }
950

            
951
    /// Executes a reduce over the results of the query, grouping by key.
952
    ///
953
    /// ```rust
954
    /// # bonsaidb_core::__doctest_prelude!();
955
    /// # use bonsaidb_core::connection::Connection;
956
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
957
    /// // score is an f32 in this example
958
    /// for mapping in db.view::<ScoresByRank>().reduce_grouped()? {
959
    ///     println!(
960
    ///         "Rank {} has an average score of {:3}",
961
    ///         mapping.key, mapping.value
962
    ///     );
963
    /// }
964
    /// # Ok(())
965
    /// # }
966
    /// ```
967
18
    pub fn reduce_grouped(self) -> Result<GroupedReductions<V>, Error> {
968
18
        self.connection
969
18
            .reduce_grouped::<V, Key>(self.key, self.access_policy)
970
18
    }
971

            
972
    /// Deletes all of the associated documents that match this view query.
973
    ///
974
    /// ```rust
975
    /// # bonsaidb_core::__doctest_prelude!();
976
    /// # use bonsaidb_core::connection::Connection;
977
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
978
    /// db.view::<ScoresByRank>().delete_docs()?;
979
    /// # Ok(())
980
    /// # }
981
    /// ```
982
6
    pub fn delete_docs(self) -> Result<u64, Error> {
983
6
        self.connection
984
6
            .delete_docs::<V, Key>(self.key, self.access_policy)
985
6
    }
986
}
987

            
988
/// This type is the result of `query()`. It is a list of mappings, each of
989
/// which contains:
990
///
991
/// - The key emitted during the map function.
992
/// - The value emitted during the map function.
993
/// - The source document header that the mappings originated from.
994
pub type ViewMappings<V> = Vec<Map<<V as schema::View>::Key, <V as schema::View>::Value>>;
995
/// This type is the result of `reduce_grouped()`. It is a list with one entry
996
/// per matching key, pairing that key with the reduced value of all mapped
/// entries for that key.
997
pub type GroupedReductions<V> =
998
    Vec<MappedValue<<V as schema::View>::Key, <V as schema::View>::Value>>;
999

            
/// A connection to a database's [`Schema`](schema::Schema), giving access to
/// [`Collection`s](crate::schema::Collection) and
/// [`View`s](crate::schema::View). All functions on this trait are safe to use
/// in an asynchronous context.
#[async_trait]
pub trait AsyncConnection: AsyncLowLevelConnection + Sized + Send + Sync {
    /// The [`AsyncStorageConnection`] type that is paired with this type.
    type Storage: AsyncStorageConnection<Database = Self>;

            
    /// Returns the [`AsyncStorageConnection`] implementor that this database
    /// belongs to.
    fn storage(&self) -> Self::Storage;

            
    /// Accesses a collection for the connected [`Schema`](schema::Schema).
6105
    fn collection<C: schema::Collection>(&self) -> AsyncCollection<'_, Self, C> {
6105
        AsyncCollection::new(self)
6105
    }

            
    /// Initializes an [`AsyncView`] for [`schema::View`] `V`.
26817
    fn view<V: schema::SerializedView>(&'_ self) -> AsyncView<'_, Self, V> {
26817
        AsyncView::new(self)
26817
    }

            
    /// Lists [executed transactions](transaction::Executed) from this [`Schema`](schema::Schema). By default, a maximum of
    /// 1000 entries will be returned, but that limit can be overridden by
    /// setting `result_limit`. A hard limit of 100,000 results will be
    /// returned. To begin listing after another known `transaction_id`, pass
    /// `transaction_id + 1` into `starting_id`.
    async fn list_executed_transactions(
        &self,
        starting_id: Option<u64>,
        result_limit: Option<u32>,
    ) -> Result<Vec<transaction::Executed>, Error>;

            
    /// Fetches the last transaction id that has been committed, if any.
    async fn last_transaction_id(&self) -> Result<Option<u64>, Error>;

            
    /// Compacts the entire database to reclaim unused disk space.
    ///
    /// This process is done by writing data to a new file and swapping the file
    /// once the process completes. This ensures that if a hardware failure,
    /// power outage, or crash occurs that the original collection data is left
    /// untouched.
    ///
    /// ## Errors
    ///
    /// * [`Error::Io`]: an error occurred while compacting the database.
    async fn compact(&self) -> Result<(), crate::Error>;

            
    /// Compacts the collection to reclaim unused disk space.
    ///
    /// This default implementation forwards to `compact_collection_by_name`
    /// with `C::collection_name()`.
    ///
    /// This process is done by writing data to a new file and swapping the file
    /// once the process completes. This ensures that if a hardware failure,
    /// power outage, or crash occurs that the original collection data is left
    /// untouched.
    ///
    /// ## Errors
    ///
    /// * [`Error::CollectionNotFound`]: the collection `C` could not be found.
    /// * [`Error::Io`]: an error occurred while compacting the database.
2
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), crate::Error> {
2
        self.compact_collection_by_name(C::collection_name()).await
4
    }

            
    /// Compacts the key value store to reclaim unused disk space.
    ///
    /// This process is done by writing data to a new file and swapping the file
    /// once the process completes. This ensures that if a hardware failure,
    /// power outage, or crash occurs that the original collection data is left
    /// untouched.
    ///
    /// ## Errors
    ///
    /// * [`Error::Io`]: an error occurred while compacting the database.
    async fn compact_key_value_store(&self) -> Result<(), crate::Error>;
}

            
/// Interacts with a collection over an [`AsyncConnection`].
///
/// The examples for this type use this basic collection definition:
///
/// ```rust
/// use bonsaidb_core::{schema::Collection, Error};
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
/// #[collection(name = "MyCollection")]
/// # #[collection(core = bonsaidb_core)]
/// pub struct MyCollection {
///     pub rank: u32,
///     pub score: f32,
/// }
/// ```
pub struct AsyncCollection<'a, Cn, Cl> {
    // Borrowed connection that every operation on this collection is sent to.
    connection: &'a Cn,
    _phantom: PhantomData<Cl>, /* allows for extension traits to be written for collections of specific types */
}

            
// Implemented by hand with no bounds on `Cn` or `Cl`: cloning only copies the
// shared reference to the connection.
impl<'a, Cn, Cl> Clone for AsyncCollection<'a, Cn, Cl> {
    fn clone(&self) -> Self {
        let connection = self.connection;
        Self {
            connection,
            _phantom: PhantomData,
        }
    }
}

            
impl<'a, Cn, Cl> AsyncCollection<'a, Cn, Cl>
where
    Cn: AsyncConnection,
    Cl: schema::Collection,
{
    /// Creates a new instance using `connection`.
6105
    fn new(connection: &'a Cn) -> Self {
6105
        Self {
6105
            connection,
6105
            _phantom: PhantomData::default(),
6105
        }
6105
    }

            
    /// Adds a new `Document<Cl>` with the contents `item`.
    ///
    /// ## Automatic ID Assignment
    ///
    /// This function calls [`SerializedCollection::natural_id()`] to try to
    /// retrieve a primary key value from `item`. If an id is returned, the item
    /// is inserted with that id. If an id is not returned, an id will be
    /// automatically assigned, if possible, by the storage backend, which uses the [`Key`]
    /// trait to assign ids.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let inserted_header = db
    ///     .collection::<MyCollection>()
    ///     .push(&MyCollection::default())
    ///     .await?;
    /// println!(
    ///     "Inserted id {} with revision {}",
    ///     inserted_header.id, inserted_header.revision
    /// );
    /// # Ok(())
    /// # })
    /// # }
    /// ```
9098
    pub async fn push(
9098
        &self,
9098
        item: &<Cl as SerializedCollection>::Contents,
9098
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
9098
    where
9098
        Cl: schema::SerializedCollection,
9098
    {
9098
        let contents = Cl::serialize(item)?;
9098
        if let Some(natural_id) = Cl::natural_id(item) {
            self.insert_bytes(natural_id, contents).await
        } else {
12177
            self.push_bytes(contents).await
        }
9098
    }

            
    /// Adds a new `Document<Cl>` with the `contents`.
    ///
    /// ## Automatic ID Assignment
    ///
    /// An id will be automatically assigned, if possible, by the storage backend, which uses
    /// the [`Key`] trait to assign ids.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let inserted_header = db.collection::<MyCollection>().push_bytes(vec![]).await?;
    /// println!(
    ///     "Inserted id {} with revision {}",
    ///     inserted_header.id, inserted_header.revision
    /// );
    /// # Ok(())
    /// # })
    /// # }
    /// ```
9098
    pub async fn push_bytes<B: Into<Bytes> + Send>(
9098
        &self,
9098
        contents: B,
9098
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error> {
9098
        self.connection
12177
            .insert::<Cl, _, B>(Option::<Cl::PrimaryKey>::None, contents)
11928
            .await
9098
    }

            
    /// Adds a new `Document<Cl>` with the given `id` and contents `item`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let inserted_header = db
    ///     .collection::<MyCollection>()
    ///     .insert(42, &MyCollection::default())
    ///     .await?;
    /// println!(
    ///     "Inserted id {} with revision {}",
    ///     inserted_header.id, inserted_header.revision
    /// );
    /// # Ok(())
    /// # })
    /// # }
    /// ```
1012
    pub async fn insert<PrimaryKey>(
1012
        &self,
1012
        id: PrimaryKey,
1012
        item: &<Cl as SerializedCollection>::Contents,
1012
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
1012
    where
1012
        Cl: schema::SerializedCollection,
1012
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
1012
    {
1012
        let contents = Cl::serialize(item)?;
3153
        self.connection.insert::<Cl, _, _>(Some(id), contents).await
1012
    }

            
    /// Adds a new `Document<Cl>` with the given `id` and `contents`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// let inserted_header = db
    ///     .collection::<MyCollection>()
    ///     .insert_bytes(42, vec![])
    ///     .await?;
    /// println!(
    ///     "Inserted id {} with revision {}",
    ///     inserted_header.id, inserted_header.revision
    /// );
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub async fn insert_bytes<B: Into<Bytes> + Send>(
        &self,
        id: Cl::PrimaryKey,
        contents: B,
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error> {
        // The caller-provided id is always passed along explicitly.
        let explicit_id = Some(id);
        self.connection
            .insert::<Cl, _, B>(explicit_id, contents)
            .await
    }

            
    /// Updates an existing document. Upon success, `doc.revision` will be
    /// updated with the new revision.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42).await? {
    ///     // modify the document
    ///     db.collection::<MyCollection>().update(&mut document).await?;
    ///     println!("Updated revision: {:?}", document.header.revision);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub async fn update<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
        // Pure delegation: the connection performs the update on `doc`.
        let pending = self.connection.update::<Cl, D>(doc);
        pending.await
    }

            
    /// Overwrites an existing document, or inserts a new document. Upon success,
    /// `doc.revision` will be updated with the new revision information.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42).await? {
    ///     // modify the document
    ///     db.collection::<MyCollection>().overwrite(&mut document).await?;
    ///     println!("Updated revision: {:?}", document.header.revision);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub async fn overwrite<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
5
        let contents = doc.bytes()?;
        doc.set_collection_header(
5
            self.connection
5
                .overwrite::<Cl, _>(doc.id(), contents)
5
                .await?,
        )
5
    }

            
    /// Retrieves a `Document<Cl>` with `id` from the connection.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
    ///     println!(
    ///         "Retrieved bytes {:?} with revision {}",
    ///         doc.contents, doc.header.revision
    ///     );
    ///     let deserialized = MyCollection::document_contents(&doc)?;
    ///     println!("Deserialized contents: {:?}", deserialized);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
1543
    pub async fn get<PrimaryKey>(&self, id: PrimaryKey) -> Result<Option<OwnedDocument>, Error>
1543
    where
1543
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
1543
    {
4536
        self.connection.get::<Cl, _>(id).await
1543
    }

            
    /// Retrieves all documents matching `ids`. Documents that are not found
    /// are not returned, but no error will be generated.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for doc in db
    ///     .collection::<MyCollection>()
    ///     .get_multiple([42, 43])
    ///     .await?
    /// {
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
    ///     let deserialized = MyCollection::document_contents(&doc)?;
    ///     println!("Deserialized contents: {:?}", deserialized);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
10
    pub async fn get_multiple<DocumentIds, PrimaryKey, I>(
10
        &self,
10
        ids: DocumentIds,
10
    ) -> Result<Vec<OwnedDocument>, Error>
10
    where
10
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
10
        I: Iterator<Item = PrimaryKey> + Send + Sync,
10
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
10
    {
10
        self.connection.get_multiple::<Cl, _, _, _>(ids).await
10
    }

            
    /// Retrieves all documents matching the range of `ids`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for doc in db
    ///     .collection::<MyCollection>()
    ///     .list(42..)
    ///     .descending()
    ///     .limit(20)
    ///     .await?
    /// {
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
    ///     let deserialized = MyCollection::document_contents(&doc)?;
    ///     println!("Deserialized contents: {:?}", deserialized);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub fn list<PrimaryKey, R>(&'a self, ids: R) -> AsyncList<'a, Cn, Cl, PrimaryKey>
    where
        R: Into<Range<PrimaryKey>>,
        PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
    {
        // Nothing is executed here; the returned `AsyncList` borrows this
        // collection and runs when awaited.
        let range: Range<PrimaryKey> = ids.into();
        AsyncList::new(PossiblyOwned::Borrowed(self), range)
    }

            
    /// Retrieves all documents with ids that start with `prefix`.
    ///
    /// ```rust
    /// use bonsaidb_core::{
    ///     connection::AsyncConnection,
    ///     document::OwnedDocument,
    ///     schema::{Collection, Schematic, SerializedCollection},
    ///     Error,
    /// };
    /// use serde::{Deserialize, Serialize};
    ///
    /// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
    /// #[collection(name = "MyCollection", primary_key = String)]
    /// # #[collection(core = bonsaidb_core)]
    /// pub struct MyCollection;
    ///
    /// async fn starts_with_a<C: AsyncConnection>(db: &C) -> Result<Vec<OwnedDocument>, Error> {
    ///     db.collection::<MyCollection>()
    ///         .list_with_prefix(String::from("a"))
    ///         .await
    /// }
    /// ```
    pub fn list_with_prefix(
        &'a self,
        prefix: Cl::PrimaryKey,
    ) -> AsyncList<'a, Cn, Cl, Cl::PrimaryKey>
    where
        Cl::PrimaryKey: IntoPrefixRange,
    {
        // The prefix is converted into the key range that the list covers.
        let prefix_range = prefix.into_prefix_range();
        AsyncList::new(PossiblyOwned::Borrowed(self), prefix_range)
    }

            
    /// Retrieves all documents.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for doc in db.collection::<MyCollection>().all().await? {
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
    ///     let deserialized = MyCollection::document_contents(&doc)?;
    ///     println!("Deserialized contents: {:?}", deserialized);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub fn all(&'a self) -> AsyncList<'a, Cn, Cl, Cl::PrimaryKey> {
        AsyncList::new(PossiblyOwned::Borrowed(self), Range::from(..))
    }

            
    /// Removes a `Document` from the database.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
    ///     db.collection::<MyCollection>().delete(&doc).await?;
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
975
    pub async fn delete<H: HasHeader + Send + Sync>(&self, doc: &H) -> Result<(), Error> {
2149
        self.connection.delete::<Cl, H>(doc).await
975
    }
}

            
/// The options gathered for an [`AsyncList`] before it is executed.
pub(crate) struct AsyncListBuilder<'a, Cn, Cl, PrimaryKey>
where
    Cl: schema::Collection,
{
    /// The collection (owned or borrowed) whose connection runs the operation.
    collection: PossiblyOwned<'a, AsyncCollection<'a, Cn, Cl>>,
    /// The range of primary keys to operate on.
    range: Range<PrimaryKey>,
    /// Result ordering; `AsyncList::new` starts with `Sort::Ascending`.
    sort: Sort,
    /// Maximum number of results; `AsyncList::new` starts with `None`.
    limit: Option<u32>,
}

            
/// A value that is held either by ownership or by shared reference.
pub(crate) enum PossiblyOwned<'a, Cl> {
    /// The value is stored inline.
    Owned(Cl),
    /// The value is borrowed for `'a`.
    Borrowed(&'a Cl),
}

            
impl<'a, Cl> Deref for PossiblyOwned<'a, Cl> {
    type Target = Cl;

            
68
    fn deref(&self) -> &Self::Target {
68
        match self {
68
            PossiblyOwned::Owned(value) => value,
            PossiblyOwned::Borrowed(value) => value,
        }
68
    }
}

            
/// Execution state of an [`AsyncList`].
pub(crate) enum ListState<'a, Cn, Cl, PrimaryKey>
where
    Cl: schema::Collection,
{
    /// Not yet executed. The builder sits in an `Option` so that `poll` can
    /// `take()` it out through a mutable reference.
    Pending(Option<AsyncListBuilder<'a, Cn, Cl, PrimaryKey>>),
    /// The boxed future created on the first poll; later polls are forwarded
    /// to it.
    Executing(BoxFuture<'a, Result<Vec<OwnedDocument>, Error>>),
}

            
/// Retrieves a list of documents from a collection, when awaited. This
/// structure also offers functions to customize the options for the operation.
#[must_use]
pub struct AsyncList<'a, Cn, Cl, PrimaryKey>
where
    Cl: schema::Collection,
{
    // Either the still-editable builder or the in-flight future.
    state: ListState<'a, Cn, Cl, PrimaryKey>,
}

            
impl<'a, Cn, Cl, PrimaryKey> AsyncList<'a, Cn, Cl, PrimaryKey>
where
    Cl: schema::Collection,
    Cn: AsyncConnection,
    PrimaryKey: for<'k> KeyEncoding<'k, Cl::PrimaryKey>,
{
35
    pub(crate) fn new(
35
        collection: PossiblyOwned<'a, AsyncCollection<'a, Cn, Cl>>,
35
        range: Range<PrimaryKey>,
35
    ) -> Self {
35
        Self {
35
            state: ListState::Pending(Some(AsyncListBuilder {
35
                collection,
35
                range,
35
                sort: Sort::Ascending,
35
                limit: None,
35
            })),
35
        }
35
    }

            
10
    fn builder(&mut self) -> &mut AsyncListBuilder<'a, Cn, Cl, PrimaryKey> {
        // Options can only be changed while the list has not started executing.
        match &mut self.state {
            ListState::Pending(Some(pending)) => pending,
            _ => unreachable!("Attempted to use after retrieving the result"),
        }
    }

            
    /// Lists documents by id in ascending order.
    pub fn ascending(mut self) -> Self {
        let options = self.builder();
        options.sort = Sort::Ascending;
        self
    }

            
    /// Lists documents by id in descending order.
5
    pub fn descending(mut self) -> Self {
        let options = self.builder();
        options.sort = Sort::Descending;
        self
    }

            
    /// Sets the maximum number of results to return.
5
    pub fn limit(mut self, maximum_results: u32) -> Self {
        let options = self.builder();
        options.limit = Some(maximum_results);
        self
    }

            
    /// Returns the list of headers for documents contained within the range.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// println!(
    ///     "Headers of documents with id 42 or larger: {:?}",
    ///     db.collection::<MyCollection>().list(42..).headers().await?
    /// );
    /// println!(
    ///     "Headers of documents in MyCollection: {:?}",
    ///     db.collection::<MyCollection>().all().headers().await?
    /// );
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub async fn headers(self) -> Result<Vec<Header>, Error> {
5
        match self.state {
            ListState::Pending(Some(AsyncListBuilder {
5
                collection,
5
                range,
5
                sort,
5
                limit,
5
                ..
5
            })) => {
5
                collection
5
                    .connection
5
                    .list_headers::<Cl, _, _>(range, sort, limit)
5
                    .await
            }
            _ => unreachable!("Attempted to use after retrieving the result"),
        }
5
    }

            
    /// Returns the number of documents contained within the range.
    ///
    /// Order and limit are ignored if they were set.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// println!(
    ///     "Number of documents with id 42 or larger: {}",
    ///     db.collection::<MyCollection>().list(42..).count().await?
    /// );
    /// println!(
    ///     "Number of documents in MyCollection: {}",
    ///     db.collection::<MyCollection>().all().count().await?
    /// );
    /// # Ok(())
    /// # })
    /// # }
    /// ```
10
    pub async fn count(self) -> Result<u64, Error> {
        if let ListState::Pending(Some(builder)) = self.state {
            // Only the range is used for counting; sort and limit are discarded.
            let AsyncListBuilder {
                collection, range, ..
            } = builder;
            collection.connection.count::<Cl, _, _>(range).await
        } else {
            unreachable!("Attempted to use after retrieving the result")
        }
    }
}

            
// Awaiting an `AsyncList` runs `list` on the collection's connection with the
// configured range, sort and limit, and yields the matching documents.
#[allow(clippy::type_repetition_in_bounds)]
impl<'a, Cn, Cl, PrimaryKey> Future for AsyncList<'a, Cn, Cl, PrimaryKey>
where
    Cn: AsyncConnection,
    Cl: schema::Collection + Unpin,
    Cl::PrimaryKey: Unpin,
    PrimaryKey: Unpin + for<'k> KeyEncoding<'k, Cl::PrimaryKey> + 'a,
{
    type Output = Result<Vec<OwnedDocument>, Error>;

            
60
    fn poll(
60
        mut self: std::pin::Pin<&mut Self>,
60
        cx: &mut std::task::Context<'_>,
60
    ) -> std::task::Poll<Self::Output> {
60
        match &mut self.state {
40
            // Already started: forward the poll to the boxed future.
            ListState::Executing(future) => future.as_mut().poll(cx),
20
            // First poll: turn the builder into a future and start it.
            ListState::Pending(builder) => {
20
                // `new` always stores `Some`, and the state is replaced with
                // `Executing` below, so the builder is taken at most once.
                let AsyncListBuilder {
20
                    collection,
20
                    range,
20
                    sort,
20
                    limit,
20
                } = builder.take().unwrap();
20

            
20
                let future = async move {
20
                    collection
20
                        .connection
20
                        .list::<Cl, _, _>(range, sort, limit)
20
                        .await
20
                }
20
                .boxed();
20

            
20
                self.state = ListState::Executing(future);
20
                // Re-enter `poll` so the new future is polled right away via
                // the `Executing` arm.
                self.poll(cx)
            }
        }
60
    }
}

            
/// Parameters to query a [`schema::View`].
///
/// The examples for this type use this view definition:
///
/// ```rust
/// # mod collection {
/// # bonsaidb_core::__doctest_prelude!();
/// # }
/// # use collection::MyCollection;
/// use bonsaidb_core::{
///     define_basic_unique_mapped_view,
///     document::{CollectionDocument, Emit},
///     schema::{
///         CollectionViewSchema, DefaultViewSerialization, Name, ReduceResult, View,
///         ViewMapResult, ViewMappedValue,
///     },
/// };
///
/// #[derive(Debug, Clone, View)]
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
/// # #[view(core = bonsaidb_core)]
/// pub struct ScoresByRank;
///
/// impl CollectionViewSchema for ScoresByRank {
///     type View = Self;
///     fn map(
///         &self,
///         document: CollectionDocument<<Self::View as View>::Collection>,
///     ) -> ViewMapResult<Self::View> {
///         document
///             .header
///             .emit_key_and_value(document.contents.rank, document.contents.score)
///     }
///
///     fn reduce(
///         &self,
///         mappings: &[ViewMappedValue<Self::View>],
///         rereduce: bool,
///     ) -> ReduceResult<Self::View> {
///         if mappings.is_empty() {
///             Ok(0.)
///         } else {
///             Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
///         }
///     }
/// }
/// ```
#[must_use]
pub struct AsyncView<'a, Cn, V>
where
    V: schema::SerializedView,
{
    /// The connection this view query was created with.
    connection: &'a Cn,

    /// The key filter to apply to the query, if any.
    pub key: Option<QueryKey<V::Key>>,

    /// The data access policy used for the view. Defaults to
    /// [`AccessPolicy::UpdateBefore`].
    pub access_policy: AccessPolicy,

    /// The order in which the query's results are sorted.
    pub sort: Sort,

    /// The maximum number of results to return, if limited.
    pub limit: Option<u32>,
}

            
impl<'a, Cn, V> AsyncView<'a, Cn, V>
where
    V: schema::SerializedView,
    Cn: AsyncConnection,
{
26817
    fn new(connection: &'a Cn) -> Self {
26817
        Self {
26817
            connection,
26817
            key: None,
26817
            access_policy: AccessPolicy::UpdateBefore,
26817
            sort: Sort::Ascending,
26817
            limit: None,
26817
        }
26817
    }

            
    /// Filters for entries in the view with `key`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().with_key(42).query().await? {
    ///     assert_eq!(mapping.key, 42);
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
26743
    pub fn with_key(mut self, key: V::Key) -> Self {
26743
        self.key = Some(QueryKey::Matches(key));
26743
        self
26743
    }

            
    /// Filters for entries in the view with `keys`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_keys([42, 43])
    ///     .query()
    ///     .await?
    /// {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
11
    pub fn with_keys<IntoIter: IntoIterator<Item = V::Key>>(mut self, keys: IntoIter) -> Self {
11
        self.key = Some(QueryKey::Multiple(keys.into_iter().collect()));
11
        self
11
    }

            
    /// Filters for entries in the view with the range `keys`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..)
    ///     .query()
    ///     .await?
    /// {
    ///     assert!(mapping.key >= 42);
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
13
    pub fn with_key_range<R: Into<Range<V::Key>>>(mut self, range: R) -> Self {
13
        self.key = Some(QueryKey::Range(range.into()));
13
        self
13
    }

            
    /// Filters for entries in the view with keys that begin with `prefix`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;
    /// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// #[derive(View, Debug, Clone)]
    /// #[view(name = "by-name", key = String, collection = MyCollection)]
    /// # #[view(core = bonsaidb_core)]
    /// struct ByName;
    ///
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ByName>()
    ///     .with_key_prefix(String::from("a"))
    ///     .query()
    ///     .await?
    /// {
    ///     assert!(mapping.key.starts_with("a"));
    ///     println!("{} in document {:?}", mapping.key, mapping.source);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub fn with_key_prefix(mut self, prefix: V::Key) -> Self
    where
        V::Key: IntoPrefixRange,
    {
        self.key = Some(QueryKey::Range(prefix.into_prefix_range()));
        self
    }

            
    /// Sets the access policy for queries.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # use bonsaidb_core::connection::AsyncConnection;