1
use std::{collections::BTreeMap, marker::PhantomData, ops::Deref};
2

            
3
use arc_bytes::serde::Bytes;
4
use async_trait::async_trait;
5
use futures::{future::BoxFuture, Future, FutureExt};
6
use serde::{Deserialize, Serialize};
7
#[cfg(feature = "multiuser")]
8
use zeroize::Zeroize;
9

            
10
#[cfg(feature = "multiuser")]
11
use crate::schema::Nameable;
12
use crate::{
13
    document::{
14
        AnyDocumentId, CollectionDocument, CollectionHeader, Document, HasHeader, OwnedDocument,
15
    },
16
    key::{IntoPrefixRange, Key},
17
    permissions::Permissions,
18
    schema::{
19
        self,
20
        view::{self, map::MappedDocuments},
21
        Map, MappedValue, Schema, SchemaName, SerializedCollection,
22
    },
23
    transaction::{self, OperationResult, Transaction},
24
    Error,
25
};
26

            
27
/// Defines all interactions with a [`schema::Schema`], regardless of whether it
28
/// is local or remote.
29
#[async_trait]
30
pub trait Connection: Send + Sync {
31
    /// Accesses a collection for the connected [`schema::Schema`].
32
27666
    fn collection<C: schema::Collection>(&self) -> Collection<'_, Self, C>
33
27666
    where
34
27666
        Self: Sized,
35
27666
    {
36
27666
        Collection::new(self)
37
27666
    }
38

            
39
    /// Inserts a newly created document into the connected [`schema::Schema`]
40
    /// for the [`Collection`] `C`. If `id` is `None` a unique id will be
41
    /// generated. If an id is provided and a document already exists with that
42
    /// id, a conflict error will be returned.
43
    ///
44
    /// This is the lower-level API. For better ergonomics, consider using
45
    /// one of:
46
    ///
47
    /// - [`SerializedCollection::push_into()`]
48
    /// - [`SerializedCollection::insert_into()`]
49
    /// - [`self.collection::<Collection>().insert()`](Collection::insert)
50
    /// - [`self.collection::<Collection>().push()`](Collection::push)
51
31758
    async fn insert<
52
31758
        C: schema::Collection,
53
31758
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
54
31758
        B: Into<Bytes> + Send,
55
31758
    >(
56
31758
        &self,
57
31758
        id: Option<PrimaryKey>,
58
31758
        contents: B,
59
31758
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error> {
60
31758
        let contents = contents.into();
61
31206
        let results = self
62
            .apply_transaction(Transaction::insert(
63
31758
                C::collection_name(),
64
31758
                id.map(|id| id.into().to_document_id()).transpose()?,
65
31758
                contents,
66
38037
            ))
67
38037
            .await?;
68
31206
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
69
31206
            CollectionHeader::try_from(header)
70
        } else {
71
            unreachable!(
72
                "apply_transaction on a single insert should yield a single DocumentUpdated entry"
73
            )
74
        }
75
63516
    }
76

            
77
    /// Updates an existing document in the connected [`schema::Schema`] for the
78
    /// [`Collection`] `C`. Upon success, `doc.revision` will be updated with
79
    /// the new revision.
80
    ///
81
    /// This is the lower-level API. For better ergonomics, consider using
82
    /// one of:
83
    ///
84
    /// - [`CollectionDocument::update()`]
85
    /// - [`self.collection::<Collection>().update()`](Collection::update)
86
5048
    async fn update<C: schema::Collection, D: Document<C> + Send + Sync>(
87
5048
        &self,
88
5048
        doc: &mut D,
89
5048
    ) -> Result<(), Error> {
90
5028
        let results = self
91
            .apply_transaction(Transaction::update(
92
5048
                C::collection_name(),
93
5048
                doc.header().into_header()?,
94
5048
                doc.bytes()?,
95
6928
            ))
96
6928
            .await?;
97
5028
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
98
5028
            doc.set_header(header)?;
99
5028
            Ok(())
100
        } else {
101
            unreachable!(
102
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
103
            )
104
        }
105
10096
    }
106

            
107
    /// Overwrites an existing document, or inserts a new document. Upon success,
108
    /// `doc.revision` will be updated with the new revision information.
109
    ///
110
    /// This is the lower-level API. For better ergonomics, consider using
111
    /// one of:
112
    ///
113
    /// - [`SerializedCollection::overwrite()`]
114
    /// - [`SerializedCollection::overwrite_into()`]
115
    /// - [`self.collection::<Collection>().overwrite()`](Collection::overwrite)
116
511
    async fn overwrite<'a, C, PrimaryKey>(
117
511
        &self,
118
511
        id: PrimaryKey,
119
511
        contents: Vec<u8>,
120
511
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error>
121
511
    where
122
511
        C: schema::Collection,
123
511
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
124
511
    {
125
511
        let results = self
126
            .apply_transaction(Transaction::overwrite(
127
511
                C::collection_name(),
128
511
                id.into().to_document_id()?,
129
511
                contents,
130
1814
            ))
131
1814
            .await?;
132
511
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
133
511
            CollectionHeader::try_from(header)
134
        } else {
135
            unreachable!(
136
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
137
            )
138
        }
139
1022
    }
140

            
141
    /// Retrieves a stored document from [`Collection`] `C` identified by `id`.
142
    ///
143
    /// This is the lower-level API. For better ergonomics, consider using
144
    /// one of:
145
    ///
146
    /// - [`SerializedCollection::get()`]
147
    /// - [`self.collection::<Collection>().get()`](Collection::get)
148
    async fn get<C, PrimaryKey>(&self, id: PrimaryKey) -> Result<Option<OwnedDocument>, Error>
149
    where
150
        C: schema::Collection,
151
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send;
152

            
153
    /// Retrieves all documents matching `ids`. Documents that are not found
154
    /// are not returned, but no error will be generated.
155
    ///
156
    /// This is the lower-level API. For better ergonomics, consider using
157
    /// one of:
158
    ///
159
    /// - [`SerializedCollection::get_multiple()`]
160
    /// - [`self.collection::<Collection>().get_multiple()`](Collection::get_multiple)
161
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
162
        &self,
163
        ids: DocumentIds,
164
    ) -> Result<Vec<OwnedDocument>, Error>
165
    where
166
        C: schema::Collection,
167
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
168
        I: Iterator<Item = PrimaryKey> + Send + Sync,
169
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync;
170

            
171
    /// Retrieves all documents within the range of `ids`. To retrieve all
172
    /// documents, pass in `..` for `ids`.
173
    ///
174
    /// This is the lower-level API. For better ergonomics, consider using one
175
    /// of:
176
    ///
177
    /// - [`SerializedCollection::all()`]
178
    /// - [`self.collection::<Collection>().all()`](Collection::all)
179
    /// - [`SerializedCollection::list()`]
180
    /// - [`self.collection::<Collection>().list()`](Collection::list)
181
    async fn list<C, R, PrimaryKey>(
182
        &self,
183
        ids: R,
184
        order: Sort,
185
        limit: Option<usize>,
186
    ) -> Result<Vec<OwnedDocument>, Error>
187
    where
188
        C: schema::Collection,
189
        R: Into<Range<PrimaryKey>> + Send,
190
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send;
191

            
192
    /// Counts the number of documents within the range of `ids`.
193
    ///
194
    /// This is the lower-level API. For better ergonomics, consider using
195
    /// one of:
196
    ///
197
    /// - [`SerializedCollection::all().count()`](schema::List::count)
198
    /// - [`self.collection::<Collection>().all().count()`](List::count)
199
    /// - [`SerializedCollection::list().count()`](schema::List::count)
200
    /// - [`self.collection::<Collection>().list().count()`](List::count)
201
    async fn count<C, R, PrimaryKey>(&self, ids: R) -> Result<u64, Error>
202
    where
203
        C: schema::Collection,
204
        R: Into<Range<PrimaryKey>> + Send,
205
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send;
206

            
207
    /// Removes a `Document` from the database.
208
    ///
209
    /// This is the lower-level API. For better ergonomics, consider using
210
    /// one of:
211
    ///
212
    /// - [`CollectionDocument::delete()`]
213
    /// - [`self.collection::<Collection>().delete()`](Collection::delete)
214
13975
    async fn delete<C: schema::Collection, H: HasHeader + Send + Sync>(
215
13975
        &self,
216
13975
        doc: &H,
217
13975
    ) -> Result<(), Error> {
218
13975
        let results = self
219
15122
            .apply_transaction(Transaction::delete(C::collection_name(), doc.header()?))
220
15055
            .await?;
221
13975
        if let OperationResult::DocumentDeleted { .. } = &results[0] {
222
13975
            Ok(())
223
        } else {
224
            unreachable!(
225
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
226
            )
227
        }
228
27950
    }
229

            
230
    /// Initializes [`View`] for [`schema::View`] `V`.
231
61997
    fn view<V: schema::SerializedView>(&'_ self) -> View<'_, Self, V>
232
61997
    where
233
61997
        Self: Sized,
234
61997
    {
235
61997
        View::new(self)
236
61997
    }
237

            
238
    /// Queries for view entries matching [`View`].
239
    ///
240
    /// This is the lower-level API. For better ergonomics, consider querying
241
    /// the view using [`self.view::<View>().query()`](View::query) instead.
242
    /// The parameters for the query can be customized on the builder returned
243
    /// from [`Self::view()`].
244
    async fn query<V: schema::SerializedView>(
245
        &self,
246
        key: Option<QueryKey<V::Key>>,
247
        order: Sort,
248
        limit: Option<usize>,
249
        access_policy: AccessPolicy,
250
    ) -> Result<Vec<Map<V::Key, V::Value>>, Error>
251
    where
252
        Self: Sized;
253

            
254
    /// Queries for view entries matching [`View`] with their source documents.
255
    ///
256
    /// This is the lower-level API. For better ergonomics, consider querying
257
    /// the view using [`self.view::<View>().query_with_docs()`](View::query_with_docs) instead.
258
    /// The parameters for the query can be customized on the builder returned
259
    /// from [`Self::view()`].
260
    #[must_use]
261
    async fn query_with_docs<V: schema::SerializedView>(
262
        &self,
263
        key: Option<QueryKey<V::Key>>,
264
        order: Sort,
265
        limit: Option<usize>,
266
        access_policy: AccessPolicy,
267
    ) -> Result<MappedDocuments<OwnedDocument, V>, Error>
268
    where
269
        Self: Sized;
270

            
271
    /// Queries for view entries matching [`View`] with their source documents, deserialized.
272
    ///
273
    /// This is the lower-level API. For better ergonomics, consider querying
274
    /// the view using [`self.view::<View>().query_with_collection_docs()`](View::query_with_collection_docs) instead.
275
    /// The parameters for the query can be customized on the builder returned
276
    /// from [`Self::view()`].
277
    #[must_use]
278
122
    async fn query_with_collection_docs<V>(
279
122
        &self,
280
122
        key: Option<QueryKey<V::Key>>,
281
122
        order: Sort,
282
122
        limit: Option<usize>,
283
122
        access_policy: AccessPolicy,
284
122
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
285
122
    where
286
122
        V: schema::SerializedView,
287
122
        V::Collection: SerializedCollection,
288
122
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
289
122
        Self: Sized,
290
122
    {
291
122
        let mapped_docs = self
292
237
            .query_with_docs::<V>(key, order, limit, access_policy)
293
237
            .await?;
294
122
        let mut collection_docs = BTreeMap::new();
295
259
        for (id, doc) in mapped_docs.documents {
296
137
            collection_docs.insert(id, CollectionDocument::<V::Collection>::try_from(&doc)?);
297
        }
298
122
        Ok(MappedDocuments {
299
122
            mappings: mapped_docs.mappings,
300
122
            documents: collection_docs,
301
122
        })
302
244
    }
303

            
304
    /// Reduces the view entries matching [`View`].
305
    ///
306
    /// This is the lower-level API. For better ergonomics, consider reducing
307
    /// the view using [`self.view::<View>().reduce()`](View::reduce) instead.
308
    /// The parameters for the query can be customized on the builder returned
309
    /// from [`Self::view()`].
310
    #[must_use]
311
    async fn reduce<V: schema::SerializedView>(
312
        &self,
313
        key: Option<QueryKey<V::Key>>,
314
        access_policy: AccessPolicy,
315
    ) -> Result<V::Value, Error>
316
    where
317
        Self: Sized;
318

            
319
    /// Reduces the view entries matching [`View`], reducing the values by each
320
    /// unique key.
321
    ///
322
    /// This is the lower-level API. For better ergonomics, consider reducing
323
    /// the view using
324
    /// [`self.view::<View>().reduce_grouped()`](View::reduce_grouped) instead.
325
    /// The parameters for the query can be customized on the builder returned
326
    /// from [`Self::view()`].
327
    #[must_use]
328
    async fn reduce_grouped<V: schema::SerializedView>(
329
        &self,
330
        key: Option<QueryKey<V::Key>>,
331
        access_policy: AccessPolicy,
332
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error>
333
    where
334
        Self: Sized;
335

            
336
    /// Deletes all of the documents associated with this view.
337
    ///
338
    /// This is the lower-level API. For better ergonomics, consider querying
339
    /// the view using [`self.view::<View>().delete_docs()`](View::delete_docs()) instead.
340
    /// The parameters for the query can be customized on the builder returned
341
    /// from [`Self::view()`].
342
    #[must_use]
343
    async fn delete_docs<V: schema::SerializedView>(
344
        &self,
345
        key: Option<QueryKey<V::Key>>,
346
        access_policy: AccessPolicy,
347
    ) -> Result<u64, Error>
348
    where
349
        Self: Sized;
350

            
351
    /// Applies a [`Transaction`] to the [`schema::Schema`]. If any operation in the
352
    /// [`Transaction`] fails, none of the operations will be applied to the
353
    /// [`schema::Schema`].
354
    async fn apply_transaction(
355
        &self,
356
        transaction: Transaction,
357
    ) -> Result<Vec<OperationResult>, Error>;
358

            
359
    /// Lists executed [`Transaction`]s from this [`schema::Schema`]. By default, a maximum of
360
    /// 1000 entries will be returned, but that limit can be overridden by
361
    /// setting `result_limit`. A hard limit of 100,000 results will be
362
    /// returned. To begin listing after another known `transaction_id`, pass
363
    /// `transaction_id + 1` into `starting_id`.
364
    async fn list_executed_transactions(
365
        &self,
366
        starting_id: Option<u64>,
367
        result_limit: Option<usize>,
368
    ) -> Result<Vec<transaction::Executed>, Error>;
369

            
370
    /// Fetches the last transaction id that has been committed, if any.
371
    async fn last_transaction_id(&self) -> Result<Option<u64>, Error>;
372

            
373
    /// Compacts the entire database to reclaim unused disk space.
374
    ///
375
    /// This process is done by writing data to a new file and swapping the file
376
    /// once the process completes. This ensures that if a hardware failure,
377
    /// power outage, or crash occurs that the original collection data is left
378
    /// untouched.
379
    ///
380
    /// ## Errors
381
    ///
382
    /// * [`Error::Io`]: an error occurred while compacting the database.
383
    async fn compact(&self) -> Result<(), crate::Error>;
384

            
385
    /// Compacts the collection to reclaim unused disk space.
386
    ///
387
    /// This process is done by writing data to a new file and swapping the file
388
    /// once the process completes. This ensures that if a hardware failure,
389
    /// power outage, or crash occurs that the original collection data is left
390
    /// untouched.
391
    ///
392
    /// ## Errors
393
    ///
394
    /// * [`Error::CollectionNotFound`]: database `name` does not exist.
395
    /// * [`Error::Io`]: an error occurred while compacting the database.
396
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), crate::Error>;
397

            
398
    /// Compacts the key value store to reclaim unused disk space.
399
    ///
400
    /// This process is done by writing data to a new file and swapping the file
401
    /// once the process completes. This ensures that if a hardware failure,
402
    /// power outage, or crash occurs that the original collection data is left
403
    /// untouched.
404
    ///
405
    /// ## Errors
406
    ///
407
    /// * [`Error::Io`]: an error occurred while compacting the database.
408
    async fn compact_key_value_store(&self) -> Result<(), crate::Error>;
409
}
410

            
411
/// Interacts with a collection over a `Connection`.
412
///
413
/// These examples in this type use this basic collection definition:
414
///
415
/// ```rust
416
/// use bonsaidb_core::{schema::Collection, Error};
417
/// use serde::{Deserialize, Serialize};
418
///
419
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
420
/// #[collection(name = "MyCollection")]
421
/// # #[collection(core = bonsaidb_core)]
422
/// pub struct MyCollection {
423
///     pub rank: u32,
424
///     pub score: f32,
425
/// }
426
/// ```
427
pub struct Collection<'a, Cn, Cl> {
428
    connection: &'a Cn,
429
    _phantom: PhantomData<Cl>, /* allows for extension traits to be written for collections of specific types */
430
}
431

            
432
impl<'a, Cn, Cl> Clone for Collection<'a, Cn, Cl> {
433
    fn clone(&self) -> Self {
434
        Self {
435
            connection: self.connection,
436
            _phantom: PhantomData,
437
        }
438
    }
439
}
440

            
441
impl<'a, Cn, Cl> Collection<'a, Cn, Cl>
442
where
443
    Cn: Connection,
444
    Cl: schema::Collection,
445
{
446
    /// Creates a new instance using `connection`.
447
27666
    fn new(connection: &'a Cn) -> Self {
448
27666
        Self {
449
27666
            connection,
450
27666
            _phantom: PhantomData::default(),
451
27666
        }
452
27666
    }
453

            
454
    /// Adds a new `Document<Cl>` with the contents `item`.
455
    ///
456
    /// ## Automatic Id Assignment
457
    ///
458
    /// This function calls [`SerializedCollection::natural_id()`] to try to
459
    /// retrieve a primary key value from `item`. If an id is returned, the item
460
    /// is inserted with that id. If an id is not returned, an id will be
461
    /// automatically assigned, if possible, by the storage backend, which uses the [`Key`]
462
    /// trait to assign ids.
463
    ///
464
    /// ```rust
465
    /// # bonsaidb_core::__doctest_prelude!();
466
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
467
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
468
    /// let inserted_header = db
469
    ///     .collection::<MyCollection>()
470
    ///     .push(&MyCollection::default())
471
    ///     .await?;
472
    /// println!(
473
    ///     "Inserted id {} with revision {}",
474
    ///     inserted_header.id, inserted_header.revision
475
    /// );
476
    /// # Ok(())
477
    /// # })
478
    /// # }
479
    /// ```
480
30239
    pub async fn push(
481
30239
        &self,
482
30239
        item: &<Cl as SerializedCollection>::Contents,
483
30239
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
484
30239
    where
485
30239
        Cl: schema::SerializedCollection,
486
30239
    {
487
30239
        let contents = Cl::serialize(item)?;
488
30239
        if let Some(natural_id) = Cl::natural_id(item) {
489
1
            self.insert_bytes(natural_id, contents).await
490
        } else {
491
33295
            self.push_bytes(contents).await
492
        }
493
30239
    }
494

            
495
    /// Adds a new `Document<Cl>` with the `contents`.
496
    ///
497
    /// ## Automatic Id Assignment
498
    ///
499
    /// An id will be automatically assigned, if possible, by the storage backend, which uses
500
    /// the [`Key`] trait to assign ids.
501
    ///
502
    /// ```rust
503
    /// # bonsaidb_core::__doctest_prelude!();
504
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
505
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
506
    /// let inserted_header = db.collection::<MyCollection>().push_bytes(vec![]).await?;
507
    /// println!(
508
    ///     "Inserted id {} with revision {}",
509
    ///     inserted_header.id, inserted_header.revision
510
    /// );
511
    /// # Ok(())
512
    /// # })
513
    /// # }
514
    /// ```
515
30238
    pub async fn push_bytes<B: Into<Bytes> + Send>(
516
30238
        &self,
517
30238
        contents: B,
518
30238
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
519
30238
    where
520
30238
        Cl: schema::SerializedCollection,
521
30238
    {
522
30238
        self.connection
523
33295
            .insert::<Cl, _, B>(Option::<Cl::PrimaryKey>::None, contents)
524
32927
            .await
525
30238
    }
526

            
527
    /// Adds a new `Document<Cl>` with the given `id` and contents `item`.
528
    ///
529
    /// ```rust
530
    /// # bonsaidb_core::__doctest_prelude!();
531
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
532
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
533
    /// let inserted_header = db
534
    ///     .collection::<MyCollection>()
535
    ///     .insert(42, &MyCollection::default())
536
    ///     .await?;
537
    /// println!(
538
    ///     "Inserted id {} with revision {}",
539
    ///     inserted_header.id, inserted_header.revision
540
    /// );
541
    /// # Ok(())
542
    /// # })
543
    /// # }
544
    /// ```
545
1013
    pub async fn insert<PrimaryKey>(
546
1013
        &self,
547
1013
        id: PrimaryKey,
548
1013
        item: &<Cl as SerializedCollection>::Contents,
549
1013
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
550
1013
    where
551
1013
        Cl: schema::SerializedCollection,
552
1013
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>> + Send + Sync,
553
1013
    {
554
1013
        let contents = Cl::serialize(item)?;
555
3367
        self.connection.insert::<Cl, _, _>(Some(id), contents).await
556
1013
    }
557

            
558
    /// Adds a new `Document<Cl>` with the the given `id` and `contents`.
559
    ///
560
    /// ```rust
561
    /// # bonsaidb_core::__doctest_prelude!();
562
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
563
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
564
    /// let inserted_header = db
565
    ///     .collection::<MyCollection>()
566
    ///     .insert_bytes(42, vec![])
567
    ///     .await?;
568
    /// println!(
569
    ///     "Inserted id {} with revision {}",
570
    ///     inserted_header.id, inserted_header.revision
571
    /// );
572
    /// # Ok(())
573
    /// # })
574
    /// # }
575
    /// ```
576
1
    pub async fn insert_bytes<B: Into<Bytes> + Send>(
577
1
        &self,
578
1
        id: Cl::PrimaryKey,
579
1
        contents: B,
580
1
    ) -> Result<CollectionHeader<Cl::PrimaryKey>, crate::Error>
581
1
    where
582
1
        Cl: schema::SerializedCollection,
583
1
    {
584
1
        self.connection.insert::<Cl, _, B>(Some(id), contents).await
585
1
    }
586

            
587
    /// Updates an existing document. Upon success, `doc.revision` will be
588
    /// updated with the new revision.
589
    ///
590
    /// ```rust
591
    /// # bonsaidb_core::__doctest_prelude!();
592
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
593
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
594
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42).await? {
595
    ///     // modify the document
596
    ///     db.collection::<MyCollection>().update(&mut document);
597
    ///     println!("Updated revision: {:?}", document.header.revision);
598
    /// }
599
    /// # Ok(())
600
    /// # })
601
    /// # }
602
    /// ```
603
    pub async fn update<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
604
        self.connection.update::<Cl, D>(doc).await
605
    }
606

            
607
    /// Overwrites an existing document, or inserts a new document. Upon success,
608
    /// `doc.revision` will be updated with the new revision information.
609
    ///
610
    /// ```rust
611
    /// # bonsaidb_core::__doctest_prelude!();
612
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
613
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
614
    /// if let Some(mut document) = db.collection::<MyCollection>().get(42).await? {
615
    ///     // modify the document
616
    ///     db.collection::<MyCollection>().overwrite(&mut document);
617
    ///     println!("Updated revision: {:?}", document.header.revision);
618
    /// }
619
    /// # Ok(())
620
    /// # })
621
    /// # }
622
    /// ```
623
5
    pub async fn overwrite<D: Document<Cl> + Send + Sync>(&self, doc: &mut D) -> Result<(), Error> {
624
5
        let contents = doc.bytes()?;
625
        doc.set_collection_header(
626
5
            self.connection
627
5
                .overwrite::<Cl, _>(doc.key(), contents)
628
5
                .await?,
629
        )
630
5
    }
631

            
632
    /// Retrieves a `Document<Cl>` with `id` from the connection.
633
    ///
634
    /// ```rust
635
    /// # bonsaidb_core::__doctest_prelude!();
636
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
637
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
638
    /// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
639
    ///     println!(
640
    ///         "Retrieved bytes {:?} with revision {}",
641
    ///         doc.contents, doc.header.revision
642
    ///     );
643
    ///     let deserialized = MyCollection::document_contents(&doc)?;
644
    ///     println!("Deserialized contents: {:?}", deserialized);
645
    /// }
646
    /// # Ok(())
647
    /// # })
648
    /// # }
649
    /// ```
650
1575
    pub async fn get<PrimaryKey>(&self, id: PrimaryKey) -> Result<Option<OwnedDocument>, Error>
651
1575
    where
652
1575
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>> + Send,
653
1575
    {
654
4684
        self.connection.get::<Cl, _>(id).await
655
1575
    }
656

            
657
    /// Retrieves all documents matching `ids`. Documents that are not found
658
    /// are not returned, but no error will be generated.
659
    ///
660
    /// ```rust
661
    /// # bonsaidb_core::__doctest_prelude!();
662
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
663
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
664
    /// for doc in db
665
    ///     .collection::<MyCollection>()
666
    ///     .get_multiple([42, 43])
667
    ///     .await?
668
    /// {
669
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
670
    ///     let deserialized = MyCollection::document_contents(&doc)?;
671
    ///     println!("Deserialized contents: {:?}", deserialized);
672
    /// }
673
    /// # Ok(())
674
    /// # })
675
    /// # }
676
    /// ```
677
15
    pub async fn get_multiple<DocumentIds, PrimaryKey, I>(
678
15
        &self,
679
15
        ids: DocumentIds,
680
15
    ) -> Result<Vec<OwnedDocument>, Error>
681
15
    where
682
15
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
683
15
        I: Iterator<Item = PrimaryKey> + Send + Sync,
684
15
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>> + Send + Sync,
685
15
    {
686
15
        self.connection.get_multiple::<Cl, _, _, _>(ids).await
687
15
    }
688

            
689
    /// Retrieves all documents matching the range of `ids`.
690
    ///
691
    /// ```rust
692
    /// # bonsaidb_core::__doctest_prelude!();
693
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
694
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
695
    /// for doc in db
696
    ///     .collection::<MyCollection>()
697
    ///     .list(42..)
698
    ///     .descending()
699
    ///     .limit(20)
700
    ///     .await?
701
    /// {
702
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
703
    ///     let deserialized = MyCollection::document_contents(&doc)?;
704
    ///     println!("Deserialized contents: {:?}", deserialized);
705
    /// }
706
    /// # Ok(())
707
    /// # })
708
    /// # }
709
    /// ```
710
    pub fn list<PrimaryKey, R>(&'a self, ids: R) -> List<'a, Cn, Cl>
711
    where
712
        R: Into<Range<PrimaryKey>>,
713
        PrimaryKey: Into<AnyDocumentId<Cl::PrimaryKey>>,
714
    {
715
        List::new(
716
            PossiblyOwned::Borrowed(self),
717
            ids.into().map(PrimaryKey::into),
718
        )
719
    }
720

            
721
    /// Retrieves all documents with ids that start with `prefix`.
722
    ///
723
    /// ```rust
724
    /// use bonsaidb_core::{
725
    ///     connection::Connection,
726
    ///     document::OwnedDocument,
727
    ///     schema::{Collection, Schematic, SerializedCollection},
728
    ///     Error,
729
    /// };
730
    /// use serde::{Deserialize, Serialize};
731
    ///
732
    /// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
733
    /// #[collection(name = "MyCollection", primary_key = String)]
734
    /// # #[collection(core = bonsaidb_core)]
735
    /// pub struct MyCollection;
736
    ///
737
    /// async fn starts_with_a<C: Connection>(db: &C) -> Result<Vec<OwnedDocument>, Error> {
738
    ///     db.collection::<MyCollection>()
739
    ///         .list_with_prefix(String::from("a"))
740
    ///         .await
741
    /// }
742
    /// ```
743
    pub fn list_with_prefix(&'a self, prefix: Cl::PrimaryKey) -> List<'a, Cn, Cl>
744
    where
745
        Cl::PrimaryKey: IntoPrefixRange,
746
    {
747
        List::new(
748
            PossiblyOwned::Borrowed(self),
749
            prefix.into_prefix_range().map(AnyDocumentId::Deserialized),
750
        )
751
    }
752

            
753
    /// Retrieves all documents.
754
    ///
755
    /// ```rust
756
    /// # bonsaidb_core::__doctest_prelude!();
757
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
758
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
759
    /// for doc in db.collection::<MyCollection>().all().await? {
760
    ///     println!("Retrieved #{} with bytes {:?}", doc.header.id, doc.contents);
761
    ///     let deserialized = MyCollection::document_contents(&doc)?;
762
    ///     println!("Deserialized contents: {:?}", deserialized);
763
    /// }
764
    /// # Ok(())
765
    /// # })
766
    /// # }
767
    /// ```
768
    pub fn all(&'a self) -> List<'a, Cn, Cl> {
769
        List::new(PossiblyOwned::Borrowed(self), Range::from(..))
770
    }
771

            
772
    /// Removes a `Document` from the database.
773
    ///
774
    /// ```rust
775
    /// # bonsaidb_core::__doctest_prelude!();
776
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
777
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
778
    /// if let Some(doc) = db.collection::<MyCollection>().get(42).await? {
779
    ///     db.collection::<MyCollection>().delete(&doc).await?;
780
    /// }
781
    /// # Ok(())
782
    /// # })
783
    /// # }
784
    /// ```
785
895
    pub async fn delete<H: HasHeader + Send + Sync>(&self, doc: &H) -> Result<(), Error> {
786
2042
        self.connection.delete::<Cl, H>(doc).await
787
895
    }
788
}
789

            
790
pub(crate) struct ListBuilder<'a, Cn, Cl>
791
where
792
    Cl: schema::Collection,
793
{
794
    collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
795
    range: Range<AnyDocumentId<Cl::PrimaryKey>>,
796
    sort: Sort,
797
    limit: Option<usize>,
798
}
799

            
800
pub(crate) enum PossiblyOwned<'a, Cl> {
801
    Owned(Cl),
802
    Borrowed(&'a Cl),
803
}
804

            
805
impl<'a, Cl> Deref for PossiblyOwned<'a, Cl> {
806
    type Target = Cl;
807

            
808
42
    fn deref(&self) -> &Self::Target {
809
42
        match self {
810
42
            PossiblyOwned::Owned(value) => value,
811
            PossiblyOwned::Borrowed(value) => value,
812
        }
813
42
    }
814
}
815

            
816
pub(crate) enum ListState<'a, Cn, Cl>
817
where
818
    Cl: schema::Collection,
819
{
820
    Pending(Option<ListBuilder<'a, Cn, Cl>>),
821
    Executing(BoxFuture<'a, Result<Vec<OwnedDocument>, Error>>),
822
}
823

            
824
/// Executes [`Connection::list()`] when awaited. Also offers methods to
825
/// customize the options for the operation.
826
#[must_use]
827
pub struct List<'a, Cn, Cl>
828
where
829
    Cl: schema::Collection,
830
{
831
    state: ListState<'a, Cn, Cl>,
832
}
833

            
834
impl<'a, Cn, Cl> List<'a, Cn, Cl>
835
where
836
    Cl: schema::Collection,
837
    Cn: Connection,
838
{
839
42
    pub(crate) fn new(
840
42
        collection: PossiblyOwned<'a, Collection<'a, Cn, Cl>>,
841
42
        range: Range<AnyDocumentId<Cl::PrimaryKey>>,
842
42
    ) -> Self {
843
42
        Self {
844
42
            state: ListState::Pending(Some(ListBuilder {
845
42
                collection,
846
42
                range,
847
42
                sort: Sort::Ascending,
848
42
                limit: None,
849
42
            })),
850
42
        }
851
42
    }
852

            
853
10
    fn builder(&mut self) -> &mut ListBuilder<'a, Cn, Cl> {
854
10
        if let ListState::Pending(Some(builder)) = &mut self.state {
855
10
            builder
856
        } else {
857
            unreachable!("Attempted to use after retrieving the result")
858
        }
859
10
    }
860

            
861
    /// Lists documents by id in ascending order.
862
    pub fn ascending(mut self) -> Self {
863
        self.builder().sort = Sort::Ascending;
864
        self
865
    }
866

            
867
    /// Lists documents by id in descending order.
868
5
    pub fn descending(mut self) -> Self {
869
5
        self.builder().sort = Sort::Descending;
870
5
        self
871
5
    }
872

            
873
    /// Sets the maximum number of results to return.
874
5
    pub fn limit(mut self, maximum_results: usize) -> Self {
875
5
        self.builder().limit = Some(maximum_results);
876
5
        self
877
5
    }
878

            
879
    /// Returns the number of documents contained within the range.
880
    ///
881
    /// Order and limit are ignored if they were set.
882
    ///
883
    /// ```rust
884
    /// # bonsaidb_core::__doctest_prelude!();
885
    /// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
886
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
887
    /// println!(
888
    ///     "Number of documents with id 42 or larger: {}",
889
    ///     db.collection::<MyCollection>().list(42..).count().await?
890
    /// );
891
    /// println!(
892
    ///     "Number of documents in MyCollection: {}",
893
    ///     db.collection::<MyCollection>().all().count().await?
894
    /// );
895
    /// # Ok(())
896
    /// # })
897
    /// # }
898
    /// ```
899
10
    pub async fn count(self) -> Result<u64, Error> {
900
10
        match self.state {
901
            ListState::Pending(Some(ListBuilder {
902
10
                collection, range, ..
903
10
            })) => collection.connection.count::<Cl, _, _>(range).await,
904
            _ => unreachable!("Attempted to use after retrieving the result"),
905
        }
906
10
    }
907
}
908

            
909
impl<'a, Cn, Cl> Future for List<'a, Cn, Cl>
910
where
911
    Cn: Connection,
912
    Cl: schema::Collection + Unpin,
913
    Cl::PrimaryKey: Unpin,
914
{
915
    type Output = Result<Vec<OwnedDocument>, Error>;
916

            
917
96
    fn poll(
918
96
        mut self: std::pin::Pin<&mut Self>,
919
96
        cx: &mut std::task::Context<'_>,
920
96
    ) -> std::task::Poll<Self::Output> {
921
96
        match &mut self.state {
922
64
            ListState::Executing(future) => future.as_mut().poll(cx),
923
32
            ListState::Pending(builder) => {
924
32
                let ListBuilder {
925
32
                    collection,
926
32
                    range,
927
32
                    sort,
928
32
                    limit,
929
32
                } = builder.take().unwrap();
930
32

            
931
32
                let future = async move {
932
32
                    collection
933
32
                        .connection
934
32
                        .list::<Cl, _, _>(range, sort, limit)
935
32
                        .await
936
32
                }
937
32
                .boxed();
938
32

            
939
32
                self.state = ListState::Executing(future);
940
32
                self.poll(cx)
941
            }
942
        }
943
96
    }
944
}
945

            
946
/// Parameters to query a [`schema::View`].
947
///
948
/// The examples for this type use this view definition:
949
///
950
/// ```rust
951
/// # mod collection {
952
/// # bonsaidb_core::__doctest_prelude!();
953
/// # }
954
/// # use collection::MyCollection;
955
/// use bonsaidb_core::{
956
///     define_basic_unique_mapped_view,
957
///     document::{CollectionDocument, Emit},
958
///     schema::{
959
///         CollectionViewSchema, DefaultViewSerialization, Name, ReduceResult, View,
960
///         ViewMapResult, ViewMappedValue,
961
///     },
962
/// };
963
///
964
/// #[derive(Debug, Clone, View)]
965
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
966
/// # #[view(core = bonsaidb_core)]
967
/// pub struct ScoresByRank;
968
///
969
/// impl CollectionViewSchema for ScoresByRank {
970
///     type View = Self;
971
///     fn map(
972
///         &self,
973
///         document: CollectionDocument<<Self::View as View>::Collection>,
974
///     ) -> ViewMapResult<Self::View> {
975
///         document
976
///             .header
977
///             .emit_key_and_value(document.contents.rank, document.contents.score)
978
///     }
979
///
980
///     fn reduce(
981
///         &self,
982
///         mappings: &[ViewMappedValue<Self::View>],
983
///         rereduce: bool,
984
///     ) -> ReduceResult<Self::View> {
985
///         if mappings.is_empty() {
986
///             Ok(0.)
987
///         } else {
988
///             Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
989
///         }
990
///     }
991
/// }
992
/// ```
993
#[must_use]
994
pub struct View<'a, Cn, V: schema::SerializedView> {
995
    connection: &'a Cn,
996

            
997
    /// Key filtering criteria.
998
    pub key: Option<QueryKey<V::Key>>,
999

            
    /// The view's data access policy. The default value is [`AccessPolicy::UpdateBefore`].
    pub access_policy: AccessPolicy,

            
    /// The sort order of the query.
    pub sort: Sort,

            
    /// The maximum number of results to return.
    pub limit: Option<usize>,
}

            
impl<'a, Cn, V> View<'a, Cn, V>
where
    V: schema::SerializedView,
    Cn: Connection,
{
61997
    fn new(connection: &'a Cn) -> Self {
61997
        Self {
61997
            connection,
61997
            key: None,
61997
            access_policy: AccessPolicy::UpdateBefore,
61997
            sort: Sort::Ascending,
61997
            limit: None,
61997
        }
61997
    }

            
    /// Filters for entries in the view with `key`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().with_key(42).query().await? {
    ///     assert_eq!(mapping.key, 42);
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
59217
    pub fn with_key(mut self, key: V::Key) -> Self {
59217
        self.key = Some(QueryKey::Matches(key));
59217
        self
59217
    }

            
    /// Filters for entries in the view with `keys`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_keys([42, 43])
    ///     .query()
    ///     .await?
    /// {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
11
    pub fn with_keys<IntoIter: IntoIterator<Item = V::Key>>(mut self, keys: IntoIter) -> Self {
11
        self.key = Some(QueryKey::Multiple(keys.into_iter().collect()));
11
        self
11
    }

            
    /// Filters for entries in the view with the range `keys`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..)
    ///     .query()
    ///     .await?
    /// {
    ///     assert!(mapping.key >= 42);
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
13
    pub fn with_key_range<R: Into<Range<V::Key>>>(mut self, range: R) -> Self {
13
        self.key = Some(QueryKey::Range(range.into()));
13
        self
13
    }

            
    /// Filters for entries in the view with keys that begin with `prefix`.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// #[derive(View, Debug, Clone)]
    /// #[view(name = "by-name", key = String, collection = MyCollection)]
    /// # #[view(core = bonsaidb_core)]
    /// struct ByName;
    ///
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ByName>()
    ///     .with_key_prefix(String::from("a"))
    ///     .query()
    ///     .await?
    /// {
    ///     assert!(mapping.key.starts_with("a"));
    ///     println!("{} in document {:?}", mapping.key, mapping.source);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub fn with_key_prefix(mut self, prefix: V::Key) -> Self
    where
        V::Key: IntoPrefixRange,
    {
        self.key = Some(QueryKey::Range(prefix.into_prefix_range()));
        self
    }

            
    /// Sets the access policy for queries.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db
    ///     .view::<ScoresByRank>()
    ///     .with_access_policy(AccessPolicy::UpdateAfter)
    ///     .query()
    ///     .await?
    /// {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
15662
    pub fn with_access_policy(mut self, policy: AccessPolicy) -> Self {
15662
        self.access_policy = policy;
15662
        self
15662
    }

            
    /// Queries the view in ascending order. This is the default sorting
    /// behavior.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().ascending().query().await? {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
    pub fn ascending(mut self) -> Self {
        self.sort = Sort::Ascending;
        self
    }

            
    /// Queries the view in descending order.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().descending().query().await? {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub fn descending(mut self) -> Self {
5
        self.sort = Sort::Descending;
5
        self
5
    }

            
    /// Sets the maximum number of results to return.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// let mappings = db.view::<ScoresByRank>().limit(10).query().await?;
    /// assert!(mappings.len() <= 10);
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub fn limit(mut self, maximum_results: usize) -> Self {
5
        self.limit = Some(maximum_results);
5
        self
5
    }

            
    /// Executes the query and retrieves the results.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().query().await? {
    ///     println!("Rank {} has a score of {:3}", mapping.key, mapping.value);
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
38045
    pub async fn query(self) -> Result<Vec<Map<V::Key, V::Value>>, Error> {
38045
        self.connection
38052
            .query::<V>(self.key, self.sort, self.limit, self.access_policy)
37648
            .await
38045
    }

            
    /// Executes the query and retrieves the results with the associated [`Document`s](crate::document::OwnedDocument).
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for mapping in &db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..=44)
    ///     .query_with_docs()
    ///     .await?
    /// {
    ///     println!(
    ///         "Mapping from #{} with rank: {} and score: {}. Document bytes: {:?}",
    ///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
    ///     );
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
7848
    pub async fn query_with_docs(self) -> Result<MappedDocuments<OwnedDocument, V>, Error> {
7848
        self.connection
8451
            .query_with_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
8451
            .await
7848
    }

            
    /// Executes the query and retrieves the results with the associated [`CollectionDocument`s](crate::document::CollectionDocument).
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// for mapping in &db
    ///     .view::<ScoresByRank>()
    ///     .with_key_range(42..=44)
    ///     .query_with_collection_docs()
    ///     .await?
    /// {
    ///     println!(
    ///         "Mapping from #{} with rank: {} and score: {}. Deserialized Contents: {:?}",
    ///         mapping.document.header.id, mapping.key, mapping.value, mapping.document.contents
    ///     );
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
122
    pub async fn query_with_collection_docs(
122
        self,
122
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
122
    where
122
        V::Collection: SerializedCollection,
122
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
122
    {
122
        self.connection
237
            .query_with_collection_docs::<V>(self.key, self.sort, self.limit, self.access_policy)
237
            .await
122
    }

            
    /// Executes a reduce over the results of the query
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// let score = db.view::<ScoresByRank>().reduce().await?;
    /// println!("Average score: {:3}", score);
    /// # Ok(())
    /// # })
    /// # }
    /// ```
15953
    pub async fn reduce(self) -> Result<V::Value, Error> {
15953
        self.connection
15954
            .reduce::<V>(self.key, self.access_policy)
8139
            .await
15953
    }

            
    /// Executes a reduce over the results of the query, grouping by key.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// // score is an f32 in this example
    /// for mapping in db.view::<ScoresByRank>().reduce_grouped().await? {
    ///     println!(
    ///         "Rank {} has an average score of {:3}",
    ///         mapping.key, mapping.value
    ///     );
    /// }
    /// # Ok(())
    /// # })
    /// # }
    /// ```
24
    pub async fn reduce_grouped(self) -> Result<Vec<MappedValue<V::Key, V::Value>>, Error> {
24
        self.connection
26
            .reduce_grouped::<V>(self.key, self.access_policy)
23
            .await
24
    }

            
    /// Deletes all of the associated documents that match this view query.
    ///
    /// ```rust
    /// # bonsaidb_core::__doctest_prelude!();
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// db.view::<ScoresByRank>().delete_docs().await?;
    /// # Ok(())
    /// # })
    /// # }
    /// ```
5
    pub async fn delete_docs(self) -> Result<u64, Error> {
5
        self.connection
8
            .delete_docs::<V>(self.key, self.access_policy)
8
            .await
5
    }
}

            
/// A sort order.
7874
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub enum Sort {
    /// Sort ascending (A -> Z).
    Ascending,
    /// Sort descending (Z -> A).
    Descending,
}

            
/// Filters a [`View`] by key.
23778
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum QueryKey<K> {
    /// Matches all entries with the key provided.
    Matches(K),

            
    /// Matches all entires with keys in the range provided.
    Range(Range<K>),

            
    /// Matches all entries that have keys that are included in the set provided.
    Multiple(Vec<K>),
}

            
#[allow(clippy::use_self)] // clippy is wrong, Self is different because of generic parameters
impl<K: for<'a> Key<'a>> QueryKey<K> {
    /// Converts this key to a serialized format using the [`Key`] trait.
261167
    pub fn serialized(&self) -> Result<QueryKey<Bytes>, Error> {
261167
        match self {
261143
            Self::Matches(key) => key
261143
                .as_ord_bytes()
261143
                .map_err(|err| Error::Database(view::Error::key_serialization(err).to_string()))
261143
                .map(|v| QueryKey::Matches(Bytes::from(v.to_vec()))),
13
            Self::Range(range) => Ok(QueryKey::Range(range.as_ord_bytes().map_err(|err| {
                Error::Database(view::Error::key_serialization(err).to_string())
13
            })?)),
11
            Self::Multiple(keys) => {
11
                let keys = keys
11
                    .iter()
22
                    .map(|key| {
22
                        key.as_ord_bytes()
22
                            .map(|key| Bytes::from(key.to_vec()))
22
                            .map_err(|err| {
                                Error::Database(view::Error::key_serialization(err).to_string())
22
                            })
22
                    })
11
                    .collect::<Result<Vec<_>, Error>>()?;

            
11
                Ok(QueryKey::Multiple(keys))
            }
        }
261167
    }
}

            
#[allow(clippy::use_self)] // clippy is wrong, Self is different because of generic parameters
impl<'a, T> QueryKey<T>
where
    T: AsRef<[u8]>,
{
    /// Deserializes the bytes into `K` via the [`Key`] trait.
    pub fn deserialized<K: for<'k> Key<'k>>(&self) -> Result<QueryKey<K>, Error> {
        match self {
            Self::Matches(key) => K::from_ord_bytes(key.as_ref())
                .map_err(|err| Error::Database(view::Error::key_serialization(err).to_string()))
                .map(QueryKey::Matches),
            Self::Range(range) => Ok(QueryKey::Range(range.deserialize().map_err(|err| {
                Error::Database(view::Error::key_serialization(err).to_string())
            })?)),
            Self::Multiple(keys) => {
                let keys = keys
                    .iter()
                    .map(|key| {
                        K::from_ord_bytes(key.as_ref()).map_err(|err| {
                            Error::Database(view::Error::key_serialization(err).to_string())
                        })
                    })
                    .collect::<Result<Vec<_>, Error>>()?;

            
                Ok(QueryKey::Multiple(keys))
            }
        }
    }
}

            
/// A range type that can represent all std range types and be serialized.
32
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub struct Range<T> {
    /// The start of the range.
    pub start: Bound<T>,
    /// The end of the range.
    pub end: Bound<T>,
}

            
/// A range bound that can be serialized.
64
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub enum Bound<T> {
    /// No bound.
    Unbounded,
    /// Bounded by the contained value (inclusive).
    Included(T),
    /// Bounded by the contained value (exclusive).
    Excluded(T),
}

            
impl<T> Range<T> {
    /// Maps each contained value with the function provided.
20
    pub fn map<U, F: Fn(T) -> U>(self, map: F) -> Range<U> {
20
        Range {
20
            start: self.start.map(&map),
20
            end: self.end.map(&map),
20
        }
20
    }
    /// Maps each contained value with the function provided. The callback's
    /// return type is a Result, unlike with `map`.
42
    pub fn map_result<U, E, F: Fn(T) -> Result<U, E>>(self, map: F) -> Result<Range<U>, E> {
42
        Ok(Range {
42
            start: self.start.map_result(&map)?,
42
            end: self.end.map_result(&map)?,
        })
42
    }

            
    /// Maps each contained value as a reference.
238
    pub fn map_ref<U: ?Sized, F: Fn(&T) -> &U>(&self, map: F) -> Range<&U> {
238
        Range {
238
            start: self.start.map_ref(&map),
238
            end: self.end.map_ref(&map),
238
        }
238
    }
}

            
impl<'a, T: Key<'a>> Range<T> {
    /// Serializes the range's contained values to big-endian bytes.
251
    pub fn as_ord_bytes(&'a self) -> Result<Range<Bytes>, T::Error> {
251
        Ok(Range {
251
            start: self.start.as_ord_bytes()?,
251
            end: self.end.as_ord_bytes()?,
        })
251
    }
}

            
impl<'a, B> Range<B>
where
    B: AsRef<[u8]>,
{
    /// Deserializes the range's contained values from big-endian bytes.
    pub fn deserialize<T: for<'k> Key<'k>>(&'a self) -> Result<Range<T>, <T as Key<'_>>::Error> {
        Ok(Range {
            start: self.start.deserialize()?,
            end: self.start.deserialize()?,
        })
    }
}

            
impl<T> Bound<T> {
    /// Maps the contained value, if any, and returns the resulting `Bound`.
40
    pub fn map<U, F: Fn(T) -> U>(self, map: F) -> Bound<U> {
40
        match self {
            Bound::Unbounded => Bound::Unbounded,
35
            Bound::Included(value) => Bound::Included(map(value)),
5
            Bound::Excluded(value) => Bound::Excluded(map(value)),
        }
40
    }

            
    /// Maps the contained value with the function provided. The callback's
    /// return type is a Result, unlike with `map`.
84
    pub fn map_result<U, E, F: Fn(T) -> Result<U, E>>(self, map: F) -> Result<Bound<U>, E> {
84
        Ok(match self {
44
            Bound::Unbounded => Bound::Unbounded,
35
            Bound::Included(value) => Bound::Included(map(value)?),
5
            Bound::Excluded(value) => Bound::Excluded(map(value)?),
        })
84
    }

            
    /// Maps each contained value as a reference.
476
    pub fn map_ref<U: ?Sized, F: Fn(&T) -> &U>(&self, map: F) -> Bound<&U> {
476
        match self {
            Bound::Unbounded => Bound::Unbounded,
398
            Bound::Included(value) => Bound::Included(map(value)),
78
            Bound::Excluded(value) => Bound::Excluded(map(value)),
        }
476
    }
}

            
impl<'a, T: Key<'a>> Bound<T> {
    /// Serializes the contained value to big-endian bytes.
502
    pub fn as_ord_bytes(&'a self) -> Result<Bound<Bytes>, T::Error> {
502
        match self {
            Bound::Unbounded => Ok(Bound::Unbounded),
421
            Bound::Included(value) => {
421
                Ok(Bound::Included(Bytes::from(value.as_ord_bytes()?.to_vec())))
            }
81
            Bound::Excluded(value) => {
81
                Ok(Bound::Excluded(Bytes::from(value.as_ord_bytes()?.to_vec())))
            }
        }
502
    }
}

            
impl<'a, B> Bound<B>
where
    B: AsRef<[u8]>,
{
    /// Deserializes the bound's contained value from big-endian bytes.
    pub fn deserialize<T: for<'k> Key<'k>>(&'a self) -> Result<Bound<T>, <T as Key<'_>>::Error> {
        match self {
            Bound::Unbounded => Ok(Bound::Unbounded),
            Bound::Included(value) => Ok(Bound::Included(T::from_ord_bytes(value.as_ref())?)),
            Bound::Excluded(value) => Ok(Bound::Excluded(T::from_ord_bytes(value.as_ref())?)),
        }
    }
}

            
impl<T> std::ops::RangeBounds<T> for Range<T> {
237886
    fn start_bound(&self) -> std::ops::Bound<&T> {
237886
        std::ops::Bound::from(&self.start)
237886
    }

            
210272
    fn end_bound(&self) -> std::ops::Bound<&T> {
210272
        std::ops::Bound::from(&self.end)
210272
    }
}

            
impl<'a, T> From<&'a Bound<T>> for std::ops::Bound<&'a T> {
448158
    fn from(bound: &'a Bound<T>) -> Self {
448158
        match bound {
344482
            Bound::Unbounded => std::ops::Bound::Unbounded,
101842
            Bound::Included(value) => std::ops::Bound::Included(value),
1834
            Bound::Excluded(value) => std::ops::Bound::Excluded(value),
        }
448158
    }
}

            
impl<T> From<std::ops::Range<T>> for Range<T> {
22
    fn from(range: std::ops::Range<T>) -> Self {
22
        Self {
22
            start: Bound::Included(range.start),
22
            end: Bound::Excluded(range.end),
22
        }
22
    }
}

            
impl<T> From<std::ops::RangeFrom<T>> for Range<T> {
13998
    fn from(range: std::ops::RangeFrom<T>) -> Self {
13998
        Self {
13998
            start: Bound::Included(range.start),
13998
            end: Bound::Unbounded,
13998
        }
13998
    }
}

            
impl<T> From<std::ops::RangeTo<T>> for Range<T> {
    fn from(range: std::ops::RangeTo<T>) -> Self {
        Self {
            start: Bound::Unbounded,
            end: Bound::Excluded(range.end),
        }
    }
}

            
impl<T: Clone> From<std::ops::RangeInclusive<T>> for Range<T> {
25
    fn from(range: std::ops::RangeInclusive<T>) -> Self {
25
        Self {
25
            start: Bound::Included(range.start().clone()),
25
            end: Bound::Included(range.end().clone()),
25
        }
25
    }
}

            
impl<T> From<std::ops::RangeToInclusive<T>> for Range<T> {
    fn from(range: std::ops::RangeToInclusive<T>) -> Self {
        Self {
            start: Bound::Unbounded,
            end: Bound::Included(range.end),
        }
    }
}

            
impl<T> From<std::ops::RangeFull> for Range<T> {
13472
    fn from(_: std::ops::RangeFull) -> Self {
13472
        Self {
13472
            start: Bound::Unbounded,
13472
            end: Bound::Unbounded,
13472
        }
13472
    }
}

            
/// Changes how the view's outdated data will be treated.
23812
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum AccessPolicy {
    /// Update any changed documents before returning a response.
    UpdateBefore,

            
    /// Return the results, which may be out-of-date, and start an update job in
    /// the background. This pattern is useful when you want to ensure you
    /// provide consistent response times while ensuring the database is
    /// updating in the background.
    UpdateAfter,

            
    /// Returns the results, which may be out-of-date, and do not start any
    /// background jobs. This mode is useful if you're using a view as a cache
    /// and have a background process that is responsible for controlling when
    /// data is refreshed and updated. While the default `UpdateBefore`
    /// shouldn't have much overhead, this option removes all overhead related
    /// to view updating from the query.
    NoUpdate,
}

            
/// Functions for interacting with a multi-database BonsaiDb instance.
#[async_trait]
pub trait StorageConnection: Send + Sync {
    /// The type that represents a database for this implementation.
    type Database: Connection;

            
    /// Creates a database named `name` with the `Schema` provided.
    ///
    /// ## Errors
    ///
    /// * [`Error::InvalidDatabaseName`]: `name` must begin with an alphanumeric
    ///   character (`[a-zA-Z0-9]`), and all remaining characters must be
    ///   alphanumeric, a period (`.`), or a hyphen (`-`).
    /// * [`Error::DatabaseNameAlreadyTaken`]: `name` was already used for a
    ///   previous database name. Database names are case insensitive. Returned
    ///   if `only_if_needed` is false.
989
    async fn create_database<DB: Schema>(
989
        &self,
989
        name: &str,
989
        only_if_needed: bool,
989
    ) -> Result<(), crate::Error> {
2627
        self.create_database_with_schema(name, DB::schema_name(), only_if_needed)
2623
            .await
1978
    }

            
    /// Returns a reference to database `name` with schema `DB`.
    async fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, crate::Error>;

            
    /// Creates a database named `name` using the [`SchemaName`] `schema`.
    ///
    /// ## Errors
    ///
    /// * [`Error::InvalidDatabaseName`]: `name` must begin with an alphanumeric
    ///   character (`[a-zA-Z0-9]`), and all remaining characters must be
    ///   alphanumeric, a period (`.`), or a hyphen (`-`).
    /// * [`Error::DatabaseNameAlreadyTaken`]: `name` was already used for a
    ///   previous database name. Database names are case insensitive. Returned
    ///   if `only_if_needed` is false.
    async fn create_database_with_schema(
        &self,
        name: &str,
        schema: SchemaName,
        only_if_needed: bool,
    ) -> Result<(), crate::Error>;

            
    /// Deletes a database named `name`.
    ///
    /// ## Errors
    ///
    /// * [`Error::DatabaseNotFound`]: database `name` does not exist.
    /// * [`Error::Io`]: an error occurred while deleting files.
    async fn delete_database(&self, name: &str) -> Result<(), crate::Error>;

            
    /// Lists the databases in this storage.
    async fn list_databases(&self) -> Result<Vec<Database>, crate::Error>;

            
    /// Lists the [`SchemaName`]s registered with this storage.
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, crate::Error>;

            
    /// Creates a user.
    #[cfg(feature = "multiuser")]
    async fn create_user(&self, username: &str) -> Result<u64, crate::Error>;

            
    /// Deletes a user.
    #[cfg(feature = "multiuser")]
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
    ) -> Result<(), crate::Error>;

            
    /// Sets a user's password.
    #[cfg(feature = "password-hashing")]
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        password: SensitiveString,
    ) -> Result<(), crate::Error>;

            
    /// Authenticates as a user with a authentication method.
    #[cfg(all(feature = "multiuser", feature = "password-hashing"))]
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        authentication: Authentication,
    ) -> Result<Authenticated, crate::Error>;

            
    /// Adds a user to a permission group.
    #[cfg(feature = "multiuser")]
    async fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), crate::Error>;

            
    /// Removes a user from a permission group.
    #[cfg(feature = "multiuser")]
    async fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), crate::Error>;

            
    /// Adds a user to a permission group.
    #[cfg(feature = "multiuser")]
    async fn add_role_to_user<
        'user,
        'role,
        U: Nameable<'user, u64> + Send + Sync,
        R: Nameable<'role, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: R,
    ) -> Result<(), crate::Error>;

            
    /// Removes a user from a permission group.
    #[cfg(feature = "multiuser")]
    async fn remove_role_from_user<
        'user,
        'role,
        U: Nameable<'user, u64> + Send + Sync,
        R: Nameable<'role, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: R,
    ) -> Result<(), crate::Error>;
}

            
/// A database stored in BonsaiDb.
1288
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Database {
    /// The name of the database.
    pub name: String,
    /// The schema defining the database.
    pub schema: SchemaName,
}

            
/// A plain-text password. This struct automatically overwrites the password
/// with zeroes when dropped.
#[cfg(feature = "multiuser")]
1998
#[derive(Clone, Serialize, Deserialize, Zeroize)]
#[zeroize(drop)]
#[serde(transparent)]
pub struct SensitiveString(pub String);

            
#[cfg(feature = "multiuser")]
impl std::fmt::Debug for SensitiveString {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("SensitiveString(...)")
    }
}

            
#[cfg(feature = "multiuser")]
impl Deref for SensitiveString {
    type Target = String;

            
270
    fn deref(&self) -> &Self::Target {
270
        &self.0
270
    }
}

            
/// User authentication methods.
10
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Authentication {
    /// Authenticate using a password.
    #[cfg(feature = "password-hashing")]
    Password(crate::connection::SensitiveString),
}

            
/// Information about the authenticated session.
10
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Authenticated {
    /// The user id logged in as.
    pub user_id: u64,
    /// The effective permissions granted.
    pub permissions: Permissions,
}

            
#[doc(hidden)]
#[macro_export]
macro_rules! __doctest_prelude {
    () => {
        use bonsaidb_core::{
            connection::{AccessPolicy, Connection},
            define_basic_unique_mapped_view,
            document::{CollectionDocument,Emit, Document, OwnedDocument},
            schema::{

            
                Collection, CollectionName, CollectionViewSchema, DefaultSerialization,
                DefaultViewSerialization, Name, NamedCollection, ReduceResult, Schema, SchemaName,
                Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
            },
            Error,
        };
        use serde::{Deserialize, Serialize};

            
        #[derive(Debug, Schema)]
        #[schema(name = "MySchema", collections = [MyCollection], core = $crate)]
        pub struct MySchema;

            
        #[derive( Debug, Serialize, Deserialize, Default, Collection)]
        #[collection(name = "MyCollection", views = [MyCollectionByName], core = $crate)]
        pub struct MyCollection {
            pub name: String,
            pub rank: u32,
            pub score: f32,
        }

            
        impl MyCollection {
            pub fn named(s: impl Into<String>) -> Self {
                Self::new(s, 0, 0.)
            }

            
            pub fn new(s: impl Into<String>, rank: u32, score: f32) -> Self {
                Self {
                    name: s.into(),
                    rank,
                    score,
                }
            }
        }

            
        impl NamedCollection for MyCollection {
            type ByNameView = MyCollectionByName;
        }

            
        #[derive(Debug, Clone, View)]
        #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank", core = $crate)]
        pub struct ScoresByRank;

            
        impl CollectionViewSchema for ScoresByRank {
            type View = Self;
            fn map(
                &self,
                document: CollectionDocument<<Self::View as View>::Collection>,
            ) -> ViewMapResult<Self::View> {
                document
                    .header
                    .emit_key_and_value(document.contents.rank, document.contents.score)
            }

            
            fn reduce(
                &self,
                mappings: &[ViewMappedValue<Self::View>],
                rereduce: bool,
            ) -> ReduceResult<Self::View> {
                if mappings.is_empty() {
                    Ok(0.)
                } else {
                    Ok(mappings.iter().map(|map| map.value).sum::<f32>() / mappings.len() as f32)
                }
            }
        }

            
        define_basic_unique_mapped_view!(
            MyCollectionByName,
            MyCollection,
            1,
            "by-name",
            String,
            (),
            |document: CollectionDocument<MyCollection>| {
                document.header.emit_key(document.contents.name.clone())
            },
        );
    };
}