1
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, task::Poll};
2

            
3
use async_trait::async_trait;
4
use futures::{future::BoxFuture, ready, Future, FutureExt};
5
use serde::{de::DeserializeOwned, Deserialize, Serialize};
6
use transmog::{Format, OwnedDeserializer};
7
use transmog_pot::Pot;
8

            
9
use crate::{
10
    connection::{self, Connection, Range},
11
    document::{BorrowedDocument, CollectionDocument, KeyId, OwnedDocument, OwnedDocuments},
12
    schema::{CollectionName, Schematic},
13
    Error,
14
};
15

            
16
/// A namespaced collection of `Document<Self>` items and views.
17
///
18
/// ## Deriving this trait
19
///
20
/// This trait can be derived instead of manually implemented:
21
///
22
/// ```rust
23
/// use bonsaidb_core::schema::Collection;
24
/// use serde::{Deserialize, Serialize};
25
///
26
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
27
/// #[collection(name = "MyCollection")]
28
/// # #[collection(core = bonsaidb_core)]
29
/// pub struct MyCollection;
30
/// ```
31
///
32
/// If you're publishing a collection for use in multiple projects, consider
33
/// giving the collection an `authority`, which gives your collection a
34
/// namespace:
35
///
36
/// ```rust
37
/// use bonsaidb_core::schema::Collection;
38
/// use serde::{Deserialize, Serialize};
39
///
40
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
41
/// #[collection(name = "MyCollection", authority = "khonsulabs")]
42
/// # #[collection(core = bonsaidb_core)]
43
/// pub struct MyCollection;
44
/// ```
45
///
46
/// The list of views can be specified using the `views` parameter:
47
///
48
/// ```rust
49
/// use bonsaidb_core::schema::{Collection, View};
50
/// use serde::{Deserialize, Serialize};
51
///
52
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
53
/// #[collection(name = "MyCollection", views = [ScoresByRank])]
54
/// # #[collection(core = bonsaidb_core)]
55
/// pub struct MyCollection;
56
///
57
/// #[derive(Debug, Clone, View)]
58
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
59
/// # #[view(core = bonsaidb_core)]
60
/// pub struct ScoresByRank;
61
/// #
62
/// # use bonsaidb_core::{
63
/// #     document::CollectionDocument,
64
/// #     schema::{
65
/// #         CollectionViewSchema,   ReduceResult,
66
/// #         ViewMapResult, ViewMappedValue,
67
/// #    },
68
/// # };
69
/// # impl CollectionViewSchema for ScoresByRank {
70
/// #     type View = Self;
71
/// #     fn map(
72
/// #         &self,
73
/// #         _document: CollectionDocument<<Self::View as View>::Collection>,
74
/// #     ) -> ViewMapResult<Self::View> {
75
/// #         todo!()
76
/// #     }
77
/// #
78
/// #     fn reduce(
79
/// #         &self,
80
/// #         _mappings: &[ViewMappedValue<Self::View>],
81
/// #         _rereduce: bool,
82
/// #     ) -> ReduceResult<Self::View> {
83
/// #         todo!()
84
/// #     }
85
/// # }
86
/// ```
87
///
88
/// ### Specifying a Collection Encryption Key
89
///
90
/// By default, encryption will be required if an `encryption_key` is provided:
91
///
92
/// ```rust
93
/// use bonsaidb_core::{document::KeyId, schema::Collection};
94
/// use serde::{Deserialize, Serialize};
95
///
96
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
97
/// #[collection(name = "MyCollection", encryption_key = Some(KeyId::Master))]
98
/// # #[collection(core = bonsaidb_core)]
99
/// pub struct MyCollection;
100
/// ```
101
///
102
/// The `encryption_required` parameter can be provided if you wish to be
103
/// explicit:
104
///
105
/// ```rust
106
/// use bonsaidb_core::{document::KeyId, schema::Collection};
107
/// use serde::{Deserialize, Serialize};
108
///
109
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
110
/// #[collection(name = "MyCollection")]
111
/// #[collection(encryption_key = Some(KeyId::Master), encryption_required)]
112
/// # #[collection(core = bonsaidb_core)]
113
/// pub struct MyCollection;
114
/// ```
115
///
116
/// Or, if you wish your collection to be encrypted if it's available, but not
117
/// cause errors when being stored without encryption, you can provide the
118
/// `encryption_optional` parameter:
119
///
120
/// ```rust
121
/// use bonsaidb_core::{document::KeyId, schema::Collection};
122
/// use serde::{Deserialize, Serialize};
123
///
124
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
125
/// #[collection(name = "MyCollection")]
126
/// #[collection(encryption_key = Some(KeyId::Master), encryption_optional)]
127
/// # #[collection(core = bonsaidb_core)]
128
/// pub struct MyCollection;
129
/// ```
130
///
131
/// ### Changing the serialization strategy
132
///
133
/// BonsaiDb uses [`transmog`](::transmog) to allow customizing serialization
134
/// formats. To use one of the formats Transmog already supports, add its crate
135
/// to your Cargo.toml and use it like this example using `transmog_bincode`:
136
///
137
/// ```rust
138
/// use bonsaidb_core::schema::Collection;
139
/// use serde::{Deserialize, Serialize};
140
///
141
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
142
/// #[collection(name = "MyCollection")]
143
/// #[collection(serialization = transmog_bincode::Bincode)]
144
/// # #[collection(core = bonsaidb_core)]
145
/// pub struct MyCollection;
146
/// ```
147
///
148
/// To manually implement `SerializedCollection` you can pass `None` to
149
/// `serialization`:
150
///
151
/// ```rust
152
/// use bonsaidb_core::schema::Collection;
153
///
154
/// #[derive(Debug, Default, Collection)]
155
/// #[collection(name = "MyCollection")]
156
/// #[collection(serialization = None)]
157
/// # #[collection(core = bonsaidb_core)]
158
/// pub struct MyCollection;
159
/// ```
160
pub trait Collection: Debug + Send + Sync {
161
    /// The `Id` of this collection.
162
    fn collection_name() -> CollectionName;
163

            
164
    /// Defines all `View`s in this collection in `schema`.
165
    fn define_views(schema: &mut Schematic) -> Result<(), Error>;
166

            
167
    /// If a [`KeyId`] is returned, this collection will be stored encrypted
168
    /// at-rest using the key specified.
169
    #[must_use]
170
1758988
    fn encryption_key() -> Option<KeyId> {
171
1758988
        None
172
1758988
    }
173
}
174

            
175
/// A collection that knows how to serialize and deserialize documents to an associated type.
176
///
177
/// These examples for this type use this basic collection definition:
178
///
179
/// ```rust
180
/// use bonsaidb_core::{
181
///     schema::{Collection, CollectionName, DefaultSerialization, Schematic},
182
///     Error,
183
/// };
184
/// use serde::{Deserialize, Serialize};
185
///
186
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
187
/// #[collection(name = "MyCollection")]
188
/// # #[collection(core = bonsaidb_core)]
189
/// pub struct MyCollection {
190
///     pub rank: u32,
191
///     pub score: f32,
192
/// }
193
/// ```
194
#[async_trait]
195
pub trait SerializedCollection: Collection {
196
    /// The type of the contents stored in documents in this collection.
197
    type Contents: Send + Sync;
198
    /// The serialization format for this collection.
199
    type Format: OwnedDeserializer<Self::Contents>;
200

            
201
    /// Returns the configured instance of [`Self::Format`].
202
    // TODO allow configuration to be passed here, such as max allocation bytes.
203
    fn format() -> Self::Format;
204

            
205
    /// Deserialize `data` as `Self::Contents` using this collection's format.
206
90807
    fn deserialize(data: &[u8]) -> Result<Self::Contents, Error> {
207
90807
        Self::format()
208
90807
            .deserialize_owned(data)
209
90807
            .map_err(|err| crate::Error::Serialization(err.to_string()))
210
90807
    }
211

            
212
    /// Serialize `item` using this collection's format.
213
39468
    fn serialize(item: &Self::Contents) -> Result<Vec<u8>, Error> {
214
39468
        Self::format()
215
39468
            .serialize(item)
216
39468
            .map_err(|err| crate::Error::Serialization(err.to_string()))
217
39468
    }
218

            
219
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
220
    ///
221
    /// ```rust
222
    /// # bonsaidb_core::__doctest_prelude!();
223
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
224
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
225
    /// if let Some(doc) = MyCollection::get(42, &db).await? {
226
    ///     println!(
227
    ///         "Retrieved revision {} with deserialized contents: {:?}",
228
    ///         doc.header.revision, doc.contents
229
    ///     );
230
    /// }
231
    /// # Ok(())
232
    /// # })
233
    /// # }
234
    /// ```
235
9587
    async fn get<C: Connection>(
236
9587
        id: u64,
237
9587
        connection: &C,
238
9587
    ) -> Result<Option<CollectionDocument<Self>>, Error>
239
9587
    where
240
9587
        Self: Sized,
241
9587
    {
242
10587
        let possible_doc = connection.get::<Self>(id).await?;
243
9586
        Ok(possible_doc.as_ref().map(TryInto::try_into).transpose()?)
244
19173
    }
245

            
246
    /// Retrieves all documents matching `ids`. Documents that are not found
247
    /// are not returned, but no error will be generated.
248
    ///
249
    /// ```rust
250
    /// # bonsaidb_core::__doctest_prelude!();
251
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
252
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
253
    /// for doc in MyCollection::get_multiple(&[42, 43], &db).await? {
254
    ///     println!(
255
    ///         "Retrieved #{} with deserialized contents: {:?}",
256
    ///         doc.header.id, doc.contents
257
    ///     );
258
    /// }
259
    /// # Ok(())
260
    /// # })
261
    /// # }
262
    /// ```
263
10
    async fn get_multiple<C: Connection>(
264
10
        ids: &[u64],
265
10
        connection: &C,
266
10
    ) -> Result<Vec<CollectionDocument<Self>>, Error>
267
10
    where
268
10
        Self: Sized,
269
10
    {
270
10
        connection
271
10
            .collection::<Self>()
272
10
            .get_multiple(ids)
273
10
            .await
274
10
            .and_then(|docs| docs.collection_documents())
275
20
    }
276

            
277
    /// Retrieves all documents matching the range of `ids`.
278
    ///
279
    /// ```rust
280
    /// # bonsaidb_core::__doctest_prelude!();
281
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
282
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
283
    /// for doc in MyCollection::list(42.., &db).descending().limit(20).await? {
284
    ///     println!(
285
    ///         "Retrieved #{} with deserialized contents: {:?}",
286
    ///         doc.header.id, doc.contents
287
    ///     );
288
    /// }
289
    /// # Ok(())
290
    /// # })
291
    /// # }
292
    /// ```
293
15
    fn list<R: Into<Range<u64>>, C: Connection>(ids: R, connection: &'_ C) -> List<'_, C, Self>
294
15
    where
295
15
        Self: Sized,
296
15
    {
297
15
        List(connection::List::new(
298
15
            connection::PossiblyOwned::Owned(connection.collection::<Self>()),
299
15
            ids.into(),
300
15
        ))
301
15
    }
302

            
303
    /// Retrieves all documents.
304
    ///
305
    /// ```rust
306
    /// # bonsaidb_core::__doctest_prelude!();
307
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
308
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
309
    /// for doc in MyCollection::all(&db).await? {
310
    ///     println!(
311
    ///         "Retrieved #{} with deserialized contents: {:?}",
312
    ///         doc.header.id, doc.contents
313
    ///     );
314
    /// }
315
    /// # Ok(())
316
    /// # })
317
    /// # }
318
    /// ```
319
5
    fn all<C: Connection>(connection: &C) -> List<'_, C, Self>
320
5
    where
321
5
        Self: Sized,
322
5
    {
323
5
        List(connection::List::new(
324
5
            connection::PossiblyOwned::Owned(connection.collection::<Self>()),
325
5
            Range::from(..),
326
5
        ))
327
5
    }
328

            
329
    /// Pushes this value into the collection, returning the created document.
330
    /// This function is useful when `Self != Self::Contents`.
331
    ///
332
    /// ```rust
333
    /// # bonsaidb_core::__doctest_prelude!();
334
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
335
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
336
    /// let document = MyCollection::push(MyCollection::default(), &db).await?;
337
    /// println!(
338
    ///     "Inserted {:?} with id {} with revision {}",
339
    ///     document.contents, document.header.id, document.header.revision
340
    /// );
341
    /// # Ok(())
342
    /// # })
343
    /// # }
344
    /// ```
345
2455
    async fn push<Cn: Connection>(
346
2455
        contents: Self::Contents,
347
2455
        connection: &Cn,
348
2455
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
349
2455
    where
350
2455
        Self: Sized + 'static,
351
2455
        Self::Contents: 'async_trait,
352
2455
    {
353
3736
        let header = match connection.collection::<Self>().push(&contents).await {
354
2452
            Ok(header) => header,
355
3
            Err(error) => return Err(InsertError { contents, error }),
356
        };
357
2452
        Ok(CollectionDocument { header, contents })
358
4910
    }
359

            
360
    /// Pushes this value into the collection, returning the created document.
361
    ///
362
    /// ```rust
363
    /// # bonsaidb_core::__doctest_prelude!();
364
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
365
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
366
    /// let document = MyCollection::default().push_into(&db).await?;
367
    /// println!(
368
    ///     "Inserted {:?} with id {} with revision {}",
369
    ///     document.contents, document.header.id, document.header.revision
370
    /// );
371
    /// # Ok(())
372
    /// # })
373
    /// # }
374
    /// ```
375
2381
    async fn push_into<Cn: Connection>(
376
2381
        self,
377
2381
        connection: &Cn,
378
2381
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
379
2381
    where
380
2381
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
381
2381
    {
382
3662
        Self::push(self, connection).await
383
4762
    }
384

            
385
    /// Inserts this value into the collection with the specified id, returning
386
    /// the created document.
387
    ///
388
    /// ```rust
389
    /// # bonsaidb_core::__doctest_prelude!();
390
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
391
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
392
    /// let document = MyCollection::insert(42, MyCollection::default(), &db).await?;
393
    /// assert_eq!(document.header.id, 42);
394
    /// println!(
395
    ///     "Inserted {:?} with revision {}",
396
    ///     document.contents, document.header.revision
397
    /// );
398
    /// # Ok(())
399
    /// # })
400
    /// # }
401
    /// ```
402
1012
    async fn insert<Cn: Connection>(
403
1012
        id: u64,
404
1012
        contents: Self::Contents,
405
1012
        connection: &Cn,
406
1012
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
407
1012
    where
408
1012
        Self: Sized + 'static,
409
1012
        Self::Contents: 'async_trait,
410
1012
    {
411
3366
        let header = match connection.collection::<Self>().insert(id, &contents).await {
412
506
            Ok(header) => header,
413
506
            Err(error) => return Err(InsertError { contents, error }),
414
        };
415
506
        Ok(CollectionDocument { header, contents })
416
2024
    }
417

            
418
    /// Inserts this value into the collection with the given `id`, returning
419
    /// the created document.
420
    ///
421
    /// ```rust
422
    /// # bonsaidb_core::__doctest_prelude!();
423
    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
424
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
425
    /// let document = MyCollection::default().insert_into(42, &db).await?;
426
    /// assert_eq!(document.header.id, 42);
427
    /// println!(
428
    ///     "Inserted {:?} with revision {}",
429
    ///     document.contents, document.header.revision
430
    /// );
431
    /// # Ok(())
432
    /// # })
433
    /// # }
434
    /// ```
435
1012
    async fn insert_into<Cn: Connection>(
436
1012
        self,
437
1012
        id: u64,
438
1012
        connection: &Cn,
439
1012
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
440
1012
    where
441
1012
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
442
1012
    {
443
3366
        Self::insert(id, self, connection).await
444
2024
    }
445
}
446

            
447
/// A convenience trait for easily storing Serde-compatible types in documents.
448
pub trait DefaultSerialization: Collection {}
449

            
450
impl<T> SerializedCollection for T
451
where
452
    T: DefaultSerialization + Serialize + DeserializeOwned,
453
{
454
    type Contents = Self;
455
    type Format = Pot;
456

            
457
344509
    fn format() -> Self::Format {
458
344509
        Pot::default()
459
344509
    }
460
}
461

            
462
/// An error from inserting a [`CollectionDocument`].
463
#[derive(thiserror::Error, Debug)]
464
#[error("{error}")]
465
pub struct InsertError<T> {
466
    /// The original value being inserted.
467
    pub contents: T,
468
    /// The error that occurred while inserting.
469
    pub error: Error,
470
}
471

            
472
/// A collection with a unique name column.
473
///
474
/// ## Finding a document by unique name
475
///
476
/// ```rust
477
/// # bonsaidb_core::__doctest_prelude!();
478
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
479
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
480
/// if let Some(doc) = MyCollection::load("unique name", &db).await? {
481
///     println!(
482
///         "Retrieved revision {} with deserialized contents: {:?}",
483
///         doc.header.revision, doc.contents
484
///     );
485
/// }
486
/// # Ok(())
487
/// # })
488
/// # }
489
/// ```
490
///
491
/// Load accepts either a string or a u64. This enables building methods that
492
/// accept either the unique ID or the unique name:
493
///
494
/// ```rust
495
/// # bonsaidb_core::__doctest_prelude!();
496
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
497
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
498
/// if let Some(doc) = MyCollection::load(42, &db).await? {
499
///     println!(
500
///         "Retrieved revision {} with deserialized contents: {:?}",
501
///         doc.header.revision, doc.contents
502
///     );
503
/// }
504
/// # Ok(())
505
/// # })
506
/// # }
507
/// ```
508
///
509
/// ## Executing an insert or update
510
///
511
/// ```rust
512
/// # bonsaidb_core::__doctest_prelude!();
513
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
514
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
515
/// let upserted = MyCollection::entry("unique name", &db)
516
///     .update_with(|existing: &mut MyCollection| {
517
///         existing.rank += 1;
518
///     })
519
///     .or_insert_with(MyCollection::default)
520
///     .await?
521
///     .unwrap();
522
/// println!("Rank: {:?}", upserted.contents.rank);
523
///
524
/// # Ok(())
525
/// # })
526
/// # }
527
/// ```
528
#[async_trait]
529
pub trait NamedCollection: Collection + Unpin {
530
    /// The name view defined for the collection.
531
    type ByNameView: crate::schema::SerializedView<Key = String>;
532

            
533
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
534
6166
    async fn load<'name, N: Into<NamedReference<'name>> + Send + Sync, C: Connection>(
535
6166
        id: N,
536
6166
        connection: &C,
537
6166
    ) -> Result<Option<CollectionDocument<Self>>, Error>
538
6166
    where
539
6166
        Self: SerializedCollection + Sized + 'static,
540
6166
    {
541
6281
        let possible_doc = Self::load_document(id, connection).await?;
542
6166
        Ok(possible_doc
543
6166
            .as_ref()
544
6166
            .map(CollectionDocument::try_from)
545
6166
            .transpose()?)
546
12332
    }
547

            
548
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
549
90
    fn entry<'connection, 'name, N: Into<NamedReference<'name>> + Send + Sync, C: Connection>(
550
90
        id: N,
551
90
        connection: &'connection C,
552
90
    ) -> Entry<'connection, 'name, C, Self, (), ()>
553
90
    where
554
90
        Self: SerializedCollection + Sized,
555
90
    {
556
90
        let name = id.into();
557
90
        Entry {
558
90
            state: EntryState::Pending(Some(EntryBuilder {
559
90
                name,
560
90
                connection,
561
90
                insert: None,
562
90
                update: None,
563
90
                retry_limit: 0,
564
90
                _collection: PhantomData,
565
90
            })),
566
90
        }
567
90
    }
568

            
569
    /// Loads a document from this collection by name, if applicable. Return
570
    /// `Ok(None)` if unsupported.
571
    #[allow(unused_variables)]
572
6166
    async fn load_document<'name, N: Into<NamedReference<'name>> + Send + Sync, C: Connection>(
573
6166
        name: N,
574
6166
        connection: &C,
575
6166
    ) -> Result<Option<OwnedDocument>, Error>
576
6166
    where
577
6166
        Self: SerializedCollection + Sized,
578
6166
    {
579
6166
        match name.into() {
580
26
            NamedReference::Id(id) => connection.get::<Self>(id).await,
581
6140
            NamedReference::Name(name) => Ok(connection
582
6140
                .view::<Self::ByNameView>()
583
6140
                .with_key(name.as_ref().to_owned())
584
6262
                .query_with_docs()
585
6261
                .await?
586
                .documents
587
6140
                .into_iter()
588
6140
                .next()
589
6140
                .map(|(_, document)| document)),
590
        }
591
12331
    }
592
}
593

            
594
/// A reference to a collection that has a unique name view.
595
74
#[derive(Clone, PartialEq, Deserialize, Serialize, Debug)]
596
#[must_use]
597
pub enum NamedReference<'a> {
598
    /// An entity's name.
599
    Name(Cow<'a, str>),
600
    /// A document id.
601
    Id(u64),
602
}
603

            
604
impl<'a> From<&'a str> for NamedReference<'a> {
605
700
    fn from(name: &'a str) -> Self {
606
700
        Self::Name(Cow::Borrowed(name))
607
700
    }
608
}
609

            
610
impl<'a> From<&'a String> for NamedReference<'a> {
611
152800
    fn from(name: &'a String) -> Self {
612
152800
        Self::Name(Cow::Borrowed(name.as_str()))
613
152800
    }
614
}
615

            
616
/// References a document by the id found in its header.
///
/// Only the id is copied out, so the resulting reference borrows nothing
/// from `doc`. The lifetime of the reference (`'b`) and the lifetime of the
/// data the document borrows (`'c`) are independent of each other and of the
/// produced `NamedReference<'a>`.
impl<'a, 'b, 'c> From<&'b BorrowedDocument<'c>> for NamedReference<'a> {
    fn from(doc: &'b BorrowedDocument<'c>) -> Self {
        Self::Id(doc.header.id)
    }
}
621

            
622
impl<'a, 'c, C> From<&'c CollectionDocument<C>> for NamedReference<'a>
623
where
624
    C: SerializedCollection,
625
{
626
32
    fn from(doc: &'c CollectionDocument<C>) -> Self {
627
32
        Self::Id(doc.header.id)
628
32
    }
629
}
630

            
631
impl<'a> From<String> for NamedReference<'a> {
632
    fn from(name: String) -> Self {
633
        Self::Name(Cow::Owned(name))
634
    }
635
}
636

            
637
impl<'a> From<u64> for NamedReference<'a> {
638
700
    fn from(id: u64) -> Self {
639
700
        Self::Id(id)
640
700
    }
641
}
642

            
643
impl<'a> NamedReference<'a> {
644
    /// Converts this reference to an owned reference with a `'static` lifetime.
645
925
    pub fn into_owned(self) -> NamedReference<'static> {
646
925
        match self {
647
225
            Self::Name(name) => NamedReference::Name(match name {
648
                Cow::Owned(string) => Cow::Owned(string),
649
225
                Cow::Borrowed(borrowed) => Cow::Owned(borrowed.to_owned()),
650
            }),
651
700
            Self::Id(id) => NamedReference::Id(id),
652
        }
653
925
    }
654

            
655
    /// Returns this reference's id. If the reference is a name, the
656
    /// [`NamedCollection::ByNameView`] is queried for the id.
657
55
    pub async fn id<Col: NamedCollection, Cn: Connection>(
658
55
        &self,
659
55
        connection: &Cn,
660
55
    ) -> Result<Option<u64>, Error> {
661
55
        match self {
662
9
            Self::Name(name) => Ok(connection
663
9
                .view::<Col::ByNameView>()
664
9
                .with_key(name.as_ref().to_owned())
665
9
                .query()
666
6
                .await?
667
9
                .into_iter()
668
9
                .next()
669
9
                .map(|e| e.source.id)),
670
46
            Self::Id(id) => Ok(Some(*id)),
671
        }
672
55
    }
673
}
674

            
675
/// A future that resolves to an entry in a [`NamedCollection`].
676
#[must_use]
677
pub struct Entry<'a, 'name, Connection, Col, EI, EU>
678
where
679
    Col: NamedCollection + SerializedCollection,
680
    EI: EntryInsert<Col>,
681
    EU: EntryUpdate<Col>,
682
{
683
    state: EntryState<'a, 'name, Connection, Col, EI, EU>,
684
}
685

            
686
struct EntryBuilder<
687
    'a,
688
    'name,
689
    Connection,
690
    Col,
691
    EI: EntryInsert<Col> + 'a,
692
    EU: EntryUpdate<Col> + 'a,
693
> where
694
    Col: SerializedCollection,
695
{
696
    name: NamedReference<'name>,
697
    connection: &'a Connection,
698
    insert: Option<EI>,
699
    update: Option<EU>,
700
    retry_limit: usize,
701
    _collection: PhantomData<Col>,
702
}
703

            
704
impl<'a, 'name, Connection, Col, EI, EU> Entry<'a, 'name, Connection, Col, EI, EU>
705
where
706
    Col: NamedCollection + SerializedCollection + 'static + Unpin,
707
    Connection: crate::connection::Connection,
708
    EI: EntryInsert<Col> + 'a + Unpin,
709
    EU: EntryUpdate<Col> + 'a + Unpin,
710
    'name: 'a,
711
{
712
90
    async fn execute(
713
90
        name: NamedReference<'name>,
714
90
        connection: &'a Connection,
715
90
        insert: Option<EI>,
716
90
        update: Option<EU>,
717
90
        mut retry_limit: usize,
718
90
    ) -> Result<Option<CollectionDocument<Col>>, Error> {
719
163
        if let Some(mut existing) = Col::load(name, connection).await? {
720
16
            if let Some(update) = update {
721
11
                loop {
722
11
                    update.call(&mut existing.contents);
723
11
                    match existing.update(connection).await {
724
6
                        Ok(()) => return Ok(Some(existing)),
725
                        Err(Error::DocumentConflict(collection, id)) => {
726
                            // Another client has updated the document underneath us.
727
                            if retry_limit > 0 {
728
                                retry_limit -= 1;
729
                                existing = match Col::load(id, connection).await? {
730
                                    Some(doc) => doc,
731
                                    // Another client deleted the document before we could reload it.
732
                                    None => break Ok(None),
733
                                }
734
                            } else {
735
                                break Err(Error::DocumentConflict(collection, id));
736
                            }
737
                        }
738
5
                        Err(other) => break Err(other),
739
                    }
740
                }
741
            } else {
742
5
                Ok(Some(existing))
743
            }
744
74
        } else if let Some(insert) = insert {
745
74
            let new_document = insert.call();
746
74
            Ok(Some(Col::push(new_document, connection).await?))
747
        } else {
748
            Ok(None)
749
        }
750
90
    }
751
    fn pending(&mut self) -> &mut EntryBuilder<'a, 'name, Connection, Col, EI, EU> {
752
        match &mut self.state {
753
            EntryState::Pending(pending) => pending.as_mut().unwrap(),
754
            EntryState::Executing(_) => unreachable!(),
755
        }
756
    }
757

            
758
    /// If an entry with the key doesn't exist, `cb` will be executed to provide
759
    /// an initial document. This document will be saved before being returned.
760
80
    pub fn or_insert_with<F: EntryInsert<Col> + 'a + Unpin>(
761
80
        self,
762
80
        cb: F,
763
80
    ) -> Entry<'a, 'name, Connection, Col, F, EU> {
764
        Entry {
765
80
            state: match self.state {
766
                EntryState::Pending(Some(EntryBuilder {
767
80
                    name,
768
80
                    connection,
769
80
                    update,
770
80
                    retry_limit,
771
80
                    ..
772
80
                })) => EntryState::Pending(Some(EntryBuilder {
773
80
                    name,
774
80
                    connection,
775
80
                    insert: Some(cb),
776
80
                    update,
777
80
                    retry_limit,
778
80
                    _collection: PhantomData,
779
80
                })),
780
                _ => {
781
                    unreachable!("attempting to modify an already executing future")
782
                }
783
            },
784
        }
785
80
    }
786

            
787
    /// If an entry with the keys exists, `cb` will be executed with the stored
788
    /// value, allowing an opportunity to update the value. This new value will
789
    /// be saved to the database before returning. If an error occurs during
790
    /// update, `cb` may be invoked multiple times, up to the
791
    /// [`retry_limit`](Self::retry_limit()).
792
85
    pub fn update_with<F: EntryUpdate<Col> + 'a + Unpin>(
793
85
        self,
794
85
        cb: F,
795
85
    ) -> Entry<'a, 'name, Connection, Col, EI, F> {
796
        Entry {
797
85
            state: match self.state {
798
                EntryState::Pending(Some(EntryBuilder {
799
85
                    name,
800
85
                    connection,
801
85
                    insert,
802
85
                    retry_limit,
803
85
                    ..
804
85
                })) => EntryState::Pending(Some(EntryBuilder {
805
85
                    name,
806
85
                    connection,
807
85
                    insert,
808
85
                    update: Some(cb),
809
85
                    retry_limit,
810
85
                    _collection: PhantomData,
811
85
                })),
812
                _ => {
813
                    unreachable!("attempting to modify an already executing future")
814
                }
815
            },
816
        }
817
85
    }
818

            
819
    /// The number of attempts to attempt updating the document using
820
    /// `update_with` before returning an error.
821
    pub fn retry_limit(mut self, attempts: usize) -> Self {
822
        self.pending().retry_limit = attempts;
823
        self
824
    }
825
}
826

            
827
pub trait EntryInsert<Col: SerializedCollection>: Send + Unpin {
828
    fn call(self) -> Col::Contents;
829
}
830

            
831
impl<F, Col> EntryInsert<Col> for F
832
where
833
    F: FnOnce() -> Col::Contents + Send + Unpin,
834
    Col: SerializedCollection,
835
{
836
74
    fn call(self) -> Col::Contents {
837
74
        self()
838
74
    }
839
}
840

            
841
impl<Col> EntryInsert<Col> for ()
842
where
843
    Col: SerializedCollection,
844
{
845
    fn call(self) -> Col::Contents {
846
        unreachable!()
847
    }
848
}
849

            
850
pub trait EntryUpdate<Col>: Send + Unpin
851
where
852
    Col: SerializedCollection,
853
{
854
    fn call(&self, doc: &mut Col::Contents);
855
}
856

            
857
impl<F, Col> EntryUpdate<Col> for F
858
where
859
    F: Fn(&mut Col::Contents) + Send + Unpin,
860
    Col: NamedCollection + SerializedCollection,
861
{
862
11
    fn call(&self, doc: &mut Col::Contents) {
863
11
        self(doc);
864
11
    }
865
}
866

            
867
impl<Col> EntryUpdate<Col> for ()
868
where
869
    Col: SerializedCollection,
870
{
871
    fn call(&self, _doc: &mut Col::Contents) {
872
        unreachable!();
873
    }
874
}
875

            
876
impl<'a, 'name, Conn, Col, EI, EU> Future for Entry<'a, 'name, Conn, Col, EI, EU>
877
where
878
    Col: NamedCollection + SerializedCollection + 'static,
879
    Conn: Connection,
880
    EI: EntryInsert<Col> + 'a,
881
    EU: EntryUpdate<Col> + 'a,
882
    'name: 'a,
883
{
884
    type Output = Result<Option<CollectionDocument<Col>>, Error>;
885

            
886
336
    fn poll(
887
336
        mut self: std::pin::Pin<&mut Self>,
888
336
        cx: &mut std::task::Context<'_>,
889
336
    ) -> Poll<Self::Output> {
890
        if let Some(EntryBuilder {
891
90
            name,
892
90
            connection,
893
90
            insert,
894
90
            update,
895
90
            retry_limit,
896
            ..
897
336
        }) = match &mut self.state {
898
246
            EntryState::Executing(_) => None,
899
90
            EntryState::Pending(builder) => builder.take(),
900
90
        } {
901
90
            let future = Self::execute(name, connection, insert, update, retry_limit).boxed();
902
90
            self.state = EntryState::Executing(future);
903
246
        }
904

            
905
336
        if let EntryState::Executing(future) = &mut self.state {
906
336
            future.as_mut().poll(cx)
907
        } else {
908
            unreachable!()
909
        }
910
336
    }
911
}
912

            
913
enum EntryState<'a, 'name, Connection, Col, EI, EU>
914
where
915
    Col: NamedCollection + SerializedCollection,
916
    EI: EntryInsert<Col>,
917
    EU: EntryUpdate<Col>,
918
{
919
    Pending(Option<EntryBuilder<'a, 'name, Connection, Col, EI, EU>>),
920
    Executing(BoxFuture<'a, Result<Option<CollectionDocument<Col>>, Error>>),
921
}
922

            
923
/// Executes [`Connection::list()`] when awaited. Also offers methods to
924
/// customize the options for the operation.
925
#[must_use]
926
pub struct List<'a, Cn, Cl>(connection::List<'a, Cn, Cl>);
927

            
928
impl<'a, Cn, Cl> List<'a, Cn, Cl> {
929
    /// Lists documents by id in ascending order.
930
    pub fn ascending(mut self) -> Self {
931
        self.0 = self.0.ascending();
932
        self
933
    }
934

            
935
    /// Lists documents by id in descending order.
936
5
    pub fn descending(mut self) -> Self {
937
5
        self.0 = self.0.descending();
938
5
        self
939
5
    }
940

            
941
    /// Sets the maximum number of results to return.
942
5
    pub fn limit(mut self, maximum_results: usize) -> Self {
943
5
        self.0 = self.0.limit(maximum_results);
944
5
        self
945
5
    }
946
}
947

            
948
impl<'a, Cn, Cl> Future for List<'a, Cn, Cl>
949
where
950
    Cl: SerializedCollection + Unpin,
951
    Cn: Connection,
952
{
953
    type Output = Result<Vec<CollectionDocument<Cl>>, Error>;
954

            
955
40
    fn poll(
956
40
        mut self: std::pin::Pin<&mut Self>,
957
40
        cx: &mut std::task::Context<'_>,
958
40
    ) -> Poll<Self::Output> {
959
40
        let result = ready!(self.0.poll_unpin(cx));
960
20
        Poll::Ready(result.and_then(|docs| docs.collection_documents()))
961
40
    }
962
}