1
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, task::Poll};
2

            
3
use async_trait::async_trait;
4
use futures::{future::BoxFuture, ready, Future, FutureExt};
5
use serde::{de::DeserializeOwned, Deserialize, Serialize};
6
use transmog::{Format, OwnedDeserializer};
7
use transmog_pot::Pot;
8

            
9
use crate::{
10
    connection::{self, Connection, Range},
11
    document::{BorrowedDocument, CollectionDocument, KeyId, OwnedDocument, OwnedDocuments},
12
    schema::{CollectionName, Schematic},
13
    Error,
14
};
15

            
16
/// A namespaced collection of `Document<Self>` items and views.
17
pub trait Collection: Debug + Send + Sync {
18
    /// The `Id` of this collection.
19
    fn collection_name() -> CollectionName;
20

            
21
    /// Defines all `View`s in this collection in `schema`.
22
    fn define_views(schema: &mut Schematic) -> Result<(), Error>;
23

            
24
    /// If a [`KeyId`] is returned, this collection will be stored encrypted
25
    /// at-rest using the key specified.
26
    #[must_use]
27
1612044
    fn encryption_key() -> Option<KeyId> {
28
1612044
        None
29
1612044
    }
30
}
31

            
32
/// A collection that knows how to serialize and deserialize documents to an associated type.
33
#[async_trait]
34
pub trait SerializedCollection: Collection {
35
    /// The type of the contents stored in documents in this collection.
36
    type Contents: Send + Sync;
37
    /// The serialization format for this collection.
38
    type Format: OwnedDeserializer<Self::Contents>;
39

            
40
    /// Returns the configured instance of [`Self::Format`].
41
    // TODO allow configuration to be passed here, such as max allocation bytes.
42
    fn format() -> Self::Format;
43

            
44
    /// Deserialize `data` as `Self::Contents` using this collection's format.
45
82072
    fn deserialize(data: &[u8]) -> Result<Self::Contents, Error> {
46
82072
        Self::format()
47
82072
            .deserialize_owned(data)
48
82072
            .map_err(|err| crate::Error::Serialization(err.to_string()))
49
82072
    }
50

            
51
    /// Serialize `item` using this collection's format.
52
39637
    fn serialize(item: &Self::Contents) -> Result<Vec<u8>, Error> {
53
39637
        Self::format()
54
39637
            .serialize(item)
55
39637
            .map_err(|err| crate::Error::Serialization(err.to_string()))
56
39637
    }
57

            
58
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
59
9278
    async fn get<C: Connection>(
60
9278
        id: u64,
61
9278
        connection: &C,
62
9278
    ) -> Result<Option<CollectionDocument<Self>>, Error>
63
9278
    where
64
9278
        Self: Sized,
65
9278
    {
66
10325
        let possible_doc = connection.get::<Self>(id).await?;
67
9278
        Ok(possible_doc.as_ref().map(TryInto::try_into).transpose()?)
68
18556
    }
69

            
70
    /// Retrieves all documents matching `ids`. Documents that are not found
71
    /// are not returned, but no error will be generated.
72
8
    async fn get_multiple<C: Connection>(
73
8
        ids: &[u64],
74
8
        connection: &C,
75
8
    ) -> Result<Vec<CollectionDocument<Self>>, Error>
76
8
    where
77
8
        Self: Sized,
78
8
    {
79
8
        connection
80
8
            .collection::<Self>()
81
8
            .get_multiple(ids)
82
8
            .await
83
8
            .and_then(|docs| docs.collection_documents())
84
16
    }
85

            
86
    /// Retrieves all documents matching `ids`. Documents that are not found
87
    /// are not returned, but no error will be generated.
88
12
    fn list<R: Into<Range<u64>>, C: Connection>(ids: R, connection: &'_ C) -> List<'_, C, Self>
89
12
    where
90
12
        Self: Sized,
91
12
    {
92
12
        List(connection::List::new(
93
12
            connection::PossiblyOwned::Owned(connection.collection::<Self>()),
94
12
            ids.into(),
95
12
        ))
96
12
    }
97

            
98
    /// Pushes this value into the collection, returning the created document.
99
2390
    async fn push<Cn: Connection>(
100
2390
        contents: Self::Contents,
101
2390
        connection: &Cn,
102
2390
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
103
2390
    where
104
2390
        Self: Sized + 'static,
105
2390
        Self::Contents: 'async_trait,
106
2390
    {
107
3675
        let header = match connection.collection::<Self>().push(&contents).await {
108
2386
            Ok(header) => header,
109
3
            Err(error) => return Err(InsertError { contents, error }),
110
        };
111
2386
        Ok(CollectionDocument { header, contents })
112
4779
    }
113

            
114
    /// Pushes this value into the collection, returning the created document.
115
2318
    async fn push_into<Cn: Connection>(
116
2318
        self,
117
2318
        connection: &Cn,
118
2318
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
119
2318
    where
120
2318
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
121
2318
    {
122
3603
        Self::push(self, connection).await
123
4636
    }
124

            
125
    /// Inserts this value into the collection with the specified id, returning
126
    /// the created document.
127
1010
    async fn insert<Cn: Connection>(
128
1010
        id: u64,
129
1010
        contents: Self::Contents,
130
1010
        connection: &Cn,
131
1010
    ) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
132
1010
    where
133
1010
        Self: Sized + 'static,
134
1010
        Self::Contents: 'async_trait,
135
1010
    {
136
3225
        let header = match connection.collection::<Self>().insert(id, &contents).await {
137
505
            Ok(header) => header,
138
505
            Err(error) => return Err(InsertError { contents, error }),
139
        };
140
505
        Ok(CollectionDocument { header, contents })
141
2020
    }
142

            
143
    /// Inserts this value into the collection with the given `id`, returning
144
    /// the created document.
145
1010
    async fn insert_into<Cn: Connection>(
146
1010
        self,
147
1010
        id: u64,
148
1010
        connection: &Cn,
149
1010
    ) -> Result<CollectionDocument<Self>, InsertError<Self>>
150
1010
    where
151
1010
        Self: SerializedCollection<Contents = Self> + Sized + 'static,
152
1010
    {
153
3225
        Self::insert(id, self, connection).await
154
2020
    }
155
}
156

            
157
/// A convenience trait for easily storing Serde-compatible types in documents.
158
pub trait DefaultSerialization: Collection {}
159

            
160
impl<T> SerializedCollection for T
161
where
162
    T: DefaultSerialization + Serialize + DeserializeOwned,
163
{
164
    type Contents = Self;
165
    type Format = Pot;
166

            
167
293792
    fn format() -> Self::Format {
168
293792
        Pot::default()
169
293792
    }
170
}
171

            
172
/// An error from inserting a [`CollectionDocument`].
173
#[derive(thiserror::Error, Debug)]
174
#[error("{error}")]
175
pub struct InsertError<T> {
176
    /// The original value being inserted.
177
    pub contents: T,
178
    /// The error that occurred while inserting.
179
    pub error: Error,
180
}
181

            
182
/// A collection with a unique name column.
183
///
184
/// ## Finding a document by unique name
185
///
186
/// ```rust
187
/// # bonsaidb_core::__doctest_prelude!();
188
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
189
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
190
/// if let Some(doc) = MyCollection::load("unique name", &db).await? {
191
///     println!(
192
///         "Retrieved revision {} with deserialized contents: {:?}",
193
///         doc.header.revision, doc.contents
194
///     );
195
/// }
196
/// # Ok(())
197
/// # })
198
/// # }
199
/// ```
200
///
201
/// Load accepts either a string or a u64. This enables building methods that
202
/// accept either the unique ID or the unique name:
203
///
204
/// ```rust
205
/// # bonsaidb_core::__doctest_prelude!();
206
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
207
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
208
/// if let Some(doc) = MyCollection::load(42, &db).await? {
209
///     println!(
210
///         "Retrieved revision {} with deserialized contents: {:?}",
211
///         doc.header.revision, doc.contents
212
///     );
213
/// }
214
/// # Ok(())
215
/// # })
216
/// # }
217
/// ```
218
///
219
/// ## Executing an insert or update
220
///
221
/// ```rust
222
/// # bonsaidb_core::__doctest_prelude!();
223
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
224
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
225
/// let upserted = MyCollection::entry("unique name", &db)
226
///     .update_with(|existing: &mut MyCollection| {
227
///         existing.rank += 1;
228
///     })
229
///     .or_insert_with(MyCollection::default)
230
///     .await?
231
///     .unwrap();
232
/// println!("Rank: {:?}", upserted.contents.rank);
233
///
234
/// # Ok(())
235
/// # })
236
/// # }
237
/// ```
238
#[async_trait]
239
pub trait NamedCollection: Collection + Unpin {
240
    /// The name view defined for the collection.
241
    type ByNameView: crate::schema::SerializedView<Key = String>;
242

            
243
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
244
5948
    async fn load<'name, N: Into<NamedReference<'name>> + Send + Sync, C: Connection>(
245
5948
        id: N,
246
5948
        connection: &C,
247
5948
    ) -> Result<Option<CollectionDocument<Self>>, Error>
248
5948
    where
249
5948
        Self: SerializedCollection + Sized + 'static,
250
5948
    {
251
6006
        let possible_doc = Self::load_document(id, connection).await?;
252
5947
        Ok(possible_doc
253
5947
            .as_ref()
254
5947
            .map(CollectionDocument::try_from)
255
5947
            .transpose()?)
256
11895
    }
257

            
258
    /// Gets a [`CollectionDocument`] with `id` from `connection`.
259
85
    fn entry<'connection, 'name, N: Into<NamedReference<'name>> + Send + Sync, C: Connection>(
260
85
        id: N,
261
85
        connection: &'connection C,
262
85
    ) -> Entry<'connection, 'name, C, Self, (), ()>
263
85
    where
264
85
        Self: SerializedCollection + Sized,
265
85
    {
266
85
        let name = id.into();
267
85
        Entry {
268
85
            state: EntryState::Pending(Some(EntryBuilder {
269
85
                name,
270
85
                connection,
271
85
                insert: None,
272
85
                update: None,
273
85
                retry_limit: 0,
274
85
                _collection: PhantomData,
275
85
            })),
276
85
        }
277
85
    }
278

            
279
    /// Loads a document from this collection by name, if applicable. Return
280
    /// `Ok(None)` if unsupported.
281
    #[allow(unused_variables)]
282
5948
    async fn load_document<'name, N: Into<NamedReference<'name>> + Send + Sync, C: Connection>(
283
5948
        name: N,
284
5948
        connection: &C,
285
5948
    ) -> Result<Option<OwnedDocument>, Error>
286
5948
    where
287
5948
        Self: SerializedCollection + Sized,
288
5948
    {
289
5948
        match name.into() {
290
20
            NamedReference::Id(id) => connection.get::<Self>(id).await,
291
5928
            NamedReference::Name(name) => Ok(connection
292
5928
                .view::<Self::ByNameView>()
293
5928
                .with_key(name.as_ref().to_owned())
294
5986
                .query_with_docs()
295
5949
                .await?
296
                .documents
297
5926
                .into_iter()
298
5926
                .next()
299
5926
                .map(|(_, document)| document)),
300
        }
301
11894
    }
302
}
303

            
304
/// A reference to a collection that has a unique name view.
305
74
#[derive(Clone, PartialEq, Deserialize, Serialize, Debug)]
306
#[must_use]
307
pub enum NamedReference<'a> {
308
    /// An entity's name.
309
    Name(Cow<'a, str>),
310
    /// A document id.
311
    Id(u64),
312
}
313

            
314
impl<'a> From<&'a str> for NamedReference<'a> {
315
552
    fn from(name: &'a str) -> Self {
316
552
        Self::Name(Cow::Borrowed(name))
317
552
    }
318
}
319

            
320
impl<'a> From<&'a String> for NamedReference<'a> {
321
135792
    fn from(name: &'a String) -> Self {
322
135792
        Self::Name(Cow::Borrowed(name.as_str()))
323
135792
    }
324
}
325

            
326
impl<'a, 'b, 'c> From<&'b BorrowedDocument<'b>> for NamedReference<'a> {
327
    fn from(doc: &'b BorrowedDocument<'b>) -> Self {
328
        Self::Id(doc.header.id)
329
    }
330
}
331

            
332
impl<'a, 'c, C> From<&'c CollectionDocument<C>> for NamedReference<'a>
333
where
334
    C: SerializedCollection,
335
{
336
24
    fn from(doc: &'c CollectionDocument<C>) -> Self {
337
24
        Self::Id(doc.header.id)
338
24
    }
339
}
340

            
341
impl<'a> From<String> for NamedReference<'a> {
342
    fn from(name: String) -> Self {
343
        Self::Name(Cow::Owned(name))
344
    }
345
}
346

            
347
impl<'a> From<u64> for NamedReference<'a> {
348
506
    fn from(id: u64) -> Self {
349
506
        Self::Id(id)
350
506
    }
351
}
352

            
353
impl<'a> NamedReference<'a> {
354
    /// Converts this reference to an owned reference with a `'static` lifetime.
355
851
    pub fn into_owned(self) -> NamedReference<'static> {
356
851
        match self {
357
207
            Self::Name(name) => NamedReference::Name(match name {
358
                Cow::Owned(string) => Cow::Owned(string),
359
207
                Cow::Borrowed(borrowed) => Cow::Owned(borrowed.to_owned()),
360
            }),
361
644
            Self::Id(id) => NamedReference::Id(id),
362
        }
363
851
    }
364

            
365
    /// Returns this reference's id. If the reference is a name, the
366
    /// [`NamedCollection::ByNameView`] is queried for the id.
367
47
    pub async fn id<Col: NamedCollection, Cn: Connection>(
368
47
        &self,
369
47
        connection: &Cn,
370
47
    ) -> Result<Option<u64>, Error> {
371
47
        match self {
372
9
            Self::Name(name) => Ok(connection
373
9
                .view::<Col::ByNameView>()
374
9
                .with_key(name.as_ref().to_owned())
375
9
                .query()
376
6
                .await?
377
9
                .into_iter()
378
9
                .next()
379
9
                .map(|e| e.source.id)),
380
38
            Self::Id(id) => Ok(Some(*id)),
381
        }
382
47
    }
383
}
384

            
385
/// A future that resolves to an entry in a [`NamedCollection`].
386
#[must_use]
387
pub struct Entry<'a, 'name, Connection, Col, EI, EU>
388
where
389
    Col: NamedCollection + SerializedCollection,
390
    EI: EntryInsert<Col>,
391
    EU: EntryUpdate<Col>,
392
{
393
    state: EntryState<'a, 'name, Connection, Col, EI, EU>,
394
}
395

            
396
struct EntryBuilder<
397
    'a,
398
    'name,
399
    Connection,
400
    Col,
401
    EI: EntryInsert<Col> + 'a,
402
    EU: EntryUpdate<Col> + 'a,
403
> where
404
    Col: SerializedCollection,
405
{
406
    name: NamedReference<'name>,
407
    connection: &'a Connection,
408
    insert: Option<EI>,
409
    update: Option<EU>,
410
    retry_limit: usize,
411
    _collection: PhantomData<Col>,
412
}
413

            
414
impl<'a, 'name, Connection, Col, EI, EU> Entry<'a, 'name, Connection, Col, EI, EU>
415
where
416
    Col: NamedCollection + SerializedCollection + 'static + Unpin,
417
    Connection: crate::connection::Connection,
418
    EI: EntryInsert<Col> + 'a + Unpin,
419
    EU: EntryUpdate<Col> + 'a + Unpin,
420
    'name: 'a,
421
{
422
85
    async fn execute(
423
85
        name: NamedReference<'name>,
424
85
        connection: &'a Connection,
425
85
        insert: Option<EI>,
426
85
        update: Option<EU>,
427
85
        mut retry_limit: usize,
428
85
    ) -> Result<Option<CollectionDocument<Col>>, Error> {
429
156
        if let Some(mut existing) = Col::load(name, connection).await? {
430
13
            if let Some(update) = update {
431
9
                loop {
432
9
                    update.call(&mut existing.contents);
433
9
                    match existing.update(connection).await {
434
5
                        Ok(()) => return Ok(Some(existing)),
435
                        Err(Error::DocumentConflict(collection, id)) => {
436
                            // Another client has updated the document underneath us.
437
                            if retry_limit > 0 {
438
                                retry_limit -= 1;
439
                                existing = match Col::load(id, connection).await? {
440
                                    Some(doc) => doc,
441
                                    // Another client deleted the document before we could reload it.
442
                                    None => break Ok(None),
443
                                }
444
                            } else {
445
                                break Err(Error::DocumentConflict(collection, id));
446
                            }
447
                        }
448
4
                        Err(other) => break Err(other),
449
                    }
450
                }
451
            } else {
452
4
                Ok(Some(existing))
453
            }
454
72
        } else if let Some(insert) = insert {
455
72
            let new_document = insert.call();
456
72
            Ok(Some(Col::push(new_document, connection).await?))
457
        } else {
458
            Ok(None)
459
        }
460
85
    }
461
    fn pending(&mut self) -> &mut EntryBuilder<'a, 'name, Connection, Col, EI, EU> {
462
        match &mut self.state {
463
            EntryState::Pending(pending) => pending.as_mut().unwrap(),
464
            EntryState::Executing(_) => unreachable!(),
465
        }
466
    }
467

            
468
    /// If an entry with the key doesn't exist, `cb` will be executed to provide
469
    /// an initial document. This document will be saved before being returned.
470
77
    pub fn or_insert_with<F: EntryInsert<Col> + 'a + Unpin>(
471
77
        self,
472
77
        cb: F,
473
77
    ) -> Entry<'a, 'name, Connection, Col, F, EU> {
474
        Entry {
475
77
            state: match self.state {
476
                EntryState::Pending(Some(EntryBuilder {
477
77
                    name,
478
77
                    connection,
479
77
                    update,
480
77
                    retry_limit,
481
77
                    ..
482
77
                })) => EntryState::Pending(Some(EntryBuilder {
483
77
                    name,
484
77
                    connection,
485
77
                    insert: Some(cb),
486
77
                    update,
487
77
                    retry_limit,
488
77
                    _collection: PhantomData,
489
77
                })),
490
                _ => {
491
                    unreachable!("attempting to modify an already executing future")
492
                }
493
            },
494
        }
495
77
    }
496

            
497
    /// If an entry with the keys exists, `cb` will be executed with the stored
498
    /// value, allowing an opportunity to update the value. This new value will
499
    /// be saved to the database before returning. If an error occurs during
500
    /// update, `cb` may be invoked multiple times, up to the
501
    /// [`retry_limit`](Self::retry_limit()).
502
81
    pub fn update_with<F: EntryUpdate<Col> + 'a + Unpin>(
503
81
        self,
504
81
        cb: F,
505
81
    ) -> Entry<'a, 'name, Connection, Col, EI, F> {
506
        Entry {
507
81
            state: match self.state {
508
                EntryState::Pending(Some(EntryBuilder {
509
81
                    name,
510
81
                    connection,
511
81
                    insert,
512
81
                    retry_limit,
513
81
                    ..
514
81
                })) => EntryState::Pending(Some(EntryBuilder {
515
81
                    name,
516
81
                    connection,
517
81
                    insert,
518
81
                    update: Some(cb),
519
81
                    retry_limit,
520
81
                    _collection: PhantomData,
521
81
                })),
522
                _ => {
523
                    unreachable!("attempting to modify an already executing future")
524
                }
525
            },
526
        }
527
81
    }
528

            
529
    /// The number of attempts to attempt updating the document using
530
    /// `update_with` before returning an error.
531
    pub fn retry_limit(mut self, attempts: usize) -> Self {
532
        self.pending().retry_limit = attempts;
533
        self
534
    }
535
}
536

            
537
pub trait EntryInsert<Col: SerializedCollection>: Send + Unpin {
538
    fn call(self) -> Col::Contents;
539
}
540

            
541
impl<F, Col> EntryInsert<Col> for F
542
where
543
    F: FnOnce() -> Col::Contents + Send + Unpin,
544
    Col: SerializedCollection,
545
{
546
72
    fn call(self) -> Col::Contents {
547
72
        self()
548
72
    }
549
}
550

            
551
impl<Col> EntryInsert<Col> for ()
552
where
553
    Col: SerializedCollection,
554
{
555
    fn call(self) -> Col::Contents {
556
        unreachable!()
557
    }
558
}
559

            
560
pub trait EntryUpdate<Col>: Send + Unpin
561
where
562
    Col: SerializedCollection,
563
{
564
    fn call(&self, doc: &mut Col::Contents);
565
}
566

            
567
impl<F, Col> EntryUpdate<Col> for F
568
where
569
    F: Fn(&mut Col::Contents) + Send + Unpin,
570
    Col: NamedCollection + SerializedCollection,
571
{
572
9
    fn call(&self, doc: &mut Col::Contents) {
573
9
        self(doc);
574
9
    }
575
}
576

            
577
impl<Col> EntryUpdate<Col> for ()
578
where
579
    Col: SerializedCollection,
580
{
581
    fn call(&self, _doc: &mut Col::Contents) {
582
        unreachable!();
583
    }
584
}
585

            
586
impl<'a, 'name, Conn, Col, EI, EU> Future for Entry<'a, 'name, Conn, Col, EI, EU>
587
where
588
    Col: NamedCollection + SerializedCollection + 'static,
589
    Conn: Connection,
590
    EI: EntryInsert<Col> + 'a,
591
    EU: EntryUpdate<Col> + 'a,
592
    'name: 'a,
593
{
594
    type Output = Result<Option<CollectionDocument<Col>>, Error>;
595

            
596
320
    fn poll(
597
320
        mut self: std::pin::Pin<&mut Self>,
598
320
        cx: &mut std::task::Context<'_>,
599
320
    ) -> Poll<Self::Output> {
600
        if let Some(EntryBuilder {
601
85
            name,
602
85
            connection,
603
85
            insert,
604
85
            update,
605
85
            retry_limit,
606
            ..
607
320
        }) = match &mut self.state {
608
235
            EntryState::Executing(_) => None,
609
85
            EntryState::Pending(builder) => builder.take(),
610
85
        } {
611
85
            let future = Self::execute(name, connection, insert, update, retry_limit).boxed();
612
85
            self.state = EntryState::Executing(future);
613
235
        }
614

            
615
320
        if let EntryState::Executing(future) = &mut self.state {
616
320
            future.as_mut().poll(cx)
617
        } else {
618
            unreachable!()
619
        }
620
320
    }
621
}
622

            
623
enum EntryState<'a, 'name, Connection, Col, EI, EU>
624
where
625
    Col: NamedCollection + SerializedCollection,
626
    EI: EntryInsert<Col>,
627
    EU: EntryUpdate<Col>,
628
{
629
    Pending(Option<EntryBuilder<'a, 'name, Connection, Col, EI, EU>>),
630
    Executing(BoxFuture<'a, Result<Option<CollectionDocument<Col>>, Error>>),
631
}
632

            
633
/// Executes [`Connection::list()`] when awaited. Also offers methods to
634
/// customize the options for the operation.
635
#[must_use]
636
pub struct List<'a, Cn, Cl>(connection::List<'a, Cn, Cl>);
637

            
638
impl<'a, Cn, Cl> List<'a, Cn, Cl> {
639
    /// Lists documents by id in ascending order.
640
    pub fn ascending(mut self) -> Self {
641
        self.0 = self.0.ascending();
642
        self
643
    }
644

            
645
    /// Lists documents by id in descending order.
646
4
    pub fn descending(mut self) -> Self {
647
4
        self.0 = self.0.descending();
648
4
        self
649
4
    }
650

            
651
    /// Sets the maximum number of results to return.
652
4
    pub fn limit(mut self, maximum_results: usize) -> Self {
653
4
        self.0 = self.0.limit(maximum_results);
654
4
        self
655
4
    }
656
}
657

            
658
impl<'a, Cn, Cl> Future for List<'a, Cn, Cl>
659
where
660
    Cl: SerializedCollection + Unpin,
661
    Cn: Connection,
662
{
663
    type Output = Result<Vec<CollectionDocument<Cl>>, Error>;
664

            
665
24
    fn poll(
666
24
        mut self: std::pin::Pin<&mut Self>,
667
24
        cx: &mut std::task::Context<'_>,
668
24
    ) -> Poll<Self::Output> {
669
24
        let result = ready!(self.0.poll_unpin(cx));
670
12
        Poll::Ready(result.and_then(|docs| docs.collection_documents()))
671
24
    }
672
}