1
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, task::Poll};
2

            
3
use async_trait::async_trait;
4
use futures::{future::BoxFuture, ready, Future, FutureExt};
5
use serde::{de::DeserializeOwned, Deserialize, Serialize};
6
use transmog::{Format, OwnedDeserializer};
7
use transmog_pot::Pot;
8

            
9
use crate::{
10
    connection::{self, Connection, Range},
11
    document::{
12
        AnyDocumentId, BorrowedDocument, CollectionDocument, Document, DocumentId, KeyId,
13
        OwnedDocument, OwnedDocuments,
14
    },
15
    key::Key,
16
    schema::{CollectionName, Schematic},
17
    Error,
18
};
19

            
20
/// A namespaced collection of `Document<Self>` items and views.
21
///
22
/// ## Deriving this trait
23
///
24
/// This trait can be derived instead of manually implemented:
25
///
26
/// ```rust
27
/// use bonsaidb_core::schema::Collection;
28
/// use serde::{Deserialize, Serialize};
29
///
30
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
31
/// #[collection(name = "MyCollection")]
32
/// # #[collection(core = bonsaidb_core)]
33
/// pub struct MyCollection;
34
/// ```
35
///
36
/// If you're publishing a collection for use in multiple projects, consider
37
/// giving the collection an `authority`, which gives your collection a
38
/// namespace:
39
///
40
/// ```rust
41
/// use bonsaidb_core::schema::Collection;
42
/// use serde::{Deserialize, Serialize};
43
///
44
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
45
/// #[collection(name = "MyCollection", authority = "khonsulabs")]
46
/// # #[collection(core = bonsaidb_core)]
47
/// pub struct MyCollection;
48
/// ```
49
///
50
/// The list of views can be specified using the `views` parameter:
51
///
52
/// ```rust
53
/// use bonsaidb_core::schema::{Collection, View};
54
/// use serde::{Deserialize, Serialize};
55
///
56
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
57
/// #[collection(name = "MyCollection", views = [ScoresByRank])]
58
/// # #[collection(core = bonsaidb_core)]
59
/// pub struct MyCollection;
60
///
61
/// #[derive(Debug, Clone, View)]
62
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
63
/// # #[view(core = bonsaidb_core)]
64
/// pub struct ScoresByRank;
65
/// #
66
/// # use bonsaidb_core::{
67
/// #     document::CollectionDocument,
68
/// #     schema::{
69
/// #         CollectionViewSchema,   ReduceResult,
70
/// #         ViewMapResult, ViewMappedValue,
71
/// #    },
72
/// # };
73
/// # impl CollectionViewSchema for ScoresByRank {
74
/// #     type View = Self;
75
/// #     fn map(
76
/// #         &self,
77
/// #         _document: CollectionDocument<<Self::View as View>::Collection>,
78
/// #     ) -> ViewMapResult<Self::View> {
79
/// #         todo!()
80
/// #     }
81
/// #
82
/// #     fn reduce(
83
/// #         &self,
84
/// #         _mappings: &[ViewMappedValue<Self::View>],
85
/// #         _rereduce: bool,
86
/// #     ) -> ReduceResult<Self::View> {
87
/// #         todo!()
88
/// #     }
89
/// # }
90
/// ```
91
///
92
/// ### Selecting a Primary Key type
93
///
94
/// By default, the `#[collection]` macro will use `u64` for the
95
/// [`Self::PrimaryKey`] type. Collections can use any type that implements the
96
/// [`Key`] trait:
97
///
98
/// ```rust
99
/// use bonsaidb_core::schema::Collection;
100
/// use serde::{Deserialize, Serialize};
101
///
102
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
103
/// #[collection(name = "MyCollection", primary_key = u128)]
104
/// # #[collection(core = bonsaidb_core)]
105
/// pub struct MyCollection;
106
/// ```
107
///
108
/// If the data being stored has a ["natural key"][natural-key], a closure or a
109
/// function can be provided to extract the value during a `push` operation:
110
///
111
/// ```rust
112
/// use bonsaidb_core::schema::Collection;
113
/// use serde::{Deserialize, Serialize};
114
///
115
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
116
/// #[collection(name = "MyCollection", natural_id = |item: &Self| Some(item.external_id))]
117
/// # #[collection(core = bonsaidb_core)]
118
/// pub struct MyCollection {
119
///     pub external_id: u64,
120
/// }
121
/// ```
122
///
123
/// Primary keys are not able to be updated. To update a document's primary key,
124
/// the contents must be inserted at the new id and deleted from the previous
125
/// id.
126
///
127
/// [natural-key]: https://en.wikipedia.org/wiki/Natural_key
128
///
129
///
130
/// ### Specifying a Collection Encryption Key
131
///
132
/// By default, encryption will be required if an `encryption_key` is provided:
133
///
134
/// ```rust
135
/// use bonsaidb_core::{document::KeyId, schema::Collection};
136
/// use serde::{Deserialize, Serialize};
137
///
138
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
139
/// #[collection(name = "MyCollection", encryption_key = Some(KeyId::Master))]
140
/// # #[collection(core = bonsaidb_core)]
141
/// pub struct MyCollection;
142
/// ```
143
///
144
/// The `encryption_required` parameter can be provided if you wish to be
145
/// explicit:
146
///
147
/// ```rust
148
/// use bonsaidb_core::{document::KeyId, schema::Collection};
149
/// use serde::{Deserialize, Serialize};
150
///
151
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
152
/// #[collection(name = "MyCollection")]
153
/// #[collection(encryption_key = Some(KeyId::Master), encryption_required)]
154
/// # #[collection(core = bonsaidb_core)]
155
/// pub struct MyCollection;
156
/// ```
157
///
158
/// Or, if you wish your collection to be encrypted if it's available, but not
159
/// cause errors when being stored without encryption, you can provide the
160
/// `encryption_optional` parameter:
161
///
162
/// ```rust
163
/// use bonsaidb_core::{document::KeyId, schema::Collection};
164
/// use serde::{Deserialize, Serialize};
165
///
166
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
167
/// #[collection(name = "MyCollection")]
168
/// #[collection(encryption_key = Some(KeyId::Master), encryption_optional)]
169
/// # #[collection(core = bonsaidb_core)]
170
/// pub struct MyCollection;
171
/// ```
172
///
173
/// ### Changing the serialization strategy
174
///
175
/// BonsaiDb uses [`transmog`](::transmog) to allow customizing serialization
176
/// formats. To use one of the formats Transmog already supports, add its crate
177
/// to your Cargo.toml and use it like this example using `transmog_bincode`:
178
///
179
/// ```rust
180
/// use bonsaidb_core::schema::Collection;
181
/// use serde::{Deserialize, Serialize};
182
///
183
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
184
/// #[collection(name = "MyCollection")]
185
/// #[collection(serialization = transmog_bincode::Bincode)]
186
/// # #[collection(core = bonsaidb_core)]
187
/// pub struct MyCollection;
188
/// ```
189
///
190
/// To manually implement `SerializedCollection` you can pass `None` to
191
/// `serialization`:
192
///
193
/// ```rust
194
/// use bonsaidb_core::schema::Collection;
195
///
196
/// #[derive(Debug, Default, Collection)]
197
/// #[collection(name = "MyCollection")]
198
/// #[collection(serialization = None)]
199
/// # #[collection(core = bonsaidb_core)]
200
/// pub struct MyCollection;
201
/// ```
202
pub trait Collection: Debug + Send + Sync
203
where
204
    AnyDocumentId<Self::PrimaryKey>: From<Self::PrimaryKey>,
205
{
206
    /// The unique id type. Each document stored in a collection will be
207
    /// uniquely identified by this type.
208
    ///
209
    /// ## Primary Key Limits
210
    ///
211
    /// The result of [`Key::as_ord_bytes()`] must be less than or equal
212
    /// to [`DocumentId::MAX_LENGTH`]. This is currently 63 bytes.
213
    type PrimaryKey: for<'k> Key<'k>;
214

            
215
    /// The unique name of this collection. Each collection must be uniquely
216
    /// named within the [`Schema`](crate::schema::Schema) it is registered
217
    /// within.
218
    fn collection_name() -> CollectionName;
219

            
220
    /// Defines all `View`s in this collection in `schema`.
221
    fn define_views(schema: &mut Schematic) -> Result<(), Error>;
222

            
223
    /// If a [`KeyId`] is returned, this collection will be stored encrypted
224
    /// at-rest using the key specified.
225
    #[must_use]
226
1849349
    fn encryption_key() -> Option<KeyId> {
227
1849349
        None
228
1849349
    }
229
}
230

            
231
/// A collection that knows how to serialize and deserialize documents to an associated type.
232
///
233
/// These examples for this type use this basic collection definition:
234
///
235
/// ```rust
236
/// use bonsaidb_core::{
237
///     schema::{Collection, CollectionName, DefaultSerialization, Schematic},
238
///     Error,
239
/// };
240
/// use serde::{Deserialize, Serialize};
241
///
242
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
243
/// #[collection(name = "MyCollection")]
244
/// # #[collection(core = bonsaidb_core)]
245
/// pub struct MyCollection {
246
///     pub rank: u32,
247
///     pub score: f32,
248
/// }
249
/// ```
250
#[async_trait]
251
pub trait SerializedCollection: Collection {
252
    /// The type of the contents stored in documents in this collection.
253
    type Contents: Send + Sync;
254
    /// The serialization format for this collection.
255
    type Format: OwnedDeserializer<Self::Contents>;
256

            
257
    /// Returns the natural identifier of `contents`. This is called when
258
    /// pushing values into a collection, before attempting to automatically
259
    /// assign a unique id.
260
    #[allow(unused_variables)]
261
1
    fn natural_id(contents: &Self::Contents) -> Option<Self::PrimaryKey>
262
1
    where
263
1
        Self: Sized,
264
1
    {
265
1
        None
266
1
    }
267

            
268
    /// Returns the configured instance of [`Self::Format`].
269
    // TODO allow configuration to be passed here, such as max allocation bytes.
270
    fn format() -> Self::Format;
271

            
272
    /// Deserialize `data` as `Self::Contents` using this collection's format.
273
98998
    fn deserialize(data: &[u8]) -> Result<Self::Contents, Error> {
274
98998
        Self::format()
275
98998
            .deserialize_owned(data)
276
98998
            .map_err(|err| crate::Error::Serialization(err.to_string()))
277
98998
    }
278

            
279
    /// Returns the deserialized contents of `doc`.
280
4621
    fn document_contents<D: Document<Self>>(doc: &D) -> Result<Self::Contents, Error>
281
4621
    where
282
4621
        Self: Sized,
283
4621
    {
284
4621
        doc.contents()
285
4621
    }
286

            
287
    /// Sets the contents of `doc` to `contents`.
288
521
    fn set_document_contents<D: Document<Self>>(
289
521
        doc: &mut D,
290
521
        contents: Self::Contents,
291
521
    ) -> Result<(), Error>
292
521
    where
293
521
        Self: Sized,
294
521
    {
295
521
        doc.set_contents(contents)
296
521
    }
297

            
298
    /// Serialize `item` using this collection's format.
299
44412
    fn serialize(item: &Self::Contents) -> Result<Vec<u8>, Error> {
300
44412
        Self::format()
301
44412
            .serialize(item)
302
44412
            .map_err(|err| crate::Error::Serialization(err.to_string()))
303
44412
    }
304

            
305
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
306
    ///
307
    /// ```rust
308
    /// # bonsaidb_core::__doctest_prelude!();
309
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
310
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
311
    /// if let Some(doc) = MyCollection::get(42, &db).await? {
312
    ///     println!(
313
    ///         "Retrieved revision {} with deserialized contents: {:?}",
314
    ///         doc.header.revision, doc.contents
315
    ///     );
316
    /// }
317
    /// # Ok(())
318
    /// # })
319
    /// # }
320
    /// ```
321
12264
    async fn get<C, PrimaryKey>(
322
12264
        id: PrimaryKey,
323
12264
        connection: &C,
324
12264
    ) -> Result<Option<CollectionDocument<Self>>, Error>
325
12264
    where
326
12264
        C: Connection,
327
12264
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send,
328
12264
        Self: Sized,
329
12264
    {
330
13278
        let possible_doc = connection.get::<Self, _>(id).await?;
331
12264
        Ok(possible_doc.as_ref().map(TryInto::try_into).transpose()?)
332
24528
    }
333

            
334
    /// Retrieves all documents matching `ids`. Documents that are not found
335
    /// are not returned, but no error will be generated.
336
    ///
337
    /// ```rust
338
    /// # bonsaidb_core::__doctest_prelude!();
339
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
340
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
341
    /// for doc in MyCollection::get_multiple([42, 43], &db).await? {
342
    ///     println!(
343
    ///         "Retrieved #{} with deserialized contents: {:?}",
344
    ///         doc.header.id, doc.contents
345
    ///     );
346
    /// }
347
    /// # Ok(())
348
    /// # })
349
    /// # }
350
    /// ```
351
15
    async fn get_multiple<C, DocumentIds, PrimaryKey, I>(
352
15
        ids: DocumentIds,
353
15
        connection: &C,
354
15
    ) -> Result<Vec<CollectionDocument<Self>>, Error>
355
15
    where
356
15
        C: Connection,
357
15
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
358
15
        I: Iterator<Item = PrimaryKey> + Send + Sync,
359
15
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send + Sync,
360
15
        Self: Sized,
361
15
    {
362
15
        connection
363
15
            .collection::<Self>()
364
15
            .get_multiple(ids)
365
15
            .await
366
15
            .and_then(|docs| docs.collection_documents())
367
30
    }
368

            
369
    /// Retrieves all documents matching the range of `ids`.
370
    ///
371
    /// ```rust
372
    /// # bonsaidb_core::__doctest_prelude!();
373
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
374
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
375
    /// for doc in MyCollection::list(42.., &db).descending().limit(20).await? {
376
    ///     println!(
377
    ///         "Retrieved #{} with deserialized contents: {:?}",
378
    ///         doc.header.id, doc.contents
379
    ///     );
380
    /// }
381
    /// # Ok(())
382
    /// # })
383
    /// # }
384
    /// ```
385
15
    fn list<R, PrimaryKey, C>(ids: R, connection: &'_ C) -> List<'_, C, Self>
386
15
    where
387
15
        R: Into<Range<PrimaryKey>>,
388
15
        C: Connection,
389
15
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send + Sync,
390
15
        Self: Sized,
391
15
    {
392
15
        List(connection::List::new(
393
15
            connection::PossiblyOwned::Owned(connection.collection::<Self>()),
394
15
            ids.into().map(PrimaryKey::into),
395
15
        ))
396
15
    }
397

            
398
    /// Retrieves all documents.
399
    ///
400
    /// ```rust
401
    /// # bonsaidb_core::__doctest_prelude!();
402
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
403
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
404
    /// for doc in MyCollection::all(&db).await? {
405
    ///     println!(
406
    ///         "Retrieved #{} with deserialized contents: {:?}",
407
    ///         doc.header.id, doc.contents
408
    ///     );
409
    /// }
410
    /// # Ok(())
411
    /// # })
412
    /// # }
413
    /// ```
414
5
    fn all<C: Connection>(connection: &C) -> List<'_, C, Self>
415
5
    where
416
5
        Self: Sized,
417
5
    {
418
5
        List(connection::List::new(
419
5
            connection::PossiblyOwned::Owned(connection.collection::<Self>()),
420
5
            Range::from(..),
421
5
        ))
422
5
    }
423

            
424
    /// Pushes this value into the collection, returning the created document.
425
    /// This function is useful when `Self != Self::Contents`.
426
    ///
427
    /// ## Automatic Id Assignment
428
    ///
429
    /// This function calls [`Self::natural_id()`] to try to retrieve a primary
430
    /// key value from `contents`. If an id is returned, the item is inserted
431
    /// with that id. If an id is not returned, an id will be automatically
432
    /// assigned, if possible, by the storage backend, which uses the [`Key`]
433
    /// trait to assign ids.
434
    ///
435
    /// ```rust
436
    /// # bonsaidb_core::__doctest_prelude!();
437
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
438
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
439
    /// let document = MyCollection::push(MyCollection::default(), &db).await?;
440
    /// println!(
441
    ///     "Inserted {:?} with id {} with revision {}",
442
    ///     document.contents, document.header.id, document.header.revision
443
    /// );
444
    /// # Ok(())
445
    /// # })
446
    /// # }
447
    /// ```
448
3021
    async fn push<Cn: Connection>(
449
3021
        contents: Self::Contents,
450
3021
        connection: &Cn,
451
3021
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
452
3021
    where
453
3021
        Self: Sized + 'static,
454
3021
        Self::Contents: 'async_trait,
455
3021
    {
456
4208
        let header = match connection.collection::<Self>().push(&contents).await {
457
3017
            Ok(header) => header,
458
4
            Err(error) => return Err(InsertError { contents, error }),
459
        };
460
3017
        Ok(CollectionDocument { header, contents })
461
6042
    }
462

            
463
    /// Pushes this value into the collection, returning the created document.
464
    ///
465
    /// ## Automatic Id Assignment
466
    ///
467
    /// This function calls [`Self::natural_id()`] to try to retrieve a primary
468
    /// key value from `self`. If an id is returned, the item is inserted with
469
    /// that id. If an id is not returned, an id will be automatically assigned,
470
    /// if possible, by the storage backend, which uses the [`Key`] trait to
471
    /// assign ids.
472
    ///
473
    /// ```rust
474
    /// # bonsaidb_core::__doctest_prelude!();
475
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
476
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
477
    /// let document = MyCollection::default().push_into(&db).await?;
478
    /// println!(
479
    ///     "Inserted {:?} with id {} with revision {}",
480
    ///     document.contents, document.header.id, document.header.revision
481
    /// );
482
    /// # Ok(())
483
    /// # })
484
    /// # }
485
    /// ```
486
2943
    async fn push_into<Cn: Connection>(
487
2943
        self,
488
2943
        connection: &Cn,
489
2943
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
490
2943
    where
491
2943
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
492
2943
    {
493
4130
        Self::push(self, connection).await
494
5886
    }
495

            
496
    /// Inserts this value into the collection with the specified id, returning
497
    /// the created document.
498
    ///
499
    /// ```rust
500
    /// # bonsaidb_core::__doctest_prelude!();
501
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
502
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
503
    /// let document = MyCollection::insert(42, MyCollection::default(), &db).await?;
504
    /// assert_eq!(document.header.id, 42);
505
    /// println!(
506
    ///     "Inserted {:?} with revision {}",
507
    ///     document.contents, document.header.revision
508
    /// );
509
    /// # Ok(())
510
    /// # })
511
    /// # }
512
    /// ```
513
1013
    async fn insert<PrimaryKey, Cn>(
514
1013
        id: PrimaryKey,
515
1013
        contents: Self::Contents,
516
1013
        connection: &Cn,
517
1013
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
518
1013
    where
519
1013
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send + Sync,
520
1013
        Cn: Connection,
521
1013
        Self: Sized + 'static,
522
1013
        Self::Contents: 'async_trait,
523
1013
    {
524
3006
        let header = match connection.collection::<Self>().insert(id, &contents).await {
525
507
            Ok(header) => header,
526
506
            Err(error) => return Err(InsertError { contents, error }),
527
        };
528
507
        Ok(CollectionDocument { header, contents })
529
2026
    }
530

            
531
    /// Inserts this value into the collection with the given `id`, returning
532
    /// the created document.
533
    ///
534
    /// ```rust
535
    /// # bonsaidb_core::__doctest_prelude!();
536
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
537
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
538
    /// let document = MyCollection::default().insert_into(42, &db).await?;
539
    /// assert_eq!(document.header.id, 42);
540
    /// println!(
541
    ///     "Inserted {:?} with revision {}",
542
    ///     document.contents, document.header.revision
543
    /// );
544
    /// # Ok(())
545
    /// # })
546
    /// # }
547
    /// ```
548
1013
    async fn insert_into<PrimaryKey, Cn>(
549
1013
        self,
550
1013
        id: PrimaryKey,
551
1013
        connection: &Cn,
552
1013
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
553
1013
    where
554
1013
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send + Sync,
555
1013
        Cn: Connection,
556
1013
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
557
1013
    {
558
3006
        Self::insert(id, self, connection).await
559
2026
    }
560

            
561
    /// Overwrites this value into the collection with the specified id, returning
562
    /// the created or updated document.
563
    ///
564
    /// ```rust
565
    /// # bonsaidb_core::__doctest_prelude!();
566
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
567
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
568
    /// let document = MyCollection::overwrite(42, MyCollection::default(), &db).await?;
569
    /// assert_eq!(document.header.id, 42);
570
    /// println!(
571
    ///     "Overwrote {:?} with revision {}",
572
    ///     document.contents, document.header.revision
573
    /// );
574
    /// # Ok(())
575
    /// # })
576
    /// # }
577
    /// ```
578
506
    async fn overwrite<PrimaryKey, Cn>(
579
506
        id: PrimaryKey,
580
506
        contents: Self::Contents,
581
506
        connection: &Cn,
582
506
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
583
506
    where
584
506
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send,
585
506
        Cn: Connection,
586
506
        Self: Sized + 'static,
587
506
        Self::Contents: 'async_trait,
588
506
    {
589
506
        let header = match Self::serialize(&contents) {
590
1550
            Ok(serialized) => match connection.overwrite::<Self, _>(id, serialized).await {
591
506
                Ok(header) => header,
592
                Err(error) => return Err(InsertError { contents, error }),
593
            },
594
            Err(error) => return Err(InsertError { contents, error }),
595
        };
596
506
        Ok(CollectionDocument { header, contents })
597
1012
    }
598

            
599
    /// Overwrites this value into the collection with the given `id`, returning
600
    /// the created or updated document.
601
    ///
602
    /// ```rust
603
    /// # bonsaidb_core::__doctest_prelude!();
604
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
605
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
606
    /// let document = MyCollection::default().overwrite_into(42, &db).await?;
607
    /// assert_eq!(document.header.id, 42);
608
    /// println!(
609
    ///     "Overwrote {:?} with revision {}",
610
    ///     document.contents, document.header.revision
611
    /// );
612
    /// # Ok(())
613
    /// # })
614
    /// # }
615
    /// ```
616
506
    async fn overwrite_into<Cn: Connection, PrimaryKey>(
617
506
        self,
618
506
        id: PrimaryKey,
619
506
        connection: &Cn,
620
506
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
621
506
    where
622
506
        PrimaryKey: Into<AnyDocumentId<Self::PrimaryKey>> + Send + Sync,
623
506
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
624
506
    {
625
1550
        Self::overwrite(id, self, connection).await
626
1012
    }
627
}
628

            
629
/// A convenience trait for easily storing Serde-compatible types in documents.
630
pub trait DefaultSerialization: Collection {
631
    /// Returns the natural identifier of `contents`. This is called when
632
    /// pushing values into a collection, before attempting to automatically
633
    /// assign a unique id.
634
29401
    fn natural_id(&self) -> Option<Self::PrimaryKey> {
635
29401
        None
636
29401
    }
637
}
638

            
639
impl<T> SerializedCollection for T
640
where
641
    T: DefaultSerialization + Serialize + DeserializeOwned,
642
{
643
    type Contents = Self;
644
    type Format = Pot;
645

            
646
379156
    fn format() -> Self::Format {
647
379156
        Pot::default()
648
379156
    }
649

            
650
29402
    fn natural_id(contents: &Self::Contents) -> Option<Self::PrimaryKey> {
651
29402
        T::natural_id(contents)
652
29402
    }
653
}
654

            
655
/// An error from inserting a [`CollectionDocument`].
656
#[derive(thiserror::Error, Debug)]
657
#[error("{error}")]
658
pub struct InsertError<T> {
659
    /// The original value being inserted.
660
    pub contents: T,
661
    /// The error that occurred while inserting.
662
    pub error: Error,
663
}
664

            
665
/// A collection with a unique name column.
666
///
667
/// ## Finding a document by unique name
668
///
669
/// ```rust
670
/// # bonsaidb_core::__doctest_prelude!();
671
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
672
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
673
/// if let Some(doc) = MyCollection::load("unique name", &db).await? {
674
///     println!(
675
///         "Retrieved revision {} with deserialized contents: {:?}",
676
///         doc.header.revision, doc.contents
677
///     );
678
/// }
679
/// # Ok(())
680
/// # })
681
/// # }
682
/// ```
683
///
684
/// Load accepts either a string or a [`DocumentId`]. This enables building
685
/// methods that accept either the unique ID or the unique name:
686
///
687
/// ```rust
688
/// # bonsaidb_core::__doctest_prelude!();
689
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
690
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
691
/// if let Some(doc) = MyCollection::load(42, &db).await? {
692
///     println!(
693
///         "Retrieved revision {} with deserialized contents: {:?}",
694
///         doc.header.revision, doc.contents
695
///     );
696
/// }
697
/// # Ok(())
698
/// # })
699
/// # }
700
/// ```
701
///
702
/// ## Executing an insert or update
703
///
704
/// ```rust
705
/// # bonsaidb_core::__doctest_prelude!();
706
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
707
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
708
/// let upserted = MyCollection::entry("unique name", &db)
709
///     .update_with(|existing: &mut MyCollection| {
710
///         existing.rank += 1;
711
///     })
712
///     .or_insert_with(MyCollection::default)
713
///     .await?
714
///     .unwrap();
715
/// println!("Rank: {:?}", upserted.contents.rank);
716
///
717
/// # Ok(())
718
/// # })
719
/// # }
720
/// ```
721
#[async_trait]
722
pub trait NamedCollection: Collection + Unpin {
723
    /// The name view defined for the collection.
724
    type ByNameView: crate::schema::SerializedView<Key = String>;
725

            
726
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
727
7796
    async fn load<'name, N: Nameable<'name, Self::PrimaryKey> + Send + Sync, C: Connection>(
728
7796
        id: N,
729
7796
        connection: &C,
730
7796
    ) -> Result<Option<CollectionDocument<Self>>, Error>
731
7796
    where
732
7796
        Self: SerializedCollection + Sized + 'static,
733
7796
    {
734
8084
        let possible_doc = Self::load_document(id, connection).await?;
735
7796
        Ok(possible_doc
736
7796
            .as_ref()
737
7796
            .map(CollectionDocument::try_from)
738
7796
            .transpose()?)
739
15592
    }
740

            
741
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
742
94
    fn entry<
743
94
        'connection,
744
94
        'name,
745
94
        N: Into<NamedReference<'name, Self::PrimaryKey>> + Send + Sync,
746
94
        C: Connection,
747
94
    >(
748
94
        id: N,
749
94
        connection: &'connection C,
750
94
    ) -> Entry<'connection, 'name, C, Self, (), ()>
751
94
    where
752
94
        Self: SerializedCollection + Sized,
753
94
    {
754
94
        let name = id.into();
755
94
        Entry {
756
94
            state: EntryState::Pending(Some(EntryBuilder {
757
94
                name,
758
94
                connection,
759
94
                insert: None,
760
94
                update: None,
761
94
                retry_limit: 0,
762
94
                _collection: PhantomData,
763
94
            })),
764
94
        }
765
94
    }
766

            
767
    /// Loads a document from this collection by name, if applicable. Return
768
    /// `Ok(None)` if unsupported.
769
    #[allow(unused_variables)]
770
7796
    async fn load_document<
771
7796
        'name,
772
7796
        N: Nameable<'name, Self::PrimaryKey> + Send + Sync,
773
7796
        C: Connection,
774
7796
    >(
775
7796
        name: N,
776
7796
        connection: &C,
777
7796
    ) -> Result<Option<OwnedDocument>, Error>
778
7796
    where
779
7796
        Self: SerializedCollection + Sized,
780
7796
    {
781
7796
        match name.name()? {
782
            NamedReference::Id(id) => connection.collection::<Self>().get(id).await,
783
26
            NamedReference::Key(id) => connection.collection::<Self>().get(id).await,
784
7770
            NamedReference::Name(name) => Ok(connection
785
7770
                .view::<Self::ByNameView>()
786
7770
                .with_key(name.as_ref().to_owned())
787
8061
                .query_with_docs()
788
8060
                .await?
789
                .documents
790
7770
                .into_iter()
791
7770
                .next()
792
7770
                .map(|(_, document)| document)),
793
        }
794
15592
    }
795
}
796

            
797
/// A reference to a collection that has a unique name view.
798
74
#[derive(Clone, PartialEq, Deserialize, Serialize, Debug)]
799
#[must_use]
800
pub enum NamedReference<'a, Id> {
801
    /// An entity's name.
802
    Name(Cow<'a, str>),
803
    /// A document id.
804
    Id(DocumentId),
805
    /// A document id.
806
    Key(Id),
807
}
808

            
809
impl<'a, Id> From<&'a str> for NamedReference<'a, Id> {
810
28
    fn from(name: &'a str) -> Self {
811
28
        Self::Name(Cow::Borrowed(name))
812
28
    }
813
}
814

            
815
/// A type that can be used as a unique reference for a collection that
816
/// implements [`NamedCollection`].
///
/// `'a` is the lifetime of any borrowed name held by the resulting
/// [`NamedReference`], and `Id` is the primary key type it may carry.
817
pub trait Nameable<'a, Id> {
818
    /// Consumes `self` and returns it as a [`NamedReference`].
    ///
    /// # Errors
    ///
    /// Returns an error when the value cannot be converted into a reference.
819
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error>;
820
}
821

            
822
impl<'a, Id> Nameable<'a, Id> for NamedReference<'a, Id> {
823
165
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
824
165
        Ok(self)
825
165
    }
826
}
827

            
828
impl<'a, Id> Nameable<'a, Id> for &'a str {
829
8
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
830
8
        Ok(NamedReference::from(self))
831
8
    }
832
}
833

            
834
impl<'a, Id> From<&'a String> for NamedReference<'a, Id> {
835
7742
    fn from(name: &'a String) -> Self {
836
7742
        Self::Name(Cow::Borrowed(name.as_str()))
837
7742
    }
838
}
839

            
840
impl<'a, Id> Nameable<'a, Id> for &'a String {
841
7668
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
842
7668
        Ok(NamedReference::from(self))
843
7668
    }
844
}
845

            
846
impl<'a, 'b, Id> From<&'b BorrowedDocument<'b>> for NamedReference<'a, Id> {
    /// References the document by the id stored in its header.
    ///
    /// Only the id is copied out of the header, so the resulting reference
    /// does not borrow from `doc`.
    fn from(doc: &'b BorrowedDocument<'b>) -> Self {
        Self::Id(doc.header.id)
    }
}
851

            
852
impl<'a, 'b, Id> Nameable<'a, Id> for &'a BorrowedDocument<'b> {
853
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
854
        Ok(NamedReference::from(self))
855
    }
856
}
857

            
858
impl<'a, 'c, C> TryFrom<&'c CollectionDocument<C>> for NamedReference<'a, C::PrimaryKey>
859
where
860
    C: SerializedCollection,
861
{
862
    type Error = crate::Error;
863

            
864
32
    fn try_from(doc: &'c CollectionDocument<C>) -> Result<Self, crate::Error> {
865
32
        DocumentId::new(doc.header.id.clone()).map(Self::Id)
866
32
    }
867
}
868

            
869
impl<'a, C> Nameable<'a, C::PrimaryKey> for &'a CollectionDocument<C>
870
where
871
    C: SerializedCollection,
872
{
873
32
    fn name(self) -> Result<NamedReference<'a, C::PrimaryKey>, crate::Error> {
874
32
        NamedReference::try_from(self)
875
32
    }
876
}
877

            
878
impl<'a, Id> From<String> for NamedReference<'a, Id> {
879
    fn from(name: String) -> Self {
880
        Self::Name(Cow::Owned(name))
881
    }
882
}
883

            
884
impl<'a, Id> Nameable<'a, Id> for String {
885
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
886
        Ok(NamedReference::from(self))
887
    }
888
}
889

            
890
impl<'a, Id> From<DocumentId> for NamedReference<'a, Id> {
891
    fn from(id: DocumentId) -> Self {
892
        Self::Id(id)
893
    }
894
}
895

            
896
impl<'a, Id> Nameable<'a, Id> for DocumentId {
897
    fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
898
        Ok(NamedReference::from(self))
899
    }
900
}
901

            
902
impl<'a> Nameable<'a, Self> for u64 {
903
728
    fn name(self) -> Result<NamedReference<'a, Self>, crate::Error> {
904
728
        Ok(NamedReference::Key(self))
905
728
    }
906
}
907

            
908
impl<'a, Id> NamedReference<'a, Id>
909
where
910
    Id: for<'k> Key<'k>,
911
{
912
    /// Converts this reference to an owned reference with a `'static` lifetime.
913
37
    pub fn into_owned(self) -> NamedReference<'static, Id> {
914
37
        match self {
915
9
            Self::Name(name) => NamedReference::Name(match name {
916
                Cow::Owned(string) => Cow::Owned(string),
917
9
                Cow::Borrowed(borrowed) => Cow::Owned(borrowed.to_owned()),
918
            }),
919
16
            Self::Id(id) => NamedReference::Id(id),
920
12
            Self::Key(key) => NamedReference::Key(key),
921
        }
922
37
    }
923

            
924
    /// Returns this reference's id. If the reference is a name, the
925
    /// [`NamedCollection::ByNameView`] is queried for the id.
926
55
    pub async fn id<Col: NamedCollection<PrimaryKey = Id>, Cn: Connection>(
927
55
        &self,
928
55
        connection: &Cn,
929
55
    ) -> Result<Option<Col::PrimaryKey>, Error> {
930
55
        match self {
931
9
            Self::Name(name) => connection
932
9
                .view::<Col::ByNameView>()
933
9
                .with_key(name.as_ref().to_owned())
934
9
                .query()
935
6
                .await?
936
9
                .into_iter()
937
9
                .next()
938
9
                .map(|e| e.source.id.deserialize())
939
9
                .transpose(),
940
32
            Self::Id(id) => Ok(Some(id.deserialize()?)),
941
14
            Self::Key(id) => Ok(Some(id.clone())),
942
        }
943
55
    }
944
}
945

            
946
/// A future that resolves to an entry in a [`NamedCollection`].
///
/// Until it is first polled, the entry can still be configured with
/// `or_insert_with`, `update_with` and `retry_limit`. `EI` and `EU` are the
/// types of the insert and update callbacks.
947
#[must_use]
948
pub struct Entry<'a, 'name, Connection, Col, EI, EU>
949
where
950
    Col: NamedCollection + SerializedCollection,
951
    EI: EntryInsert<Col>,
952
    EU: EntryUpdate<Col>,
953
{
954
    // Either the still-configurable builder or the in-flight operation.
    state: EntryState<'a, 'name, Connection, Col, EI, EU>,
955
}
956

            
957
/// The settings collected for an [`Entry`] before it starts executing.
struct EntryBuilder<
958
    'a,
959
    'name,
960
    Connection,
961
    Col,
962
    EI: EntryInsert<Col> + 'a,
963
    EU: EntryUpdate<Col> + 'a,
964
> where
965
    Col: SerializedCollection,
966
{
967
    /// Identifies the document to load.
    name: NamedReference<'name, Col::PrimaryKey>,
968
    /// The connection used to load, update and insert the document.
    connection: &'a Connection,
969
    /// Supplies the initial contents when no document exists, if set.
    insert: Option<EI>,
970
    /// Modifies the contents of an existing document, if set.
    update: Option<EU>,
971
    /// How many times a conflicting update is retried before giving up.
    retry_limit: usize,
972
    /// Ties the builder to `Col` without storing a value of that type.
    _collection: PhantomData<Col>,
973
}
974

            
975
impl<'a, 'name, Connection, Col, EI, EU> Entry<'a, 'name, Connection, Col, EI, EU>
976
where
977
    Col: NamedCollection + SerializedCollection + 'static + Unpin,
978
    Connection: crate::connection::Connection,
979
    EI: EntryInsert<Col> + 'a + Unpin,
980
    EU: EntryUpdate<Col> + 'a + Unpin,
981
    'name: 'a,
982
{
983
94
    async fn execute(
984
94
        name: NamedReference<'name, Col::PrimaryKey>,
985
94
        connection: &'a Connection,
986
94
        insert: Option<EI>,
987
94
        update: Option<EU>,
988
94
        mut retry_limit: usize,
989
94
    ) -> Result<Option<CollectionDocument<Col>>, Error> {
990
169
        if let Some(mut existing) = Col::load(name, connection).await? {
991
16
            if let Some(update) = update {
992
11
                loop {
993
11
                    update.call(&mut existing.contents);
994
11
                    match existing.update(connection).await {
995
6
                        Ok(()) => return Ok(Some(existing)),
996
                        Err(Error::DocumentConflict(collection, header)) => {
997
                            // Another client has updated the document underneath us.
998
                            if retry_limit > 0 {
999
                                retry_limit -= 1;
                                existing = match Col::load(header.id, connection).await? {
                                    Some(doc) => doc,
                                    // Another client deleted the document before we could reload it.
                                    None => break Ok(None),
                                }
                            } else {
                                break Err(Error::DocumentConflict(collection, header));
                            }
                        }
5
                        Err(other) => break Err(other),
                    }
                }
            } else {
5
                Ok(Some(existing))
            }
78
        } else if let Some(insert) = insert {
78
            let new_document = insert.call();
78
            Ok(Some(Col::push(new_document, connection).await?))
        } else {
            Ok(None)
        }
94
    }
    fn pending(&mut self) -> &mut EntryBuilder<'a, 'name, Connection, Col, EI, EU> {
        match &mut self.state {
            EntryState::Pending(pending) => pending.as_mut().unwrap(),
            EntryState::Executing(_) => unreachable!(),
        }
    }

            
    /// If an entry with the key doesn't exist, `cb` will be executed to provide
    /// an initial document. This document will be saved before being returned.
84
    pub fn or_insert_with<F: EntryInsert<Col> + 'a + Unpin>(
84
        self,
84
        cb: F,
84
    ) -> Entry<'a, 'name, Connection, Col, F, EU> {
        Entry {
84
            state: match self.state {
                EntryState::Pending(Some(EntryBuilder {
84
                    name,
84
                    connection,
84
                    update,
84
                    retry_limit,
84
                    ..
84
                })) => EntryState::Pending(Some(EntryBuilder {
84
                    name,
84
                    connection,
84
                    insert: Some(cb),
84
                    update,
84
                    retry_limit,
84
                    _collection: PhantomData,
84
                })),
                _ => {
                    unreachable!("attempting to modify an already executing future")
                }
            },
        }
84
    }

            
    /// If an entry with the keys exists, `cb` will be executed with the stored
    /// value, allowing an opportunity to update the value. This new value will
    /// be saved to the database before returning. If an error occurs during
    /// update, `cb` may be invoked multiple times, up to the
    /// [`retry_limit`](Self::retry_limit()).
89
    pub fn update_with<F: EntryUpdate<Col> + 'a + Unpin>(
89
        self,
89
        cb: F,
89
    ) -> Entry<'a, 'name, Connection, Col, EI, F> {
        Entry {
89
            state: match self.state {
                EntryState::Pending(Some(EntryBuilder {
89
                    name,
89
                    connection,
89
                    insert,
89
                    retry_limit,
89
                    ..
89
                })) => EntryState::Pending(Some(EntryBuilder {
89
                    name,
89
                    connection,
89
                    insert,
89
                    update: Some(cb),
89
                    retry_limit,
89
                    _collection: PhantomData,
89
                })),
                _ => {
                    unreachable!("attempting to modify an already executing future")
                }
            },
        }
89
    }

            
    /// The number of attempts to attempt updating the document using
    /// `update_with` before returning an error.
    pub fn retry_limit(mut self, attempts: usize) -> Self {
        self.pending().retry_limit = attempts;
        self
    }
}

            
/// A callback that produces the contents of a new document for an entry
/// that does not exist yet.
pub trait EntryInsert<Col: SerializedCollection>: Send + Unpin {
    /// Consumes the callback and returns the contents to store.
    fn call(self) -> Col::Contents;
}

            
impl<Callback, Col> EntryInsert<Col> for Callback
where
    Col: SerializedCollection,
    Callback: FnOnce() -> Col::Contents + Send + Unpin,
{
    /// Runs the closure once and returns the contents it produces.
    fn call(self) -> Col::Contents {
        let produce = self;
        produce()
    }
}

            
impl<Col: SerializedCollection> EntryInsert<Col> for () {
    /// The unit type carries no callback, so this must never be invoked.
    // NOTE(review): presumably `()` is the placeholder type for an entry with
    // no insert callback configured; confirm against the entry constructor.
    fn call(self) -> Col::Contents {
        unreachable!()
    }
}

            
/// A callback that modifies the contents of an existing entry in place.
pub trait EntryUpdate<Col>: Send + Unpin
where
    Col: SerializedCollection,
{
    /// Applies the update to `doc`. Takes `&self` so the callback can be
    /// invoked again when a conflicting save is retried.
    fn call(&self, doc: &mut Col::Contents);
}

            
impl<Callback, Col> EntryUpdate<Col> for Callback
where
    Col: NamedCollection + SerializedCollection,
    Callback: Fn(&mut Col::Contents) + Send + Unpin,
{
    /// Runs the closure against the document contents.
    fn call(&self, doc: &mut Col::Contents) {
        (*self)(doc);
    }
}

            
impl<Col: SerializedCollection> EntryUpdate<Col> for () {
    /// The unit type carries no callback, so this must never be invoked.
    // NOTE(review): presumably `()` is the placeholder type for an entry with
    // no update callback configured; confirm against the entry constructor.
    fn call(&self, _doc: &mut Col::Contents) {
        unreachable!()
    }
}

            
impl<'a, 'name, Conn, Col, EI, EU> Future for Entry<'a, 'name, Conn, Col, EI, EU>
where
    Col: NamedCollection + SerializedCollection + 'static,
    <Col as Collection>::PrimaryKey: Unpin,
    Conn: Connection,
    EI: EntryInsert<Col> + 'a,
    EU: EntryUpdate<Col> + 'a,
    'name: 'a,
{
    type Output = Result<Option<CollectionDocument<Col>>, Error>;

            
351
    fn poll(
351
        mut self: std::pin::Pin<&mut Self>,
351
        cx: &mut std::task::Context<'_>,
351
    ) -> Poll<Self::Output> {
        if let Some(EntryBuilder {
94
            name,
94
            connection,
94
            insert,
94
            update,
94
            retry_limit,
            ..
351
        }) = match &mut self.state {
257
            EntryState::Executing(_) => None,
94
            EntryState::Pending(builder) => builder.take(),
94
        } {
94
            let future = Self::execute(name, connection, insert, update, retry_limit).boxed();
94
            self.state = EntryState::Executing(future);
257
        }

            
351
        if let EntryState::Executing(future) = &mut self.state {
351
            future.as_mut().poll(cx)
        } else {
            unreachable!()
        }
351
    }
}

            
/// The two phases of an [`Entry`]: being configured, or running.
enum EntryState<'a, 'name, Connection, Col, EI, EU>
where
    Col: NamedCollection + SerializedCollection,
    EI: EntryInsert<Col>,
    EU: EntryUpdate<Col>,
{
    /// The entry has not been polled yet. The builder is taken out of the
    /// `Option` when execution starts.
    Pending(Option<EntryBuilder<'a, 'name, Connection, Col, EI, EU>>),
    /// The entry operation is in flight as a boxed future.
    Executing(BoxFuture<'a, Result<Option<CollectionDocument<Col>>, Error>>),
}

            
/// Executes [`Connection::list()`] when awaited. Also offers methods to
/// customize the options for the operation.
///
/// This is a thin wrapper around [`connection::List`]. When awaited, the
/// listed documents are converted into [`CollectionDocument`]s.
#[must_use]
pub struct List<'a, Cn, Cl>(connection::List<'a, Cn, Cl>)
where
    Cl: Collection;

            
impl<'a, Cn, Cl> List<'a, Cn, Cl>
where
    Cl: Collection,
{
    /// Lists documents by id in ascending order.
    pub fn ascending(self) -> Self {
        Self(self.0.ascending())
    }

    /// Lists documents by id in descending order.
    pub fn descending(self) -> Self {
        Self(self.0.descending())
    }

    /// Sets the maximum number of results to return.
    pub fn limit(self, maximum_results: usize) -> Self {
        Self(self.0.limit(maximum_results))
    }
}

            
impl<'a, Cn, Cl> Future for List<'a, Cn, Cl>
where
    Cl: SerializedCollection + Unpin,
    Cl::PrimaryKey: Unpin,
    Cn: Connection,
{
    type Output = Result<Vec<CollectionDocument<Cl>>, Error>;

            
40
    fn poll(
40
        mut self: std::pin::Pin<&mut Self>,
40
        cx: &mut std::task::Context<'_>,
40
    ) -> Poll<Self::Output> {
40
        let result = ready!(self.0.poll_unpin(cx));
20
        Poll::Ready(result.and_then(|docs| docs.collection_documents()))
40
    }
}