1
use std::{collections::BTreeMap, marker::PhantomData, ops::Deref};
2

            
3
use arc_bytes::serde::Bytes;
4
use async_trait::async_trait;
5
use futures::{future::BoxFuture, Future, FutureExt};
6
use serde::{Deserialize, Serialize};
7
#[cfg(feature = "multiuser")]
8
use zeroize::Zeroize;
9

            
10
#[cfg(feature = "multiuser")]
11
use crate::schema::Nameable;
12
use crate::{
13
    document::{
14
        AnyDocumentId, CollectionDocument, CollectionHeader, Document, HasHeader, OwnedDocument,
15
    },
16
    key::Key,
17
    permissions::Permissions,
18
    schema::{
19
        self,
20
        view::{self, map::MappedDocuments},
21
        Map, MappedValue, Schema, SchemaName, SerializedCollection,
22
    },
23
    transaction::{self, OperationResult, Transaction},
24
    Error,
25
};
26

            
27
/// Defines all interactions with a [`schema::Schema`], regardless of whether it
28
/// is local or remote.
29
#[async_trait]
30
pub trait Connection: Send + Sync {
31
    /// Accesses a collection for the connected [`schema::Schema`].
32
26722
    fn collection<C: schema::Collection>(&self) -> Collection<'_, Self, C>
33
26722
    where
34
26722
        Self: Sized,
35
26722
    {
36
26722
        Collection::new(self)
37
26722
    }
38

            
39
    /// Inserts a newly created document into the connected [`schema::Schema`]
40
    /// for the [`Collection`] `C`. If `id` is `None` a unique id will be
41
    /// generated. If an id is provided and a document already exists with that
42
    /// id, a conflict error will be returned.
43
    ///
44
    /// This is the lower-level API. For better ergonomics, consider using
45
    /// one of:
46
    ///
47
    /// - [`SerializedCollection::push_into()`]
48
    /// - [`SerializedCollection::insert_into()`]
49
    /// - [`self.collection::<Collection>().insert()`](Collection::insert)
50
    /// - [`self.collection::<Collection>().push()`](Collection::push)
51
30922
    async fn insert<
52
30922
        C: schema::Collection,
53
30922
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
54
30922
        B: Into<Bytes> + Send,
55
30922
    >(
56
30922
        &self,
57
30922
        id: Option<PrimaryKey>,
58
30922
        contents: B,
59
30922
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error> {
60
30922
        let contents = contents.into();
61
30377
        let results = self
62
            .apply_transaction(Transaction::insert(
63
30922
                C::collection_name(),
64
30922
                id.map(|id| id.into().to_document_id()).transpose()?,
65
30922
                contents,
66
35595
            ))
67
35595
            .await?;
68
30377
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
69
30377
            CollectionHeader::try_from(header)
70
        } else {
71
            unreachable!(
72
                "apply_transaction on a single insert should yield a single DocumentUpdated entry"
73
            )
74
        }
75
61844
    }
76

            
77
    /// Updates an existing document in the connected [`schema::Schema`] for the
78
    /// [`Collection`] `C`. Upon success, `doc.revision` will be updated with
79
    /// the new revision.
80
    ///
81
    /// This is the lower-level API. For better ergonomics, consider using
82
    /// one of:
83
    ///
84
    /// - [`CollectionDocument::update()`]
85
    /// - [`self.collection::<Collection>().update()`](Collection::update)
86
4928
    async fn update<C: schema::Collection, D: Document<C> + Send + Sync>(
87
4928
        &self,
88
4928
        doc: &mut D,
89
4928
    ) -> Result<(), Error> {
90
4908
        let results = self
91
            .apply_transaction(Transaction::update(
92
4928
                C::collection_name(),
93
4928
                doc.header().into_header()?,
94
4928
                doc.bytes()?,
95
6464
            ))
96
6464
            .await?;
97
4908
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
98
4908
            doc.set_header(header)?;
99
4908
            Ok(())
100
        } else {
101
            unreachable!(
102
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
103
            )
104
        }
105
9856
    }
106

            
107
    /// Overwrites an existing document, or inserts a new document. Upon success,
108
    /// `doc.revision` will be updated with the new revision information.
109
    ///
110
    /// This is the lower-level API. For better ergonomics, consider using
111
    /// one of:
112
    ///
113
    /// - [`SerializedCollection::overwrite()`]
114
    /// - [`SerializedCollection::overwrite_into()`]
115
    /// - [`self.collection::<Collection>().overwrite()`](Collection::overwrite)
116
511
    async fn overwrite<'a, C, PrimaryKey>(
117
511
        &self,
118
511
        id: PrimaryKey,
119
511
        contents: Vec<u8>,
120
511
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error>
121
511
    where
122
511
        C: schema::Collection,
123
511
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
124
511
    {
125
511
        let results = self
126
            .apply_transaction(Transaction::overwrite(
127
511
                C::collection_name(),
128
511
                id.into().to_document_id()?,
129
511
                contents,
130
1555
            ))
131
1555
            .await?;
132
511
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
133
511
            CollectionHeader::try_from(header)
134
        } else {
135
            unreachable!(
136
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
137
            )
138
        }
139
1022
    }
140

            
141
    /// Retrieves a stored document from [`Collection`] `C` identified by `id`.
142
    ///
143
    /// This is the lower-level API. For better ergonomics, consider using
144
    /// one of:
145
    ///
146
    /// - [`SerializedCollection::get()`]
147
    /// - [`self.collection::<Collection>().get()`](Collection::get)
148
    async fn get<C, PrimaryKey>(&self, id: PrimaryKey) -> Result<Option<OwnedDocument>, Error>
149
    where
150
        C: schema::Collection,
151
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send;
152

            
153
    /// Retrieves all documents matching `ids`. Documents that are not found
154
    /// are not returned, but no error will be generated.
155
    ///
156
    /// This is the lower-level API. For better ergonomics, consider using
157
    /// one of:
158
    ///
159
    /// - [`SerializedCollection::get_multiple()`]
160
    /// - [`self.collection::<Collection>().get_multiple()`](Collection::get_multiple)
161
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
162
        &self,
163
        ids: DocumentIds,
164
    ) -> Result<Vec<OwnedDocument>, Error>
165
    where
166
        C: schema::Collection,
167
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
168
        I: Iterator<Item = PrimaryKey> + Send + Sync,
169
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync;
170

            
171
    /// Retrieves all documents within the range of `ids`. Documents that are
172
    /// not found are not returned, but no error will be generated. To retrieve
173
    /// all documents, pass in `..` for `ids`.
174
    ///
175
    /// This is the lower-level API. For better ergonomics, consider using
176
    /// one of:
177
    ///
178
    /// - [`SerializedCollection::all()`]
179
    /// - [`self.collection::<Collection>().all()`](Collection::all)
180
    /// - [`SerializedCollection::list()`]
181
    /// - [`self.collection::<Collection>().list()`](Collection::list)
182
    async fn list<C, R, PrimaryKey>(
183
        &self,
184
        ids: R,
185
        order: Sort,
186
        limit: Option<usize>,
187
    ) -> Result<Vec<OwnedDocument>, Error>
188
    where
189
        C: schema::Collection,
190
        R: Into<Range<PrimaryKey>> + Send,
191
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send;
192

            
193
    /// Removes a `Document` from the database.
194
    ///
195
    /// This is the lower-level API. For better ergonomics, consider using
196
    /// one of:
197
    ///
198
    /// - [`CollectionDocument::delete()`]
199
    /// - [`self.collection::<Collection>().delete()`](Collection::delete)
200
13504
    async fn delete<C: schema::Collection, H: HasHeader + Send + Sync>(
201
13504
        &self,
202
13504
        doc: &H,
203
13504
    ) -> Result<(), Error> {
204
13504
        let results = self
205
14531
            .apply_transaction(Transaction::delete(C::collection_name(), doc.header()?))
206
13815
            .await?;
207
13504
        if let OperationResult::DocumentDeleted { .. } = &results[0] {
208
13504
            Ok(())
209
        } else {
210
            unreachable!(
211
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
212
            )
213
        }
214
27008
    }
215

            
216
    /// Initializes [`View`] for [`schema::View`] `V`.
217
    #[must_use]
218
59915
    fn view<V: schema::SerializedView>(&'_ self) -> View<'_, Self, V>
219
59915
    where
220
59915
        Self: Sized,
221
59915
    {
222
59915
        View::new(self)
223
59915
    }
224

            
225
    /// Queries for view entries matching [`View`].
226
    ///
227
    /// This is the lower-level API. For better ergonomics, consider querying
228
    /// the view using [`self.view::<View>().query()`](View::query) instead.
229
    /// The parameters for the query can be customized on the builder returned
230
    /// from [`Self::view()`].
231
    #[must_use]
232
    async fn query<V: schema::SerializedView>(
233
        &self,
234
        key: Option<QueryKey<V::Key>>,
235
        order: Sort,
236
        limit: Option<usize>,
237
        access_policy: AccessPolicy,
238
    ) -> Result<Vec<Map<V::Key, V::Value>>, Error>
239
    where
240
        Self: Sized;
241

            
242
    /// Queries for view entries matching [`View`] with their source documents.
243
    ///
244
    /// This is the lower-level API. For better ergonomics, consider querying
245
    /// the view using [`self.view::<View>().query_with_docs()`](View::query_with_docs) instead.
246
    /// The parameters for the query can be customized on the builder returned
247
    /// from [`Self::view()`].
248
    #[must_use]
249
    async fn query_with_docs<V: schema::SerializedView>(
250
        &self,
251
        key: Option<QueryKey<V::Key>>,
252
        order: Sort,
253
        limit: Option<usize>,
254
        access_policy: AccessPolicy,
255
    ) -> Result<MappedDocuments<OwnedDocument, V>, Error>
256
    where
257
        Self: Sized;
258

            
259
    /// Queries for view entries matching [`View`] with their source documents, deserialized.
260
    ///
261
    /// This is the lower-level API. For better ergonomics, consider querying
262
    /// the view using [`self.view::<View>().query_with_collection_docs()`](View::query_with_collection_docs) instead.
263
    /// The parameters for the query can be customized on the builder returned
264
    /// from [`Self::view()`].
265
    #[must_use]
266
104
    async fn query_with_collection_docs<V>(
267
104
        &self,
268
104
        key: Option<QueryKey<V::Key>>,
269
104
        order: Sort,
270
104
        limit: Option<usize>,
271
104
        access_policy: AccessPolicy,
272
104
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
273
104
    where
274
104
        V: schema::SerializedView,
275
104
        V::Collection: SerializedCollection,
276
104
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
277
104
        Self: Sized,
278
104
    {
279
104
        let mapped_docs = self
280
180
            .query_with_docs::<V>(key, order, limit, access_policy)
281
180
            .await?;
282
104
        let mut collection_docs = BTreeMap::new();
283
211
        for (id, doc) in mapped_docs.documents {
284
107
            collection_docs.insert(id, CollectionDocument::<V::Collection>::try_from(&doc)?);
285
        }
286
104
        Ok(MappedDocuments {
287
104
            mappings: mapped_docs.mappings,
288
104
            documents: collection_docs,
289
104
        })
290
208
    }
291

            
292
    /// Reduces the view entries matching [`View`].
293
    ///
294
    /// This is the lower-level API. For better ergonomics, consider reducing
295
    /// the view using [`self.view::<View>().reduce()`](View::reduce) instead.
296
    /// The parameters for the query can be customized on the builder returned
297
    /// from [`Self::view()`].
298
    #[must_use]
299
    async fn reduce<V: schema::SerializedView>(
300
        &self,
301
        key: Option<QueryKey<V::Key>>,
302
        access_policy: AccessPolicy,
303
    ) -> Result<V::Value, Error>
304
    where
305
        Self: Sized;
306

            
307
    /// Reduces the view entries matching [`View`], reducing the values by each
308
    /// unique key.
309
    ///
310
    /// This is the lower-level API. For better ergonomics, consider reducing
311
    /// the view using
312
    /// [`self.view::<View>().reduce_grouped()`](View::reduce_grouped) instead.
313
    /// The parameters for the query can be customized on the builder returned
314
    /// from [`Self::view()`].
315
    #[must_use]
316
    async fn reduce_grouped<V: schema::SerializedView>(
317
        &self,
318
        key: Option<QueryKey<V::Key>>,
319
        access_policy: AccessPolicy,
320
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error>
321
    where
322
        Self: Sized;
323

            
324
    /// Deletes all of the documents associated with this view.
325
    ///
326
    /// This is the lower-level API. For better ergonomics, consider querying
327
    /// the view using [`self.view::<View>().delete_docs()`](View::delete_docs()) instead.
328
    /// The parameters for the query can be customized on the builder returned
329
    /// from [`Self::view()`].
330
    #[must_use]
331
    async fn delete_docs<V: schema::SerializedView>(
332
        &self,
333
        key: Option<QueryKey<V::Key>>,
334
        access_policy: AccessPolicy,
335
    ) -> Result<u64, Error>
336
    where
337
        Self: Sized;
338

            
339
    /// Applies a [`Transaction`] to the [`schema::Schema`]. If any operation in the
340
    /// [`Transaction`] fails, none of the operations will be applied to the
341
    /// [`schema::Schema`].
342
    async fn apply_transaction(
343
        &self,
344
        transaction: Transaction,
345
    ) -> Result<Vec<OperationResult>, Error>;
346

            
347
    /// Lists executed [`Transaction`]s from this [`schema::Schema`]. By default, a maximum of
348
    /// 1000 entries will be returned, but that limit can be overridden by
349
    /// setting `result_limit`. A hard limit of 100,000 results will be
350
    /// returned. To begin listing after another known `transaction_id`, pass
351
    /// `transaction_id + 1` into `starting_id`.
352
    async fn list_executed_transactions(
353
        &self,
354
        starting_id: Option<u64>,
355
        result_limit: Option<usize>,
356
    ) -> Result<Vec<transaction::Executed>, Error>;
357

            
358
    /// Fetches the last transaction id that has been committed, if any.
359
    async fn last_transaction_id(&self) -> Result<Option<u64>, Error>;
360

            
361
    /// Compacts the entire database to reclaim unused disk space.
362
    ///
363
    /// This process is done by writing data to a new file and swapping the file
364
    /// once the process completes. This ensures that if a hardware failure,
365
    /// power outage, or crash occurs that the original collection data is left
366
    /// untouched.
367
    ///
368
    /// ## Errors
369
    ///
370
    /// * [`Error::Io`]: an error occurred while compacting the database.
371
    async fn compact(&self) -> Result<(), crate::Error>;
372

            
373
    /// Compacts the collection to reclaim unused disk space.
374
    ///
375
    /// This process is done by writing data to a new file and swapping the file
376
    /// once the process completes. This ensures that if a hardware failure,
377
    /// power outage, or crash occurs that the original collection data is left
378
    /// untouched.
379
    ///
380
    /// ## Errors
381
    ///
382
    /// * [`Error::CollectionNotFound`]: database `name` does not exist.
383
    /// * [`Error::Io`]: an error occurred while compacting the database.
384
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), crate::Error>;
385

            
386
    /// Compacts the key value store to reclaim unused disk space.
387
    ///
388
    /// This process is done by writing data to a new file and swapping the file
389
    /// once the process completes. This ensures that if a hardware failure,
390
    /// power outage, or crash occurs that the original collection data is left
391
    /// untouched.
392
    ///
393
    /// ## Errors
394
    ///
395
    /// * [`Error::Io`]: an error occurred while compacting the database.
396
    async fn compact_key_value_store(&self) -> Result<(), crate::Error>;
397
}
398

            
399
/// Interacts with a collection over a `Connection`.
400
///
401
/// These examples in this type use this basic collection definition:
402
///
403
/// ```rust
404
/// use bonsaidb_core::{schema::Collection, Error};
405
/// use serde::{Deserialize, Serialize};
406
///
407
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
408
/// #[collection(name = "MyCollection")]
409
/// # #[collection(core = bonsaidb_core)]
410
/// pub struct MyCollection {
411
///     pub rank: u32,
412
///     pub score: f32,
413
/// }
414
/// ```
415
pub struct Collection<'a, Cn, Cl> {
416
    connection: &'a Cn,
417
    _phantom: PhantomData<Cl>, /* allows for extension traits to be written for collections of specific types */
418
}
419

            
420
impl<'a, Cn, Cl> Clone for Collection<'a, Cn, Cl> {
421
    fn clone(&self) -> Self {
422
        Self {
423
            connection: self.connection,
424
            _phantom: PhantomData,
425
        }
426
    }
427
}
428

            
429
impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
430
where
431
    Cn: Connection,
432
    Cl: schema::Collection,
433
{
434
    /// Creates a new instance using `connection`.
435
26722
    fn new(connection: &'a Cn) -> Self {
436
26722
        Self {
437
26722
            connection,
438
26722
            _phantom: PhantomData::default(),
439
26722
        }
440
26722
    }
441

            
442
    /// Adds a new `Document<Cl>` with the contents `item`.
443
    ///
444
    /// ## Automatic Id Assignment
445
    ///
446
    /// This function calls [`SerializedCollection::natural_id()`] to try to
447
    /// retrieve a primary key value from `item`. If an id is returned, the item
448
    /// is inserted with that id. If an id is not returned, an id will be
449
    /// automatically assigned, if possible, by the storage backend, which uses the [`Key`]
450
    /// trait to assign ids.
451
    ///
452
    /// ```rust
453
    /// # bonsaidb_core::__doctest_prelude!();
454
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
455
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
456
    /// let inserted_header = db
457
    ///     .collection::<MyCollection>()
458
    ///     .push(&MyCollection::default())
459
    ///     .await?;
460
    /// println!(
461
    ///     "Inserted id {} with revision {}",
462
    ///     inserted_header.id, inserted_header.revision
463
    /// );
464
    /// # Ok(())
465
    /// # })
466
    /// # }
467
    /// ```
468
29403
    pub async fn push(
469
29403
        &self,
470
29403
        item: &<Cl as SerializedCollection>::Contents,
471
29403
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
472
29403
    where
473
29403
        Cl: schema::SerializedCollection,
474
29403
    {
475
29403
        let contents = Cl::serialize(item)?;
476
29403
        if let Some(natural_id) = Cl::natural_id(item) {
477
1
            self.insert_bytes(natural_id, contents).await
478
        } else {
479
32089
            self.push_bytes(contents).await
480
        }
481
29403
    }
482

            
483
    /// Adds a new `Document<Cl>` with the `contents`.
484
    ///
485
    /// ## Automatic Id Assignment
486
    ///
487
    /// An id will be automatically assigned, if possible, by the storage backend, which uses
488
    /// the [`Key`] trait to assign ids.
489
    ///
490
    /// ```rust
491
    /// # bonsaidb_core::__doctest_prelude!();
492
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
493
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
494
    /// let inserted_header = db.collection::<MyCollection>().push_bytes(vec![]).await?;
495
    /// println!(
496
    ///     "Inserted id {} with revision {}",
497
    ///     inserted_header.id, inserted_header.revision
498
    /// );
499
    /// # Ok(())
500
    /// # })
501
    /// # }
502
    /// ```
503
29402
    pub async fn push_bytes<B: Into<Bytes> + Send>(
504
29402
        &self,
505
29402
        contents: B,
506
29402
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
507
29402
    where
508
29402
        Cl: schema::SerializedCollection,
509
29402
    {
510
29402
        self.connection
511
32089
            .insert::<Cl, _, B>(Option::<Cl::PrimaryKey>::None, contents)
512
31029
            .await
513
29402
    }
514

            
515
    /// Adds a new `Document<Cl>` with the given `id` and contents `item`.
516
    ///
517
    /// ```rust
518
    /// # bonsaidb_core::__doctest_prelude!();
519
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
520
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
521
    /// let inserted_header = db
522
    ///     .collection::<MyCollection>()
523
    ///     .insert(42, &MyCollection::default())
524
    ///     .await?;
525
    /// println!(
526
    ///     "Inserted id {} with revision {}",
527
    ///     inserted_header.id, inserted_header.revision
528
    /// );
529
    /// # Ok(())
530
    /// # })
531
    /// # }
532
    /// ```
533
1013
    pub async fn insert<PrimaryKey>(
534
1013
        &self,
535
1013
        id: PrimaryKey,
536
1013
        item: &<Cl as SerializedCollection>::Contents,
537
1013
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
538
1013
    where
539
1013
        Cl: schema::SerializedCollection,
540
1013
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>> + Send + Sync,
541
1013
    {
542
1013
        let contents = Cl::serialize(item)?;
543
3006
        self.connection.insert::<Cl, _, _>(Some(id), contents).await
544
1013
    }
545

            
546
    /// Adds a new `Document<Cl>` with the the given `id` and `contents`.
547
    ///
548
    /// ```rust
549
    /// # bonsaidb_core::__doctest_prelude!();
550
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
551
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
552
    /// let inserted_header = db
553
    ///     .collection::<MyCollection>()
554
    ///     .insert_bytes(42, vec![])
555
    ///     .await?;
556
    /// println!(
557
    ///     "Inserted id {} with revision {}",
558
    ///     inserted_header.id, inserted_header.revision
559
    /// );
560
    /// # Ok(())
561
    /// # })
562
    /// # }
563
    /// ```
564
1
    pub async fn insert_bytes<B: Into<Bytes> + Send>(
565
1
        &self,
566
1
        id: Cl::PrimaryKey,
567
1
        contents: B,
568
1
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
569
1
    where
570
1
        Cl: schema::SerializedCollection,
571
1
    {
572
1
        self.connection.insert::<Cl, _, B>(Some(id), contents).await
573
1
    }
574

            
575
    /// Updates an existing document. Upon success, `doc.revision` will be
576
    /// updated with the new revision.
577
    ///
578
    /// ```rust
579
    /// # bonsaidb_core::__doctest_prelude!();
580
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
581
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
582
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42).await? {
583
    ///     // modify the document
584
    ///     db.collection::<MyCollection>().update(&mut document);
585
    ///     println!("Updated revision: {:?}", document.header.revision);
586
    /// }
587
    /// # Ok(())
588
    /// # })
589
    /// # }
590
    /// ```
591
    pub async fn update<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
592
        self.connection.update::<Cl, D>(doc).await
593
    }
594

            
595
    /// Overwrites an existing document, or inserts a new document. Upon success,
596
    /// `doc.revision` will be updated with the new revision information.
597
    ///
598
    /// ```rust
599
    /// # bonsaidb_core::__doctest_prelude!();
600
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
601
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
602
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42).await? {
603
    ///     // modify the document
604
    ///     db.collection::<MyCollection>().overwrite(&mut document);
605
    ///     println!("Updated revision: {:?}", document.header.revision);
606
    /// }
607
    /// # Ok(())
608
    /// # })
609
    /// # }
610
    /// ```
611
5
    pub async fn overwrite<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
612
5
        let contents = doc.bytes()?;
613
        doc.set_collection_header(
614
5
            self.connection
615
5
                .overwrite::<Cl, _>(doc.key(), contents)
616
5
                .await?,
617
        )
618
5
    }
619

            
620
    /// Retrieves a `Document<Cl>` with `id` from the connection.
621
    ///
622
    /// ```rust
623
    /// # bonsaidb_core::__doctest_prelude!();
624
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
625
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
626
    /// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
627
    ///     println!(
628
    ///         "Retrieved bytes {:?} with revision {}",
629
    ///         doc.contents, doc.header.revision
630
    ///     );
631
    ///     let deserialized = MyCollection::document_contents(&doc)?;
632
    ///     println!("Deserialized contents: {:?}", deserialized);
633
    /// }
634
    /// # Ok(())
635
    /// # })
636
    /// # }
637
    /// ```
638
1571
    pub async fn get<PrimaryKey>(&self, id: PrimaryKey) -> Result<Option<OwnedDocument>, Error>
639
1571
    where
640
1571
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>> + Send,
641
1571
    {
642
4285
        self.connection.get::<Cl, _>(id).await
643
1571
    }
644

            
645
    /// Retrieves all documents matching `ids`. Documents that are not found
646
    /// are not returned, but no error will be generated.
647
    ///
648
    /// ```rust
649
    /// # bonsaidb_core::__doctest_prelude!();
650
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
651
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
652
    /// for doc in db
653
    ///     .collection::<MyCollection>()
654
    ///     .get_multiple([42, 43])
655
    ///     .await?
656
    /// {
657
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
658
    ///     let deserialized = MyCollection::document_contents(&doc)?;
659
    ///     println!("Deserialized contents: {:?}", deserialized);
660
    /// }
661
    /// # Ok(())
662
    /// # })
663
    /// # }
664
    /// ```
665
15
    pub async fn get_multiple<DocumentIds, PrimaryKey, I>(
666
15
        &self,
667
15
        ids: DocumentIds,
668
15
    ) -> Result<Vec<OwnedDocument>, Error>
669
15
    where
670
15
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
671
15
        I: Iterator<Item = PrimaryKey> + Send + Sync,
672
15
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>> + Send + Sync,
673
15
    {
674
15
        self.connection.get_multiple::<Cl, _, _, _>(ids).await
675
15
    }
676

            
677
    /// Retrieves all documents matching the range of `ids`.
678
    ///
679
    /// ```rust
680
    /// # bonsaidb_core::__doctest_prelude!();
681
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
682
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
683
    /// for doc in db
684
    ///     .collection::<MyCollection>()
685
    ///     .list(42..)
686
    ///     .descending()
687
    ///     .limit(20)
688
    ///     .await?
689
    /// {
690
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
691
    ///     let deserialized = MyCollection::document_contents(&doc)?;
692
    ///     println!("Deserialized contents: {:?}", deserialized);
693
    /// }
694
    /// # Ok(())
695
    /// # })
696
    /// # }
697
    /// ```
698
    pub fn list<PrimaryKey, R>(&'a self, ids: R) -> List<'a, Cn, Cl>
699
    where
700
        R: Into<Range<PrimaryKey>>,
701
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>>,
702
    {
703
        List::new(
704
            PossiblyOwned::Borrowed(self),
705
            ids.into().map(PrimaryKey::into),
706
        )
707
    }
708

            
709
    /// Retrieves all documents.
710
    ///
711
    /// ```rust
712
    /// # bonsaidb_core::__doctest_prelude!();
713
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
714
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
715
    /// for doc in db.collection::<MyCollection>().all().await? {
716
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
717
    ///     let deserialized = MyCollection::document_contents(&doc)?;
718
    ///     println!("Deserialized contents: {:?}", deserialized);
719
    /// }
720
    /// # Ok(())
721
    /// # })
722
    /// # }
723
    /// ```
724
    pub fn all(&'a self) -> List<'a, Cn, Cl> {
725
        List::new(PossiblyOwned::Borrowed(self), Range::from(..))
726
    }
727

            
728
    /// Removes a `Document` from the database.
729
    ///
730
    /// ```rust
731
    /// # bonsaidb_core::__doctest_prelude!();
732
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
733
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
734
    /// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
735
    ///     db.collection::<MyCollection>().delete(&doc).await?;
736
    /// }
737
    /// # Ok(())
738
    /// # })
739
    /// # }
740
    /// ```
741
927
    pub async fn delete<H: HasHeader + Send + Sync>(&self, doc: &H) -> Result<(), Error> {
742
1980
        self.connection.delete::<Cl, H>(doc).await
743
927
    }
744
}
745

            
746
pub(crate) struct ListBuilder<'a, Cn, Cl>
747
where
748
    Cl: schema::Collection,
749
{
750
    collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
751
    range: Range<AnyDocumentId<Cl::PrimaryKey>>,
752
    sort: Sort,
753
    limit: Option<usize>,
754
}
755

            
756
pub(crate) enum PossiblyOwned<'a, Cl> {
757
    Owned(Cl),
758
    Borrowed(&'a Cl),
759
}
760

            
761
impl<'a, Cl> Deref for PossiblyOwned<'a, Cl> {
762
    type Target = Cl;
763

            
764
20
    fn deref(&self) -> &Self::Target {
765
20
        match self {
766
20
            PossiblyOwned::Owned(value) => value,
767
            PossiblyOwned::Borrowed(value) => value,
768
        }
769
20
    }
770
}
771

            
772
pub(crate) enum ListState<'a, Cn, Cl>
773
where
774
    Cl: schema::Collection,
775
{
776
    Pending(Option<ListBuilder<'a, Cn, Cl>>),
777
    Executing(BoxFuture<'a, Result<Vec<OwnedDocument>, Error>>),
778
}
779

            
780
/// Executes [`Connection::list()`] when awaited. Also offers methods to
781
/// customize the options for the operation.
782
#[must_use]
783
pub struct List<'a, Cn, Cl>
784
where
785
    Cl: schema::Collection,
786
{
787
    state: ListState<'a, Cn, Cl>,
788
}
789

            
790
impl<'a, Cn, Cl> List<'a, Cn, Cl>
791
where
792
    Cl: schema::Collection,
793
{
794
20
    pub(crate) fn new(
795
20
        collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
796
20
        range: Range<AnyDocumentId<Cl::PrimaryKey>>,
797
20
    ) -> Self {
798
20
        Self {
799
20
            state: ListState::Pending(Some(ListBuilder {
800
20
                collection,
801
20
                range,
802
20
                sort: Sort::Ascending,
803
20
                limit: None,
804
20
            })),
805
20
        }
806
20
    }
807

            
808
10
    fn builder(&mut self) -> &mut ListBuilder<'a, Cn, Cl> {
809
10
        if let ListState::Pending(Some(builder)) = &mut self.state {
810
10
            builder
811
        } else {
812
            unreachable!("Attempted to use after retrieving the result")
813
        }
814
10
    }
815

            
816
    /// Lists documents by id in ascending order.
817
    pub fn ascending(mut self) -> Self {
818
        self.builder().sort = Sort::Ascending;
819
        self
820
    }
821

            
822
    /// Lists documents by id in descending order.
823
5
    pub fn descending(mut self) -> Self {
824
5
        self.builder().sort = Sort::Descending;
825
5
        self
826
5
    }
827

            
828
    /// Sets the maximum number of results to return.
829
5
    pub fn limit(mut self, maximum_results: usize) -> Self {
830
5
        self.builder().limit = Some(maximum_results);
831
5
        self
832
5
    }
833
}
834

            
835
impl<'a, Cn, Cl> Future for List<'a, Cn, Cl>
836
where
837
    Cn: Connection,
838
    Cl: schema::Collection + Unpin,
839
    Cl::PrimaryKey: Unpin,
840
{
841
    type Output = Result<Vec<OwnedDocument>, Error>;
842

            
843
60
    fn poll(
844
60
        mut self: std::pin::Pin<&mut Self>,
845
60
        cx: &mut std::task::Context<'_>,
846
60
    ) -> std::task::Poll<Self::Output> {
847
60
        match &mut self.state {
848
40
            ListState::Executing(future) => future.as_mut().poll(cx),
849
20
            ListState::Pending(builder) => {
850
20
                let ListBuilder {
851
20
                    collection,
852
20
                    range,
853
20
                    sort,
854
20
                    limit,
855
20
                } = builder.take().unwrap();
856
20

            
857
20
                let future = async move {
858
20
                    collection
859
20
                        .connection
860
20
                        .list::<Cl, _, _>(range, sort, limit)
861
20
                        .await
862
20
                }
863
20
                .boxed();
864
20

            
865
20
                self.state = ListState::Executing(future);
866
20
                self.poll(cx)
867
            }
868
        }
869
60
    }
870
}
871

            
872
/// Parameters to query a [`schema::View`].
873
///
874
/// The examples for this type use this view definition:
875
///
876
/// ```rust
877
/// # mod collection {
878
/// # bonsaidb_core::__doctest_prelude!();
879
/// # }
880
/// # use collection::MyCollection;
881
/// use bonsaidb_core::{
882
///     define_basic_unique_mapped_view,
883
///     document::{CollectionDocument, Emit},
884
///     schema::{
885
///         CollectionViewSchema, DefaultViewSerialization, Name, ReduceResult, View,
886
///         ViewMapResult, ViewMappedValue,
887
///     },
888
/// };
889
///
890
/// #[derive(Debug, Clone, View)]
891
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
892
/// # #[view(core = bonsaidb_core)]
893
/// pub struct ScoresByRank;
894
///
895
/// impl CollectionViewSchema for ScoresByRank {
896
///     type View = Self;
897
///     fn map(
898
///         &self,
899
///         document: CollectionDocument<<Self::View as View>::Collection>,
900
///     ) -> ViewMapResult<Self::View> {
901
///         document
902
///             .header
903
///             .emit_key_and_value(document.contents.rank, document.contents.score)
904
///     }
905
///
906
///     fn reduce(
907
///         &self,
908
///         mappings: &[ViewMappedValue<Self::View>],
909
///         rereduce: bool,
910
///     ) -> ReduceResult<Self::View> {
911
///         if mappings.is_empty() {
912
///             Ok(0.)
913
///         } else {
914
///             Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
915
///         }
916
///     }
917
/// }
918
/// ```
919
pub struct View<'a, Cn, V: schema::SerializedView> {
920
    connection: &'a Cn,
921

            
922
    /// Key filtering criteria.
923
    pub key: Option<QueryKey<V::Key>>,
924

            
925
    /// The view's data access policy. The default value is [`AccessPolicy::UpdateBefore`].
926
    pub access_policy: AccessPolicy,
927

            
928
    /// The sort order of the query.
929
    pub sort: Sort,
930

            
931
    /// The maximum number of results to return.
932
    pub limit: Option<usize>,
933
}
934

            
935
impl<'a, Cn, V> View<'a, Cn, V>
936
where
937
    V: schema::SerializedView,
938
    Cn: Connection,
939
{
940
59916
    fn new(connection: &'a Cn) -> Self {
941
59916
        Self {
942
59916
            connection,
943
59916
            key: None,
944
59916
            access_policy: AccessPolicy::UpdateBefore,
945
59916
            sort: Sort::Ascending,
946
59916
            limit: None,
947
59916
        }
948
59916
    }
949

            
950
    /// Filters for entries in the view with `key`.
951
    ///
952
    /// ```rust
953
    /// # bonsaidb_core::__doctest_prelude!();
954
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
955
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
956
    /// // score is an f32 in this example
957
    /// for mapping in db.view::<ScoresByRank>().with_key(42).query().await? {
958
    ///     assert_eq!(mapping.key, 42);
959
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
960
    /// }
961
    /// # Ok(())
962
    /// # })
963
    /// # }
964
    /// ```
965
    #[must_use]
966
57254
    pub fn with_key(mut self, key: V::Key) -> Self {
967
57254
        self.key = Some(QueryKey::Matches(key));
968
57254
        self
969
57254
    }
970

            
971
    /// Filters for entries in the view with `keys`.
972
    ///
973
    /// ```rust
974
    /// # bonsaidb_core::__doctest_prelude!();
975
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
976
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
977
    /// // score is an f32 in this example
978
    /// for mapping in db
979
    ///     .view::<ScoresByRank>()
980
    ///     .with_keys([42, 43])
981
    ///     .query()
982
    ///     .await?
983
    /// {
984
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
985
    /// }
986
    /// # Ok(())
987
    /// # })
988
    /// # }
989
    /// ```
990
    #[must_use]
991
11
    pub fn with_keys<IntoIter: IntoIterator<Item = V::Key>>(mut self, keys: IntoIter) -> Self {
992
11
        self.key = Some(QueryKey::Multiple(keys.into_iter().collect()));
993
11
        self
994
11
    }
995

            
996
    /// Filters for entries in the view with the range `keys`.
997
    ///
998
    /// ```rust
999
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..)
    ///     .query()
    ///     .await?
    /// {
    ///     assert!(mapping.key >= 42);
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    #[must_use]
13
    pub fn with_key_range<R: Into<Range<V::Key>>>(mut self, range: R) -> Self {
13
        self.key = Some(QueryKey::Range(range.into()));
13
        self
13
    }

            
    /// Sets the access policy for queries.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_access_policy(AccessPolicy::UpdateAfter)
    ///     .query()
    ///     .await?
    /// {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
15153
    pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
15153
        self.access_policy = policy;
15153
        self
15153
    }

            
    /// Queries the view in ascending order. This is the default sorting
    /// behavior.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().ascending().query().await? {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub fn ascending(mut self) -> Self {
        self.sort = Sort::Ascending;
        self
    }

            
    /// Queries the view in descending order.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().descending().query().await? {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub fn descending(mut self) -> Self {
5
        self.sort = Sort::Descending;
5
        self
5
    }

            
    /// Sets the maximum number of results to return.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// let mappings = db.view::<ScoresByRank>().limit(10).query().await?;
    /// assert!(mappings.len() <= 10);
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub fn limit(mut self, maximum_results: usize) -> Self {
5
        self.limit = Some(maximum_results);
5
        self
5
    }

            
    /// Executes the query and retrieves the results.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().query().await? {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
36590
    pub async fn query(self) -> Result<Vec<Map<V::Key, V::Value>>, Error> {
36590
        self.connection
36590
            .query::<V>(self.key, self.sort, self.limit, self.access_policy)
33904
            .await
36590
    }

            
    /// Executes the query and retrieves the results with the associated [`Document`s](crate::document::OwnedDocument).
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for mapping in &db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..=44)
    ///     .query_with_docs()
    ///     .await?
    /// {
    ///     println!(
    ///         "Mapping from #{} with rank: {} and score: {}. Document bytes: {:?}",
    ///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
    ///     );
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
7772
    pub async fn query_with_docs(self) -> Result<MappedDocuments<OwnedDocument, V>, Error> {
7772
        self.connection
8065
            .query_with_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
8064
            .await
7772
    }

            
    /// Executes the query and retrieves the results with the associated [`CollectionDocument`s](crate::document::CollectionDocument).
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for mapping in &db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..=44)
    ///     .query_with_collection_docs()
    ///     .await?
    /// {
    ///     println!(
    ///         "Mapping from #{} with rank: {} and score: {}. Deserialized Contents: {:?}",
    ///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
    ///     );
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
104
    pub async fn query_with_collection_docs(
104
        self,
104
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
104
    where
104
        V::Collection: SerializedCollection,
104
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
104
    {
104
        self.connection
180
            .query_with_collection_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
180
            .await
104
    }

            
    /// Executes a reduce over the results of the query
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// let score = db.view::<ScoresByRank>().reduce().await?;
    /// println!("Average score: {:3}", score);
    /// # Ok(())
    /// # })
    /// # }
    /// ```
15433
    pub async fn reduce(self) -> Result<V::Value, Error> {
15433
        self.connection
15434
            .reduce::<V>(self.key, self.access_policy)
7876
            .await
15433
    }

            
    /// Executes a reduce over the results of the query, grouping by key.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().reduce_grouped().await? {
    ///     println!(
    ///         "Rank {} has an average score of {:3}",
    ///         mapping.key, mapping.value
    ///     );
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
12
    pub async fn reduce_grouped(self) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error> {
12
        self.connection
15
            .reduce_grouped::<V>(self.key, self.access_policy)
12
            .await
12
    }

            
    /// Deletes all of the associated documents that match this view query.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// db.view::<ScoresByRank>().delete_docs().await?;
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub async fn delete_docs(self) -> Result<u64, Error> {
5
        self.connection
8
            .delete_docs::<V>(self.key, self.access_policy)
8
            .await
5
    }
}

            
/// A sort order.
7796
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub enum Sort {
    /// Sort ascending (A -> Z).
    Ascending,
    /// Sort descending (Z -> A).
    Descending,
}

            
/// Filters a [`View`] by key.
23180
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum QueryKey<K> {
    /// Matches all entries with the key provided.
    Matches(K),

            
    /// Matches all entires with keys in the range provided.
    Range(Range<K>),

            
    /// Matches all entries that have keys that are included in the set provided.
    Multiple(Vec<K>),
}

            
#[allow(clippy::use_self)] // clippy is wrong, Self is different because of generic parameters
impl<K: for<'a> Key<'a>> QueryKey<K> {
    /// Converts this key to a serialized format using the [`Key`] trait.
249302
    pub fn serialized(&self) -> Result<QueryKey<Bytes>, Error> {
249302
        match self {
249278
            Self::Matches(key) => key
249278
                .as_ord_bytes()
249278
                .map_err(|err| Error::Database(view::Error::key_serialization(err).to_string()))
249278
                .map(|v| QueryKey::Matches(Bytes::from(v.to_vec()))),
13
            Self::Range(range) => Ok(QueryKey::Range(range.as_ord_bytes().map_err(|err| {
                Error::Database(view::Error::key_serialization(err).to_string())
13
            })?)),
11
            Self::Multiple(keys) => {
11
                let keys = keys
11
                    .iter()
22
                    .map(|key| {
22
                        key.as_ord_bytes()
22
                            .map(|key| Bytes::from(key.to_vec()))
22
                            .map_err(|err| {
                                Error::Database(view::Error::key_serialization(err).to_string())
22
                            })
22
                    })
11
                    .collect::<Result<Vec<_>, Error>>()?;

            
11
                Ok(QueryKey::Multiple(keys))
            }
        }
249302
    }
}

            
#[allow(clippy::use_self)] // clippy is wrong, Self is different because of generic parameters
impl<'a, T> QueryKey<T>
where
    T: AsRef<[u8]>,
{
    /// Deserializes the bytes into `K` via the [`Key`] trait.
    pub fn deserialized<K: for<'k> Key<'k>>(&self) -> Result<QueryKey<K>, Error> {
        match self {
            Self::Matches(key) => K::from_ord_bytes(key.as_ref())
                .map_err(|err| Error::Database(view::Error::key_serialization(err).to_string()))
                .map(QueryKey::Matches),
            Self::Range(range) => Ok(QueryKey::Range(range.deserialize().map_err(|err| {
                Error::Database(view::Error::key_serialization(err).to_string())
            })?)),
            Self::Multiple(keys) => {
                let keys = keys
                    .iter()
                    .map(|key| {
                        K::from_ord_bytes(key.as_ref()).map_err(|err| {
                            Error::Database(view::Error::key_serialization(err).to_string())
                        })
                    })
                    .collect::<Result<Vec<_>, Error>>()?;

            
                Ok(QueryKey::Multiple(keys))
            }
        }
    }
}

            
/// A range type that can represent all std range types and be serialized.
24
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub struct Range<T> {
    /// The start of the range.
    pub start: Bound<T>,
    /// The end of the range.
    pub end: Bound<T>,
}

            
/// A range bound that can be serialized.
48
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub enum Bound<T> {
    /// No bound.
    Unbounded,
    /// Bounded by the contained value (inclusive).
    Included(T),
    /// Bounded by the contained value (exclusive).
    Excluded(T),
}

            
impl<T> Range<T> {
    /// Maps each contained value with the function provided.
15
    pub fn map<U, F: Fn(T) -> U>(self, map: F) -> Range<U> {
15
        Range {
15
            start: self.start.map(&map),
15
            end: self.end.map(&map),
15
        }
15
    }
    /// Maps each contained value with the function provided. The callback's
    /// return type is a Result, unlike with `map`.
20
    pub fn map_result<U, E, F: Fn(T) -> Result<U, E>>(self, map: F) -> Result<Range<U>, E> {
20
        Ok(Range {
20
            start: self.start.map_result(&map)?,
20
            end: self.end.map_result(&map)?,
        })
20
    }

            
    /// Maps each contained value as a reference.
229
    pub fn map_ref<U: ?Sized, F: Fn(&T) -> &U>(&self, map: F) -> Range<&U> {
229
        Range {
229
            start: self.start.map_ref(&map),
229
            end: self.end.map_ref(&map),
229
        }
229
    }
}

            
impl<'a, T: Key<'a>> Range<T> {
    /// Serializes the range's contained values to big-endian bytes.
242
    pub fn as_ord_bytes(&'a self) -> Result<Range<Bytes>, T::Error> {
242
        Ok(Range {
242
            start: self.start.as_ord_bytes()?,
242
            end: self.end.as_ord_bytes()?,
        })
242
    }
}

            
impl<'a, B> Range<B>
where
    B: AsRef<[u8]>,
{
    /// Deserializes the range's contained values from big-endian bytes.
    pub fn deserialize<T: for<'k> Key<'k>>(&'a self) -> Result<Range<T>, <T as Key<'_>>::Error> {
        Ok(Range {
            start: self.start.deserialize()?,
            end: self.start.deserialize()?,
        })
    }
}

            
impl<T> Bound<T> {
    /// Maps the contained value, if any, and returns the resulting `Bound`.
30
    pub fn map<U, F: Fn(T) -> U>(self, map: F) -> Bound<U> {
30
        match self {
            Bound::Unbounded => Bound::Unbounded,
25
            Bound::Included(value) => Bound::Included(map(value)),
5
            Bound::Excluded(value) => Bound::Excluded(map(value)),
        }
30
    }

            
    /// Maps the contained value with the function provided. The callback's
    /// return type is a Result, unlike with `map`.
40
    pub fn map_result<U, E, F: Fn(T) -> Result<U, E>>(self, map: F) -> Result<Bound<U>, E> {
40
        Ok(match self {
10
            Bound::Unbounded => Bound::Unbounded,
25
            Bound::Included(value) => Bound::Included(map(value)?),
5
            Bound::Excluded(value) => Bound::Excluded(map(value)?),
        })
40
    }

            
    /// Maps each contained value as a reference.
458
    pub fn map_ref<U: ?Sized, F: Fn(&T) -> &U>(&self, map: F) -> Bound<&U> {
458
        match self {
            Bound::Unbounded => Bound::Unbounded,
383
            Bound::Included(value) => Bound::Included(map(value)),
75
            Bound::Excluded(value) => Bound::Excluded(map(value)),
        }
458
    }
}

            
impl<'a, T: Key<'a>> Bound<T> {
    /// Serializes the contained value to big-endian bytes.
484
    pub fn as_ord_bytes(&'a self) -> Result<Bound<Bytes>, T::Error> {
484
        match self {
            Bound::Unbounded => Ok(Bound::Unbounded),
406
            Bound::Included(value) => {
406
                Ok(Bound::Included(Bytes::from(value.as_ord_bytes()?.to_vec())))
            }
78
            Bound::Excluded(value) => {
78
                Ok(Bound::Excluded(Bytes::from(value.as_ord_bytes()?.to_vec())))
            }
        }
484
    }
}

            
impl<'a, B> Bound<B>
where
    B: AsRef<[u8]>,
{
    /// Deserializes the bound's contained value from big-endian bytes.
    pub fn deserialize<T: for<'k> Key<'k>>(&'a self) -> Result<Bound<T>, <T as Key<'_>>::Error> {
        match self {
            Bound::Unbounded => Ok(Bound::Unbounded),
            Bound::Included(value) => Ok(Bound::Included(T::from_ord_bytes(value.as_ref())?)),
            Bound::Excluded(value) => Ok(Bound::Excluded(T::from_ord_bytes(value.as_ref())?)),
        }
    }
}

            
impl<T> std::ops::RangeBounds<T> for Range<T> {
228841
    fn start_bound(&self) -> std::ops::Bound<&T> {
228841
        std::ops::Bound::from(&self.start)
228841
    }

            
202355
    fn end_bound(&self) -> std::ops::Bound<&T> {
202355
        std::ops::Bound::from(&self.end)
202355
    }
}

            
impl<'a, T> From<&'a Bound<T>> for std::ops::Bound<&'a T> {
431196
    fn from(bound: &'a Bound<T>) -> Self {
431196
        match bound {
331175
            Bound::Unbounded => std::ops::Bound::Unbounded,
98096
            Bound::Included(value) => std::ops::Bound::Included(value),
1925
            Bound::Excluded(value) => std::ops::Bound::Excluded(value),
        }
431196
    }
}

            
impl<T> From<std::ops::Range<T>> for Range<T> {
8
    fn from(range: std::ops::Range<T>) -> Self {
8
        Self {
8
            start: Bound::Included(range.start),
8
            end: Bound::Excluded(range.end),
8
        }
8
    }
}

            
impl<T> From<std::ops::RangeFrom<T>> for Range<T> {
13449
    fn from(range: std::ops::RangeFrom<T>) -> Self {
13449
        Self {
13449
            start: Bound::Included(range.start),
13449
            end: Bound::Unbounded,
13449
        }
13449
    }
}

            
impl<T> From<std::ops::RangeTo<T>> for Range<T> {
    fn from(range: std::ops::RangeTo<T>) -> Self {
        Self {
            start: Bound::Unbounded,
            end: Bound::Excluded(range.end),
        }
    }
}

            
impl<T: Clone> From<std::ops::RangeInclusive<T>> for Range<T> {
20
    fn from(range: std::ops::RangeInclusive<T>) -> Self {
20
        Self {
20
            start: Bound::Included(range.start().clone()),
20
            end: Bound::Included(range.end().clone()),
20
        }
20
    }
}

            
impl<T> From<std::ops::RangeToInclusive<T>> for Range<T> {
    fn from(range: std::ops::RangeToInclusive<T>) -> Self {
        Self {
            start: Bound::Unbounded,
            end: Bound::Included(range.end),
        }
    }
}

            
impl<T> From<std::ops::RangeFull> for Range<T> {
12927
    fn from(_: std::ops::RangeFull) -> Self {
12927
        Self {
12927
            start: Bound::Unbounded,
12927
            end: Bound::Unbounded,
12927
        }
12927
    }
}

            
/// Changes how the view's outdated data will be treated.
23214
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum AccessPolicy {
    /// Update any changed documents before returning a response.
    UpdateBefore,

            
    /// Return the results, which may be out-of-date, and start an update job in
    /// the background. This pattern is useful when you want to ensure you
    /// provide consistent response times while ensuring the database is
    /// updating in the background.
    UpdateAfter,

            
    /// Returns the results, which may be out-of-date, and do not start any
    /// background jobs. This mode is useful if you're using a view as a cache
    /// and have a background process that is responsible for controlling when
    /// data is refreshed and updated. While the default `UpdateBefore`
    /// shouldn't have much overhead, this option removes all overhead related
    /// to view updating from the query.
    NoUpdate,
}

            
/// Functions for interacting with a multi-database BonsaiDb instance.
#[async_trait]
pub trait StorageConnection: Send + Sync {
    /// The type that represents a database for this implementation.
    type Database: Connection;

            
    /// Creates a database named `name` with the `Schema` provided.
    ///
    /// ## Errors
    ///
    /// * [`Error::InvalidDatabaseName`]: `name` must begin with an alphanumeric
    ///   character (`[a-zA-Z0-9]`), and all remaining characters must be
    ///   alphanumeric, a period (`.`), or a hyphen (`-`).
    /// * [`Error::DatabaseNameAlreadyTaken`]: `name` was already used for a
    ///   previous database name. Database names are case insensitive. Returned
    ///   if `only_if_needed` is false.
984
    async fn create_database<DB: Schema>(
984
        &self,
984
        name: &str,
984
        only_if_needed: bool,
984
    ) -> Result<(), crate::Error> {
2544
        self.create_database_with_schema(name, DB::schema_name(), only_if_needed)
2542
            .await
1967
    }

            
    /// Returns a reference to database `name` with schema `DB`.
    async fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, crate::Error>;

            
    /// Creates a database named `name` using the [`SchemaName`] `schema`.
    ///
    /// ## Errors
    ///
    /// * [`Error::InvalidDatabaseName`]: `name` must begin with an alphanumeric
    ///   character (`[a-zA-Z0-9]`), and all remaining characters must be
    ///   alphanumeric, a period (`.`), or a hyphen (`-`).
    /// * [`Error::DatabaseNameAlreadyTaken`]: `name` was already used for a
    ///   previous database name. Database names are case insensitive. Returned
    ///   if `only_if_needed` is false.
    async fn create_database_with_schema(
        &self,
        name: &str,
        schema: SchemaName,
        only_if_needed: bool,
    ) -> Result<(), crate::Error>;

            
    /// Deletes a database named `name`.
    ///
    /// ## Errors
    ///
    /// * [`Error::DatabaseNotFound`]: database `name` does not exist.
    /// * [`Error::Io`]: an error occurred while deleting files.
    async fn delete_database(&self, name: &str) -> Result<(), crate::Error>;

            
    /// Lists the databases in this storage.
    async fn list_databases(&self) -> Result<Vec<Database>, crate::Error>;

            
    /// Lists the [`SchemaName`]s registered with this storage.
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, crate::Error>;

            
    /// Creates a user.
    #[cfg(feature = "multiuser")]
    async fn create_user(&self, username: &str) -> Result<u64, crate::Error>;

            
    /// Sets a user's password.
    #[cfg(feature = "password-hashing")]
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        password: SensitiveString,
    ) -> Result<(), crate::Error>;

            
    /// Authenticates as a user with a authentication method.
    #[cfg(all(feature = "multiuser", feature = "password-hashing"))]
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        authentication: Authentication,
    ) -> Result<Authenticated, crate::Error>;

            
    /// Adds a user to a permission group.
    #[cfg(feature = "multiuser")]
    async fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), crate::Error>;

            
    /// Removes a user from a permission group.
    #[cfg(feature = "multiuser")]
    async fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), crate::Error>;

            
    /// Adds a user to a permission group.
    #[cfg(feature = "multiuser")]
    async fn add_role_to_user<
        'user,
        'role,
        U: Nameable<'user, u64> + Send + Sync,
        R: Nameable<'role, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: R,
    ) -> Result<(), crate::Error>;

            
    /// Removes a user from a permission group.
    #[cfg(feature = "multiuser")]
    async fn remove_role_from_user<
        'user,
        'role,
        U: Nameable<'user, u64> + Send + Sync,
        R: Nameable<'role, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: R,
    ) -> Result<(), crate::Error>;
}

            
/// A database stored in BonsaiDb.
1288
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Database {
    /// The name of the database.
    pub name: String,
    /// The schema defining the database.
    pub schema: SchemaName,
}

            
/// A plain-text password. This struct automatically overwrites the password
/// with zeroes when dropped.
#[cfg(feature = "multiuser")]
1976
#[derive(Clone, Serialize, Deserialize, Zeroize)]
#[zeroize(drop)]
#[serde(transparent)]
pub struct SensitiveString(pub String);

            
#[cfg(feature = "multiuser")]
impl std::fmt::Debug for SensitiveString {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("SensitiveString(...)")
    }
}

            
#[cfg(feature = "multiuser")]
impl Deref for SensitiveString {
    type Target = String;

            
260
    fn deref(&self) -> &Self::Target {
260
        &self.0
260
    }
}

            
/// User authentication methods.
10
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Authentication {
    /// Authenticate using a password.
    #[cfg(feature = "password-hashing")]
    Password(crate::connection::SensitiveString),
}

            
/// Information about the authenticated session.
10
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Authenticated {
    /// The user id logged in as.
    pub user_id: u64,
    /// The effective permissions granted.
    pub permissions: Permissions,
}

            
#[doc(hidden)]
#[macro_export]
macro_rules! __doctest_prelude {
    () => {
        use bonsaidb_core::{
            connection::{AccessPolicy, Connection},
            define_basic_unique_mapped_view,
            document::{CollectionDocument,Emit, Document, OwnedDocument},
            schema::{

            
                Collection, CollectionName, CollectionViewSchema, DefaultSerialization,
                DefaultViewSerialization, Name, NamedCollection, ReduceResult, Schema, SchemaName,
                Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
            },
            Error,
        };
        use serde::{Deserialize, Serialize};

            
        #[derive(Debug, Schema)]
        #[schema(name = "MySchema", collections = [MyCollection], core = $crate)]
        pub struct MySchema;

            
        #[derive( Debug, Serialize, Deserialize, Default, Collection)]
        #[collection(name = "MyCollection", views = [MyCollectionByName], core = $crate)]
        pub struct MyCollection {
            pub name: String,
            pub rank: u32,
            pub score: f32,
        }

            
        impl MyCollection {
            pub fn named(s: impl Into<String>) -> Self {
                Self::new(s, 0, 0.)
            }

            
            pub fn new(s: impl Into<String>, rank: u32, score: f32) -> Self {
                Self {
                    name: s.into(),
                    rank,
                    score,
                }
            }
        }

            
        impl NamedCollection for MyCollection {
            type ByNameView = MyCollectionByName;
        }

            
        #[derive(Debug, Clone, View)]
        #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank", core = $crate)]
        pub struct ScoresByRank;

            
        impl CollectionViewSchema for ScoresByRank {
            type View = Self;
            fn map(
                &self,
                document: CollectionDocument<<Self::View as View>::Collection>,
            ) -> ViewMapResult<Self::View> {
                document
                    .header
                    .emit_key_and_value(document.contents.rank, document.contents.score)
            }

            
            fn reduce(
                &self,
                mappings: &[ViewMappedValue<Self::View>],
                rereduce: bool,
            ) -> ReduceResult<Self::View> {
                if mappings.is_empty() {
                    Ok(0.)
                } else {
                    Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
                }
            }
        }

            
        define_basic_unique_mapped_view!(
            MyCollectionByName,
            MyCollection,
            1,
            "by-name",
            String,
            (),
            |document: CollectionDocument<MyCollection>| {
                document.header.emit_key(document.contents.name.clone())
            },
        );
    };
}