1
use std::borrow::Borrow;
2
use std::collections::BTreeMap;
3

            
4
use arc_bytes::serde::Bytes;
5
use async_trait::async_trait;
6

            
7
use super::GroupedReductions;
8
use crate::connection::{
9
    AccessPolicy, HasSession, QueryKey, Range, RangeRef, SerializedQueryKey, Sort,
10
};
11
use crate::document::{
12
    CollectionDocument, CollectionHeader, Document, DocumentId, HasHeader, Header, OwnedDocument,
13
};
14
use crate::key::{self, ByteSource, Key, KeyEncoding};
15
use crate::schema::view::map::{
16
    CollectionMap, MappedDocuments, MappedSerializedValue, ViewMappings,
17
};
18
use crate::schema::view::{self};
19
use crate::schema::{self, CollectionName, MappedValue, Schematic, SerializedCollection, ViewName};
20
use crate::transaction::{OperationResult, Transaction};
21
use crate::Error;
22

            
23
/// The low-level interface to a database's [`schema::Schema`], giving access to
24
/// [`Collection`s](crate::schema::Collection) and
25
/// [`View`s](crate::schema::View). This trait is not safe to use within async
26
/// contexts and will block the current thread. For async access, use
27
/// [`AsyncLowLevelConnection`].
28
///
29
/// This trait's methods are not designed for ergonomics. See
30
/// [`Connection`](super::Connection) for a higher-level interface.
31
pub trait LowLevelConnection: HasSchema + HasSession {
32
    /// Inserts a newly created document into the connected [`schema::Schema`]
33
    /// for the [`Collection`](schema::Collection) `C`. If `id` is `None` a unique id will be
34
    /// generated. If an id is provided and a document already exists with that
35
    /// id, a conflict error will be returned.
36
    ///
37
    /// This is the lower-level API. For better ergonomics, consider using
38
    /// one of:
39
    ///
40
    /// - [`SerializedCollection::push_into()`]
41
    /// - [`SerializedCollection::insert_into()`]
42
    /// - [`self.collection::<Collection>().insert()`](super::Collection::insert)
43
    /// - [`self.collection::<Collection>().push()`](super::Collection::push)
44
38772
    fn insert<C, PrimaryKey, B>(
45
38772
        &self,
46
38772
        id: Option<&PrimaryKey>,
47
38772
        contents: B,
48
38772
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error>
49
38772
    where
50
38772
        C: schema::Collection,
51
38772
        B: Into<Bytes> + Send,
52
38772
        PrimaryKey: KeyEncoding<C::PrimaryKey> + Send + ?Sized,
53
38772
    {
54
38772
        let contents = contents.into();
55
38772
        let results = self.apply_transaction(Transaction::insert(
56
38772
            C::collection_name(),
57
38772
            id.map(|id| DocumentId::new(id)).transpose()?,
58
38772
            contents,
59
54
        ))?;
60
38718
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
61
38718
            CollectionHeader::try_from(header)
62
        } else {
63
            unreachable!(
64
                "apply_transaction on a single insert should yield a single DocumentUpdated entry"
65
            )
66
        }
67
38772
    }
68

            
69
    /// Updates an existing document in the connected [`schema::Schema`] for the
70
    /// [`Collection`](schema::Collection) `C`. Upon success, `doc.revision` will be updated with
71
    /// the new revision.
72
    ///
73
    /// This is the lower-level API. For better ergonomics, consider using
74
    /// one of:
75
    ///
76
    /// - [`CollectionDocument::update()`]
77
    /// - [`self.collection::<Collection>().update()`](super::Collection::update)
78
916
    fn update<C: schema::Collection, D: Document<C> + Send + Sync>(
79
916
        &self,
80
916
        doc: &mut D,
81
916
    ) -> Result<(), Error> {
82
916
        let results = self.apply_transaction(Transaction::update(
83
916
            C::collection_name(),
84
916
            doc.header().into_header()?,
85
916
            doc.bytes()?,
86
12
        ))?;
87
904
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
88
904
            doc.set_header(header)?;
89
904
            Ok(())
90
        } else {
91
            unreachable!(
92
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
93
            )
94
        }
95
916
    }
96

            
97
    /// Overwrites an existing document, or inserts a new document. Upon success,
98
    /// `doc.revision` will be updated with the new revision information.
99
    ///
100
    /// This is the lower-level API. For better ergonomics, consider using
101
    /// one of:
102
    ///
103
    /// - [`SerializedCollection::overwrite()`]
104
    /// - [`SerializedCollection::overwrite_into()`]
105
    /// - [`self.collection::<Collection>().overwrite()`](super::Collection::overwrite)
106
9
    fn overwrite<C, PrimaryKey>(
107
9
        &self,
108
9
        id: &PrimaryKey,
109
9
        contents: Vec<u8>,
110
9
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error>
111
9
    where
112
9
        C: schema::Collection,
113
9
        PrimaryKey: KeyEncoding<C::PrimaryKey>,
114
9
    {
115
9
        let results = self.apply_transaction(Transaction::overwrite(
116
9
            C::collection_name(),
117
9
            DocumentId::new(id)?,
118
9
            contents,
119
        ))?;
120
9
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
121
9
            CollectionHeader::try_from(header)
122
        } else {
123
            unreachable!(
124
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
125
            )
126
        }
127
9
    }
128

            
129
    /// Retrieves a stored document from [`Collection`](schema::Collection) `C` identified by `id`.
130
    ///
131
    /// This is a lower-level API. For better ergonomics, consider using one of:
132
    ///
133
    /// - [`SerializedCollection::get()`]
134
    /// - [`self.collection::<Collection>().get()`](super::Collection::get)
135
2335
    fn get<C, PrimaryKey>(&self, id: &PrimaryKey) -> Result<Option<OwnedDocument>, Error>
136
2335
    where
137
2335
        C: schema::Collection,
138
2335
        PrimaryKey: KeyEncoding<C::PrimaryKey> + ?Sized,
139
2335
    {
140
2335
        self.get_from_collection(DocumentId::new(id)?, &C::collection_name())
141
2335
    }
142

            
143
    /// Retrieves all documents matching `ids`. Documents that are not found are
144
    /// not returned, but no error will be generated.
145
    ///
146
    /// This is a lower-level API. For better ergonomics, consider using one of:
147
    ///
148
    /// - [`SerializedCollection::get_multiple()`]
149
    /// - [`self.collection::<Collection>().get_multiple()`](super::Collection::get_multiple)
150
1378
    fn get_multiple<'id, C, PrimaryKey, DocumentIds, I>(
151
1378
        &self,
152
1378
        ids: DocumentIds,
153
1378
    ) -> Result<Vec<OwnedDocument>, Error>
154
1378
    where
155
1378
        C: schema::Collection,
156
1378
        DocumentIds: IntoIterator<Item = &'id PrimaryKey, IntoIter = I> + Send + Sync,
157
1378
        I: Iterator<Item = &'id PrimaryKey> + Send + Sync,
158
1378
        PrimaryKey: KeyEncoding<C::PrimaryKey> + 'id + ?Sized,
159
1378
    {
160
1378
        let ids = ids
161
1378
            .into_iter()
162
1390
            .map(|id| DocumentId::new(id))
163
1378
            .collect::<Result<Vec<_>, _>>()?;
164
1378
        self.get_multiple_from_collection(&ids, &C::collection_name())
165
1378
    }
166

            
167
    /// Retrieves all documents within the range of `ids`. To retrieve all
168
    /// documents, pass in `..` for `ids`.
169
    ///
170
    /// This is a lower-level API. For better ergonomics, consider using one of:
171
    ///
172
    /// - [`SerializedCollection::all()`]
173
    /// - [`self.collection::<Collection>().all()`](super::Collection::all)
174
    /// - [`SerializedCollection::list()`]
175
    /// - [`self.collection::<Collection>().list()`](super::Collection::list)
176
24
    fn list<'id, C, R, PrimaryKey>(
177
24
        &self,
178
24
        ids: R,
179
24
        order: Sort,
180
24
        limit: Option<u32>,
181
24
    ) -> Result<Vec<OwnedDocument>, Error>
182
24
    where
183
24
        C: schema::Collection,
184
24
        R: Into<RangeRef<'id, C::PrimaryKey, PrimaryKey>> + Send,
185
24
        PrimaryKey: KeyEncoding<C::PrimaryKey> + PartialEq + 'id + ?Sized,
186
24
        C::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
187
24
    {
188
26
        let ids = ids.into().map_result(|id| DocumentId::new(id))?;
189
24
        self.list_from_collection(ids, order, limit, &C::collection_name())
190
24
    }
191

            
192
    /// Retrieves all documents within the range of `ids`. To retrieve all
193
    /// documents, pass in `..` for `ids`.
194
    ///
195
    /// This is the lower-level API. For better ergonomics, consider using one
196
    /// of:
197
    ///
198
    /// - [`SerializedCollection::all_async().headers()`](schema::List::headers)
199
    /// - [`self.collection::<Collection>().all().headers()`](super::List::headers)
200
    /// - [`SerializedCollection::list_async().headers()`](schema::List::headers)
201
    /// - [`self.collection::<Collection>().list().headers()`](super::List::headers)
202
3
    fn list_headers<'id, C, R, PrimaryKey>(
203
3
        &self,
204
3
        ids: R,
205
3
        order: Sort,
206
3
        limit: Option<u32>,
207
3
    ) -> Result<Vec<Header>, Error>
208
3
    where
209
3
        C: schema::Collection,
210
3
        R: Into<RangeRef<'id, C::PrimaryKey, PrimaryKey>> + Send,
211
3
        PrimaryKey: KeyEncoding<C::PrimaryKey> + PartialEq + 'id + ?Sized,
212
3
        C::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
213
3
    {
214
6
        let ids = ids.into().map_result(|id| DocumentId::new(id))?;
215
3
        self.list_headers_from_collection(ids, order, limit, &C::collection_name())
216
3
    }
217

            
218
    /// Counts the number of documents within the range of `ids`.
219
    ///
220
    /// This is a lower-level API. For better ergonomics, consider using one of:
221
    ///
222
    /// - [`SerializedCollection::all().count()`](schema::List::count)
223
    /// - [`self.collection::<Collection>().all().count()`](super::List::count)
224
    /// - [`SerializedCollection::list().count()`](schema::List::count)
225
    /// - [`self.collection::<Collection>().list().count()`](super::List::count)
226
6
    fn count<'id, C, R, PrimaryKey>(&self, ids: R) -> Result<u64, Error>
227
6
    where
228
6
        C: schema::Collection,
229
6
        R: Into<RangeRef<'id, C::PrimaryKey, PrimaryKey>> + Send,
230
6
        PrimaryKey: KeyEncoding<C::PrimaryKey> + PartialEq + 'id + ?Sized,
231
6
        C::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
232
6
    {
233
6
        self.count_from_collection(
234
6
            ids.into().map_result(|id| DocumentId::new(id))?,
235
6
            &C::collection_name(),
236
        )
237
6
    }
238

            
239
    /// Removes a `Document` from the database.
240
    ///
241
    /// This is a lower-level API. For better ergonomics, consider using
242
    /// one of:
243
    ///
244
    /// - [`CollectionDocument::delete()`]
245
    /// - [`self.collection::<Collection>().delete()`](super::Collection::delete)
246
18669
    fn delete<C: schema::Collection, H: HasHeader + Send + Sync>(
247
18669
        &self,
248
18669
        doc: &H,
249
18669
    ) -> Result<(), Error> {
250
18669
        let results =
251
18669
            self.apply_transaction(Transaction::delete(C::collection_name(), doc.header()?))?;
252
18669
        if let OperationResult::DocumentDeleted { .. } = &results[0] {
253
18669
            Ok(())
254
        } else {
255
            unreachable!(
256
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
257
            )
258
        }
259
18669
    }
260

            
261
    /// Queries for view entries matching [`View`](schema::View).
262
    ///
263
    /// This is a lower-level API. For better ergonomics, consider querying the
264
    /// view using [`View::entries(self).query()`](super::View::query) instead. The
265
    /// parameters for the query can be customized on the builder returned from
266
    /// [`SerializedView::entries()`](schema::SerializedView::entries),
267
    /// [`SerializedView::entries_async()`](schema::SerializedView::entries_async),
268
    /// or [`Connection::view()`](super::Connection::view).
269
25039
    fn query<V: schema::SerializedView, Key>(
270
25039
        &self,
271
25039
        key: Option<QueryKey<'_, V::Key, Key>>,
272
25039
        order: Sort,
273
25039
        limit: Option<u32>,
274
25039
        access_policy: AccessPolicy,
275
25039
    ) -> Result<ViewMappings<V>, Error>
276
25039
    where
277
25039
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
278
25039
        V::Key: Borrow<Key> + PartialEq<Key>,
279
25039
    {
280
25039
        let view = self.schematic().view::<V>()?;
281
25039
        let mappings = self.query_by_name(
282
25039
            &view.view_name(),
283
25039
            key.map(|key| key.serialized()).transpose()?,
284
25039
            order,
285
25039
            limit,
286
25039
            access_policy,
287
        )?;
288
25039
        mappings
289
25039
            .into_iter()
290
25039
            .map(|mapping| {
291
21286
                Ok(CollectionMap {
292
21286
                    key: <V::Key as key::Key>::from_ord_bytes(ByteSource::Borrowed(&mapping.key))
293
21286
                        .map_err(view::Error::key_serialization)
294
21286
                        .map_err(Error::from)?,
295
21286
                    value: V::deserialize(&mapping.value)?,
296
21286
                    source: mapping.source.try_into()?,
297
                })
298
25039
            })
299
25039
            .collect::<Result<Vec<_>, Error>>()
300
25039
    }
301

            
302
    /// Queries for view entries matching [`View`](schema::View) with their
303
    /// source documents.
304
    ///
305
    /// This is a lower-level API. For better ergonomics, consider querying the
306
    /// view using
307
    /// [`View::entries(self).query_with_docs()`](super::View::query_with_docs)
308
    /// instead. The parameters for the query can be customized on the builder
309
    /// returned from
310
    /// [`SerializedView::entries()`](schema::SerializedView::entries),
311
    /// [`SerializedView::entries_async()`](schema::SerializedView::entries_async),
312
    /// or [`Connection::view()`](super::Connection::view).
313
716
    fn query_with_docs<V: schema::SerializedView, Key>(
314
716
        &self,
315
716
        key: Option<QueryKey<'_, V::Key, Key>>,
316
716
        order: Sort,
317
716
        limit: Option<u32>,
318
716
        access_policy: AccessPolicy,
319
716
    ) -> Result<MappedDocuments<OwnedDocument, V>, Error>
320
716
    where
321
716
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
322
716
        V::Key: Borrow<Key> + PartialEq<Key>,
323
716
    {
324
        // Query permission is checked by the query call
325
716
        let results = self.query::<V, Key>(key, order, limit, access_policy)?;
326

            
327
        // Verify that there is permission to fetch each document
328
716
        let documents = self
329
731
            .get_multiple::<V::Collection, _, _, _>(results.iter().map(|m| &m.source.id))?
330
716
            .into_iter()
331
731
            .map(|doc| {
332
729
                let id = doc.header.id.deserialize()?;
333
729
                Ok((id, doc))
334
731
            })
335
716
            .collect::<Result<BTreeMap<_, _>, Error>>()?;
336

            
337
716
        Ok(MappedDocuments {
338
716
            mappings: results,
339
716
            documents,
340
716
        })
341
716
    }
342

            
343
    /// Queries for view entries matching [`View`](schema::View) with their
344
    /// source documents, deserialized.
345
    ///
346
    /// This is a lower-level API. For better ergonomics, consider querying the
347
    /// view using
348
    /// [`View::entries(self).query_with_collection_docs()`](super::View::query_with_collection_docs)
349
    /// instead. The parameters for the query can be customized on the builder
350
    /// returned from
351
    /// [`SerializedView::entries()`](schema::SerializedView::entries),
352
    /// [`SerializedView::entries_async()`](schema::SerializedView::entries_async),
353
    /// or [`Connection::view()`](super::Connection::view).
354
25
    fn query_with_collection_docs<V, Key>(
355
25
        &self,
356
25
        key: Option<QueryKey<'_, V::Key, Key>>,
357
25
        order: Sort,
358
25
        limit: Option<u32>,
359
25
        access_policy: AccessPolicy,
360
25
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
361
25
    where
362
25
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
363
25
        V::Key: Borrow<Key> + PartialEq<Key>,
364
25
        V: schema::SerializedView,
365
25
        V::Collection: SerializedCollection,
366
25
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
367
25
    {
368
25
        let mapped_docs = self.query_with_docs::<V, Key>(key, order, limit, access_policy)?;
369
25
        let mut collection_docs = BTreeMap::new();
370
66
        for (id, doc) in mapped_docs.documents {
371
41
            collection_docs.insert(id, CollectionDocument::<V::Collection>::try_from(&doc)?);
372
        }
373
25
        Ok(MappedDocuments {
374
25
            mappings: mapped_docs.mappings,
375
25
            documents: collection_docs,
376
25
        })
377
25
    }
378

            
379
    /// Reduces the view entries matching [`View`](schema::View).
380
    ///
381
    /// This is a lower-level API. For better ergonomics, consider reducing the
382
    /// view using [`View::entries(self).reduce()`](super::View::reduce)
383
    /// instead. The parameters for the query can be customized on the builder
384
    /// returned from
385
    /// [`SerializedView::entries()`](schema::SerializedView::entries),
386
    /// [`SerializedView::entries_async()`](schema::SerializedView::entries_async),
387
    /// or [`Connection::view()`](super::Connection::view).
388
42
    fn reduce<V: schema::SerializedView, Key>(
389
42
        &self,
390
42
        key: Option<QueryKey<'_, V::Key, Key>>,
391
42
        access_policy: AccessPolicy,
392
42
    ) -> Result<V::Value, Error>
393
42
    where
394
42
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
395
42
        V::Key: Borrow<Key> + PartialEq<Key>,
396
42
    {
397
42
        let view = self.schematic().view::<V>()?;
398
42
        self.reduce_by_name(
399
42
            &view.view_name(),
400
42
            key.map(|key| key.serialized()).transpose()?,
401
42
            access_policy,
402
42
        )
403
42
        .and_then(|value| V::deserialize(&value))
404
42
    }
405

            
406
    /// Reduces the view entries matching [`View`](schema::View), reducing the
407
    /// values by each unique key.
408
    ///
409
    /// This is a lower-level API. For better ergonomics, consider reducing the
410
    /// view using
411
    /// [`View::entries(self).reduce_grouped()`](super::View::reduce_grouped)
412
    /// instead. The parameters for the query can be customized on the builder
413
    /// returned from
414
    /// [`SerializedView::entries()`](schema::SerializedView::entries),
415
    /// [`SerializedView::entries_async()`](schema::SerializedView::entries_async),
416
    /// or [`Connection::view()`](super::Connection::view).
417
18
    fn reduce_grouped<V: schema::SerializedView, Key>(
418
18
        &self,
419
18
        key: Option<QueryKey<'_, V::Key, Key>>,
420
18
        access_policy: AccessPolicy,
421
18
    ) -> Result<GroupedReductions<V>, Error>
422
18
    where
423
18
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
424
18
        V::Key: Borrow<Key> + PartialEq<Key>,
425
18
    {
426
18
        let view = self.schematic().view::<V>()?;
427
18
        self.reduce_grouped_by_name(
428
18
            &view.view_name(),
429
18
            key.map(|key| key.serialized()).transpose()?,
430
18
            access_policy,
431
        )?
432
18
        .into_iter()
433
45
        .map(|map| {
434
45
            Ok(MappedValue::new(
435
45
                V::Key::from_ord_bytes(ByteSource::Borrowed(&map.key))
436
45
                    .map_err(view::Error::key_serialization)?,
437
45
                V::deserialize(&map.value)?,
438
            ))
439
45
        })
440
18
        .collect::<Result<Vec<_>, Error>>()
441
18
    }
442

            
443
    /// Deletes all of the documents associated with this view.
444
    ///
445
    /// This is a lower-level API. For better ergonomics, consider querying the
446
    /// view using
447
    /// [`View::entries(self).delete_docs()`](super::View::delete_docs())
448
    /// instead. The parameters for the query can be customized on the builder
449
    /// returned from
450
    /// [`SerializedView::entries()`](schema::SerializedView::entries),
451
    /// [`SerializedView::entries_async()`](schema::SerializedView::entries_async),
452
    /// or [`Connection::view()`](super::Connection::view).
453
9
    fn delete_docs<V: schema::SerializedView, Key>(
454
9
        &self,
455
9
        key: Option<QueryKey<'_, V::Key, Key>>,
456
9
        access_policy: AccessPolicy,
457
9
    ) -> Result<u64, Error>
458
9
    where
459
9
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
460
9
        V::Key: Borrow<Key> + PartialEq<Key>,
461
9
    {
462
9
        let view = self.schematic().view::<V>()?;
463
9
        self.delete_docs_by_name(
464
9
            &view.view_name(),
465
9
            key.map(|key| key.serialized()).transpose()?,
466
9
            access_policy,
467
        )
468
9
    }
469

            
470
    /// Applies a [`Transaction`] to the [`schema::Schema`]. If any operation in the
471
    /// [`Transaction`] fails, none of the operations will be applied to the
472
    /// [`schema::Schema`].
473
    fn apply_transaction(&self, transaction: Transaction) -> Result<Vec<OperationResult>, Error>;
474

            
475
    /// Retrieves the document with `id` stored within the named `collection`.
476
    ///
477
    /// This is a lower-level API. For better ergonomics, consider using
478
    /// one of:
479
    ///
480
    /// - [`SerializedCollection::get()`]
481
    /// - [`self.collection::<Collection>().get()`](super::Collection::get)
482
    fn get_from_collection(
483
        &self,
484
        id: DocumentId,
485
        collection: &CollectionName,
486
    ) -> Result<Option<OwnedDocument>, Error>;
487

            
488
    /// Retrieves all documents matching `ids` from the named `collection`.
489
    /// Documents that are not found are not returned, but no error will be
490
    /// generated.
491
    ///
492
    /// This is a lower-level API. For better ergonomics, consider using one of:
493
    ///
494
    /// - [`SerializedCollection::get_multiple()`]
495
    /// - [`self.collection::<Collection>().get_multiple()`](super::Collection::get_multiple)
496
    fn get_multiple_from_collection(
497
        &self,
498
        ids: &[DocumentId],
499
        collection: &CollectionName,
500
    ) -> Result<Vec<OwnedDocument>, Error>;
501

            
502
    /// Retrieves all documents within the range of `ids` from the named
503
    /// `collection`. To retrieve all documents, pass in `..` for `ids`.
504
    ///
505
    /// This is a lower-level API. For better ergonomics, consider using one of:
506
    ///
507
    /// - [`SerializedCollection::all()`]
508
    /// - [`self.collection::<Collection>().all()`](super::Collection::all)
509
    /// - [`SerializedCollection::list()`]
510
    /// - [`self.collection::<Collection>().list()`](super::Collection::list)
511
    fn list_from_collection(
512
        &self,
513
        ids: Range<DocumentId>,
514
        order: Sort,
515
        limit: Option<u32>,
516
        collection: &CollectionName,
517
    ) -> Result<Vec<OwnedDocument>, Error>;
518

            
519
    /// Retrieves all headers within the range of `ids` from the named
520
    /// `collection`. To retrieve all documents, pass in `..` for `ids`.
521
    ///
522
    /// This is a lower-level API. For better ergonomics, consider using one of:
523
    ///
524
    /// - [`SerializedCollection::all().headers()`](schema::List::headers)
525
    /// - [`self.collection::<Collection>().all().headers()`](super::AsyncCollection::all)
526
    /// - [`SerializedCollection::list().headers()`](schema::List::headers)
527
    /// - [`self.collection::<Collection>().list()`](super::AsyncCollection::list)
528
    fn list_headers_from_collection(
529
        &self,
530
        ids: Range<DocumentId>,
531
        order: Sort,
532
        limit: Option<u32>,
533
        collection: &CollectionName,
534
    ) -> Result<Vec<Header>, Error>;
535

            
536
    /// Counts the number of documents within the range of `ids` from the named
537
    /// `collection`.
538
    ///
539
    /// This is a lower-level API. For better ergonomics, consider using one of:
540
    ///
541
    /// - [`SerializedCollection::all().count()`](schema::List::count)
542
    /// - [`self.collection::<Collection>().all().count()`](super::List::count)
543
    /// - [`SerializedCollection::list().count()`](schema::List::count)
544
    /// - [`self.collection::<Collection>().list().count()`](super::List::count)
545
    fn count_from_collection(
546
        &self,
547
        ids: Range<DocumentId>,
548
        collection: &CollectionName,
549
    ) -> Result<u64, Error>;
550

            
551
    /// Compacts the collection to reclaim unused disk space.
552
    ///
553
    /// This process is done by writing data to a new file and swapping the file
554
    /// once the process completes. This ensures that if a hardware failure,
555
    /// power outage, or crash occurs that the original collection data is left
556
    /// untouched.
557
    ///
558
    /// ## Errors
559
    ///
560
    /// * [`Error::CollectionNotFound`]: database `name` does not exist.
561
    /// * [`Error::Other`]: an error occurred while compacting the database.
562
    fn compact_collection_by_name(&self, collection: CollectionName) -> Result<(), Error>;
563

            
564
    /// Queries for view entries from the named `view`.
565
    ///
566
    /// This is a lower-level API. For better ergonomics, consider querying the
567
    /// view using [`View::entries(self).query()`](super::View::query) instead. The
568
    /// parameters for the query can be customized on the builder returned from
569
    /// [`Connection::view()`](super::Connection::view).
570
    fn query_by_name(
571
        &self,
572
        view: &ViewName,
573
        key: Option<SerializedQueryKey>,
574
        order: Sort,
575
        limit: Option<u32>,
576
        access_policy: AccessPolicy,
577
    ) -> Result<Vec<schema::view::map::Serialized>, Error>;
578

            
579
    /// Queries for view entries from the named `view` with their source
580
    /// documents.
581
    ///
582
    /// This is a lower-level API. For better ergonomics, consider querying the
583
    /// view using
584
    /// [`View::entries(self).query_with_docs()`](super::View::query_with_docs)
585
    /// instead. The parameters for the query can be customized on the builder
586
    /// returned from [`Connection::view()`](super::Connection::view).
587
    fn query_by_name_with_docs(
588
        &self,
589
        view: &ViewName,
590
        key: Option<SerializedQueryKey>,
591
        order: Sort,
592
        limit: Option<u32>,
593
        access_policy: AccessPolicy,
594
    ) -> Result<schema::view::map::MappedSerializedDocuments, Error>;
595

            
596
    /// Reduces the view entries from the named `view`.
597
    ///
598
    /// This is a lower-level API. For better ergonomics, consider reducing the
599
    /// view using [`View::entries(self).reduce()`](super::View::reduce)
600
    /// instead. The parameters for the query can be customized on the builder
601
    /// returned from [`Connection::view()`](super::Connection::view).
602
    fn reduce_by_name(
603
        &self,
604
        view: &ViewName,
605
        key: Option<SerializedQueryKey>,
606
        access_policy: AccessPolicy,
607
    ) -> Result<Vec<u8>, Error>;
608

            
609
    /// Reduces the view entries from the named `view`, reducing the values by each
610
    /// unique key.
611
    ///
612
    /// This is a lower-level API. For better ergonomics, consider reducing
613
    /// the view using
614
    /// [`View::entries(self).reduce_grouped()`](super::View::reduce_grouped) instead.
615
    /// The parameters for the query can be customized on the builder returned
616
    /// from [`Connection::view()`](super::Connection::view).
617
    fn reduce_grouped_by_name(
618
        &self,
619
        view: &ViewName,
620
        key: Option<SerializedQueryKey>,
621
        access_policy: AccessPolicy,
622
    ) -> Result<Vec<MappedSerializedValue>, Error>;
623

            
624
    /// Deletes all source documents for entries that match within the named
625
    /// `view`.
626
    ///
627
    /// This is a lower-level API. For better ergonomics, consider querying the
628
    /// view using
629
    /// [`View::entries(self).delete_docs()`](super::View::delete_docs())
630
    /// instead. The parameters for the query can be customized on the builder
631
    /// returned from [`Connection::view()`](super::Connection::view).
632
    fn delete_docs_by_name(
633
        &self,
634
        view: &ViewName,
635
        key: Option<SerializedQueryKey>,
636
        access_policy: AccessPolicy,
637
    ) -> Result<u64, Error>;
638
}
639

            
640
/// The low-level interface to a database's [`schema::Schema`], giving access to
641
/// [`Collection`s](crate::schema::Collection) and
642
/// [`View`s](crate::schema::View). This trait is for use within async
643
/// contexts. For access outside of async contexts, use [`LowLevelConnection`].
644
///
645
/// This trait's methods are not designed for ergonomics. See
646
/// [`AsyncConnection`](super::AsyncConnection) for a higher-level interface.
647
#[async_trait]
648
pub trait AsyncLowLevelConnection: HasSchema + HasSession + Send + Sync {
649
    /// Inserts a newly created document into the connected [`schema::Schema`]
650
    /// for the [`Collection`](schema::Collection) `C`. If `id` is `None` a unique id will be
651
    /// generated. If an id is provided and a document already exists with that
652
    /// id, a conflict error will be returned.
653
    ///
654
    /// This is the lower-level API. For better ergonomics, consider using
655
    /// one of:
656
    ///
657
    /// - [`SerializedCollection::push_into_async()`]
658
    /// - [`SerializedCollection::insert_into_async()`]
659
    /// - [`self.collection::<Collection>().insert()`](super::AsyncCollection::insert)
660
    /// - [`self.collection::<Collection>().push()`](super::AsyncCollection::push)
661
10734
    async fn insert<C: schema::Collection, PrimaryKey: Send, B: Into<Bytes> + Send>(
662
10734
        &self,
663
10734
        id: Option<&PrimaryKey>,
664
10734
        contents: B,
665
10734
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error>
666
10734
    where
667
10734
        PrimaryKey: KeyEncoding<C::PrimaryKey> + ?Sized,
668
10734
    {
669
10734
        let contents = contents.into();
670
10734
        let results = self
671
10734
            .apply_transaction(Transaction::insert(
672
10734
                C::collection_name(),
673
10734
                id.map(|id| DocumentId::new(id)).transpose()?,
674
10734
                contents,
675
            ))
676
25978
            .await?;
677
10211
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
678
10211
            CollectionHeader::try_from(header)
679
        } else {
680
            unreachable!(
681
                "apply_transaction on a single insert should yield a single DocumentUpdated entry"
682
            )
683
        }
684
21468
    }
685

            
686
    /// Updates an existing document in the connected [`schema::Schema`] for the
687
    /// [`Collection`](schema::Collection)(schema::Collection) `C`. Upon success, `doc.revision`
688
    /// will be updated with the new revision.
689
    ///
690
    /// This is the lower-level API. For better ergonomics, consider using one
691
    /// of:
692
    ///
693
    /// - [`CollectionDocument::update_async()`]
694
    /// - [`self.collection::<Collection>().update()`](super::AsyncCollection::update)
695
5818
    async fn update<C: schema::Collection, D: Document<C> + Send + Sync>(
696
5818
        &self,
697
5818
        doc: &mut D,
698
5818
    ) -> Result<(), Error> {
699
5818
        let results = self
700
5818
            .apply_transaction(Transaction::update(
701
5818
                C::collection_name(),
702
5818
                doc.header().into_header()?,
703
5818
                doc.bytes()?,
704
            ))
705
12313
            .await?;
706
5798
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
707
5798
            doc.set_header(header)?;
708
5798
            Ok(())
709
        } else {
710
            unreachable!(
711
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
712
            )
713
        }
714
11636
    }
715

            
716
    /// Overwrites an existing document, or inserts a new document. Upon success,
717
    /// `doc.revision` will be updated with the new revision information.
718
    ///
719
    /// This is the lower-level API. For better ergonomics, consider using
720
    /// one of:
721
    ///
722
    /// - [`SerializedCollection::overwrite_async()`]
723
    /// - [`SerializedCollection::overwrite_into_async()`]
724
    /// - [`self.collection::<Collection>().overwrite()`](super::AsyncCollection::overwrite)
725
15
    async fn overwrite<'a, C, PrimaryKey>(
726
15
        &self,
727
15
        id: &PrimaryKey,
728
15
        contents: Vec<u8>,
729
15
    ) -> Result<CollectionHeader<C::PrimaryKey>, Error>
730
15
    where
731
15
        C: schema::Collection,
732
15
        PrimaryKey: KeyEncoding<C::PrimaryKey>,
733
15
    {
734
15
        let results = self
735
15
            .apply_transaction(Transaction::overwrite(
736
15
                C::collection_name(),
737
15
                DocumentId::new(id)?,
738
15
                contents,
739
            ))
740
15
            .await?;
741
15
        if let Some(OperationResult::DocumentUpdated { header, .. }) = results.into_iter().next() {
742
15
            CollectionHeader::try_from(header)
743
        } else {
744
            unreachable!(
745
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
746
            )
747
        }
748
30
    }
749

            
750
    /// Retrieves a stored document from [`Collection`](schema::Collection) `C` identified by `id`.
751
    ///
752
    /// This is the lower-level API. For better ergonomics, consider using
753
    /// one of:
754
    ///
755
    /// - [`SerializedCollection::get_async()`]
756
    /// - [`self.collection::<Collection>().get()`](super::AsyncCollection::get)
757
16049
    async fn get<C, PrimaryKey>(&self, id: &PrimaryKey) -> Result<Option<OwnedDocument>, Error>
758
16049
    where
759
16049
        C: schema::Collection,
760
16049
        PrimaryKey: KeyEncoding<C::PrimaryKey> + ?Sized,
761
16049
    {
762
16049
        self.get_from_collection(DocumentId::new(id)?, &C::collection_name())
763
19777
            .await
764
32098
    }
765

            
766
    /// Retrieves all documents matching `ids`. Documents that are not found
767
    /// are not returned, but no error will be generated.
768
    ///
769
    /// This is the lower-level API. For better ergonomics, consider using
770
    /// one of:
771
    ///
772
    /// - [`SerializedCollection::get_multiple_async()`]
773
    /// - [`self.collection::<Collection>().get_multiple()`](super::AsyncCollection::get_multiple)
774
9626
    async fn get_multiple<'id, C, PrimaryKey, DocumentIds, I>(
775
9626
        &self,
776
9626
        ids: DocumentIds,
777
9626
    ) -> Result<Vec<OwnedDocument>, Error>
778
9626
    where
779
9626
        C: schema::Collection,
780
9626
        DocumentIds: IntoIterator<Item = &'id PrimaryKey, IntoIter = I> + Send + Sync,
781
9626
        I: Iterator<Item = &'id PrimaryKey> + Send + Sync,
782
9626
        PrimaryKey: KeyEncoding<C::PrimaryKey> + 'id + ?Sized,
783
9626
    {
784
9626
        let ids = ids
785
9626
            .into_iter()
786
9626
            .map(DocumentId::new)
787
9626
            .collect::<Result<Vec<_>, _>>()?;
788
9626
        self.get_multiple_from_collection(&ids, &C::collection_name())
789
8788
            .await
790
19252
    }
791

            
792
    /// Retrieves all documents within the range of `ids`. To retrieve all
793
    /// documents, pass in `..` for `ids`.
794
    ///
795
    /// This is the lower-level API. For better ergonomics, consider using one
796
    /// of:
797
    ///
798
    /// - [`SerializedCollection::all_async()`]
799
    /// - [`self.collection::<Collection>().all()`](super::AsyncCollection::all)
800
    /// - [`SerializedCollection::list_async()`]
801
    /// - [`self.collection::<Collection>().list()`](super::AsyncCollection::list)
802
20
    async fn list<'id, C, R, PrimaryKey>(
803
20
        &self,
804
20
        ids: R,
805
20
        order: Sort,
806
20
        limit: Option<u32>,
807
20
    ) -> Result<Vec<OwnedDocument>, Error>
808
20
    where
809
20
        C: schema::Collection,
810
20
        R: Into<RangeRef<'id, C::PrimaryKey, PrimaryKey>> + Send,
811
20
        PrimaryKey: KeyEncoding<C::PrimaryKey> + PartialEq + 'id + ?Sized,
812
20
        C::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
813
20
    {
814
30
        let ids = ids.into().map_result(|id| DocumentId::new(id))?;
815
20
        self.list_from_collection(ids, order, limit, &C::collection_name())
816
20
            .await
817
40
    }
818

            
819
    /// Retrieves all documents within the range of `ids`. To retrieve all
820
    /// documents, pass in `..` for `ids`.
821
    ///
822
    /// This is the lower-level API. For better ergonomics, consider using one
823
    /// of:
824
    ///
825
    /// - [`SerializedCollection::all_async().headers()`](schema::AsyncList::headers)
826
    /// - [`self.collection::<Collection>().all()`](super::AsyncList::headers)
827
    /// - [`SerializedCollection::list_async().headers()`](schema::AsyncList::headers)
828
    /// - [`self.collection::<Collection>().list().headers()`](super::AsyncList::headers)
829
5
    async fn list_headers<'id, C, R, PrimaryKey>(
830
5
        &self,
831
5
        ids: R,
832
5
        order: Sort,
833
5
        limit: Option<u32>,
834
5
    ) -> Result<Vec<Header>, Error>
835
5
    where
836
5
        C: schema::Collection,
837
5
        R: Into<RangeRef<'id, C::PrimaryKey, PrimaryKey>> + Send,
838
5
        PrimaryKey: KeyEncoding<C::PrimaryKey> + PartialEq + 'id + ?Sized,
839
5
        C::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
840
5
    {
841
10
        let ids = ids.into().map_result(|id| DocumentId::new(id))?;
842
5
        self.list_headers_from_collection(ids, order, limit, &C::collection_name())
843
5
            .await
844
10
    }
845

            
846
    /// Counts the number of documents within the range of `ids`.
847
    ///
848
    /// This is the lower-level API. For better ergonomics, consider using
849
    /// one of:
850
    ///
851
    /// - [`SerializedCollection::all_async().count()`](schema::AsyncList::count)
852
    /// - [`self.collection::<Collection>().all().count()`](super::AsyncList::count)
853
    /// - [`SerializedCollection::list_async().count()`](schema::AsyncList::count)
854
    /// - [`self.collection::<Collection>().list().count()`](super::AsyncList::count)
855
10
    async fn count<'id, C, R, PrimaryKey>(&self, ids: R) -> Result<u64, Error>
856
10
    where
857
10
        C: schema::Collection,
858
10
        R: Into<RangeRef<'id, C::PrimaryKey, PrimaryKey>> + Send,
859
10
        PrimaryKey: KeyEncoding<C::PrimaryKey> + PartialEq + 'id + ?Sized,
860
10
        C::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
861
10
    {
862
10
        self.count_from_collection(
863
10
            ids.into().map_result(|id| DocumentId::new(id))?,
864
10
            &C::collection_name(),
865
        )
866
10
        .await
867
20
    }
868

            
869
    /// Removes a `Document` from the database.
870
    ///
871
    /// This is the lower-level API. For better ergonomics, consider using
872
    /// one of:
873
    ///
874
    /// - [`CollectionDocument::delete_async()`]
875
    /// - [`self.collection::<Collection>().delete()`](super::AsyncCollection::delete)
876
1046
    async fn delete<C: schema::Collection, H: HasHeader + Send + Sync>(
877
1046
        &self,
878
1046
        doc: &H,
879
1046
    ) -> Result<(), Error> {
880
1046
        let results = self
881
1046
            .apply_transaction(Transaction::delete(C::collection_name(), doc.header()?))
882
4137
            .await?;
883
1044
        if let OperationResult::DocumentDeleted { .. } = &results[0] {
884
1044
            Ok(())
885
        } else {
886
            unreachable!(
887
                "apply_transaction on a single update should yield a single DocumentUpdated entry"
888
            )
889
        }
890
2092
    }
891
    /// Queries for view entries matching [`View`](schema::View)(super::AsyncView).
892
    ///
893
    /// This is the lower-level API. For better ergonomics, consider querying
894
    /// the view using [`View::entries(self).query()`](super::AsyncView::query)
895
    /// instead. The parameters for the query can be customized on the builder
896
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
897
9924
    async fn query<V: schema::SerializedView, Key>(
898
9924
        &self,
899
9924
        key: Option<QueryKey<'_, V::Key, Key>>,
900
9924
        order: Sort,
901
9924
        limit: Option<u32>,
902
9924
        access_policy: AccessPolicy,
903
9924
    ) -> Result<ViewMappings<V>, Error>
904
9924
    where
905
9924
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
906
9924
        V::Key: Borrow<Key> + PartialEq<Key>,
907
9924
    {
908
9924
        let view = self.schematic().view::<V>()?;
909
9924
        let mappings = self
910
9924
            .query_by_name(
911
9924
                &view.view_name(),
912
9924
                key.map(|key| key.serialized()).transpose()?,
913
9924
                order,
914
9924
                limit,
915
9924
                access_policy,
916
            )
917
9143
            .await?;
918
9924
        mappings
919
9924
            .into_iter()
920
10035
            .map(|mapping| {
921
9880
                Ok(CollectionMap {
922
9880
                    key: <V::Key as key::Key>::from_ord_bytes(ByteSource::Borrowed(&mapping.key))
923
9880
                        .map_err(view::Error::key_serialization)
924
9880
                        .map_err(Error::from)?,
925
9880
                    value: V::deserialize(&mapping.value)?,
926
9880
                    source: mapping.source.try_into()?,
927
                })
928
10035
            })
929
9924
            .collect::<Result<Vec<_>, Error>>()
930
19848
    }
931

            
932
    /// Queries for view entries matching [`View`](schema::View) with their source documents.
933
    ///
934
    /// This is the lower-level API. For better ergonomics, consider querying
935
    /// the view using [`View::entries(self).query_with_docs()`](super::AsyncView::query_with_docs) instead.
936
    /// The parameters for the query can be customized on the builder returned
937
    /// from [`AsyncConnection::view()`](super::AsyncConnection::view).
938
    #[must_use]
939
9598
    async fn query_with_docs<V: schema::SerializedView, Key>(
940
9598
        &self,
941
9598
        key: Option<QueryKey<'_, V::Key, Key>>,
942
9598
        order: Sort,
943
9598
        limit: Option<u32>,
944
9598
        access_policy: AccessPolicy,
945
9598
    ) -> Result<MappedDocuments<OwnedDocument, V>, Error>
946
9598
    where
947
9598
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
948
9598
        V::Key: Borrow<Key> + PartialEq<Key>,
949
9598
    {
950
        // Query permission is checked by the query call
951
9598
        let results = self
952
9598
            .query::<V, Key>(key, order, limit, access_policy)
953
8820
            .await?;
954

            
955
        // Verify that there is permission to fetch each document
956
9598
        let documents = self
957
9600
            .get_multiple::<V::Collection, _, _, _>(results.iter().map(|m| &m.source.id))
958
8760
            .await?
959
9598
            .into_iter()
960
9600
            .map(|doc| {
961
9512
                let id = doc.header.id.deserialize()?;
962
9512
                Ok((id, doc))
963
9600
            })
964
9598
            .collect::<Result<BTreeMap<_, _>, Error>>()?;
965

            
966
9598
        Ok(MappedDocuments {
967
9598
            mappings: results,
968
9598
            documents,
969
9598
        })
970
19196
    }
971

            
972
    /// Queries for view entries matching [`View`](schema::View) with their source documents,
973
    /// deserialized.
974
    ///
975
    /// This is the lower-level API. For better ergonomics, consider querying
976
    /// the view using
977
    /// [`View::entries(self).query_with_collection_docs()`](super::AsyncView::query_with_collection_docs)
978
    /// instead. The parameters for the query can be customized on the builder
979
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
980
    #[must_use]
981
117
    async fn query_with_collection_docs<V, Key>(
982
117
        &self,
983
117
        key: Option<QueryKey<'_, V::Key, Key>>,
984
117
        order: Sort,
985
117
        limit: Option<u32>,
986
117
        access_policy: AccessPolicy,
987
117
    ) -> Result<MappedDocuments<CollectionDocument<V::Collection>, V>, Error>
988
117
    where
989
117
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
990
117
        V::Key: Borrow<Key> + PartialEq<Key>,
991
117
        V: schema::SerializedView,
992
117
        V::Collection: SerializedCollection,
993
117
        <V::Collection as SerializedCollection>::Contents: std::fmt::Debug,
994
117
    {
995
117
        let mapped_docs = self
996
117
            .query_with_docs::<V, Key>(key, order, limit, access_policy)
997
232
            .await?;
998
117
        let mut collection_docs = BTreeMap::new();
999
236
        for (id, doc) in mapped_docs.documents {
119
            collection_docs.insert(id, CollectionDocument::<V::Collection>::try_from(&doc)?);
        }
117
        Ok(MappedDocuments {
117
            mappings: mapped_docs.mappings,
117
            documents: collection_docs,
117
        })
234
    }

            
    /// Reduces the view entries matching [`View`](schema::View).
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using
    /// [`View::entries(self).reduce()`](super::AsyncView::reduce)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    #[must_use]
18526
    async fn reduce<V: schema::SerializedView, Key>(
18526
        &self,
18526
        key: Option<QueryKey<'_, V::Key, Key>>,
18526
        access_policy: AccessPolicy,
18526
    ) -> Result<V::Value, Error>
18526
    where
18526
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
18526
        V::Key: Borrow<Key> + PartialEq<Key>,
18526
    {
18526
        let view = self.schematic().view::<V>()?;
18526
        self.reduce_by_name(
18526
            &view.view_name(),
18526
            key.map(|key| key.serialized()).transpose()?,
18526
            access_policy,
        )
16982
        .await
18526
        .and_then(|value| V::deserialize(&value))
37052
    }

            
    /// Reduces the view entries matching [`View`](schema::View), reducing the values by each
    /// unique key.
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using
    /// [`View::entries(self).reduce_grouped()`](super::AsyncView::reduce_grouped)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    #[must_use]
12
    async fn reduce_grouped<V: schema::SerializedView, Key>(
12
        &self,
12
        key: Option<QueryKey<'_, V::Key, Key>>,
12
        access_policy: AccessPolicy,
12
    ) -> Result<GroupedReductions<V>, Error>
12
    where
12
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
12
        V::Key: Borrow<Key> + PartialEq<Key>,
12
    {
12
        let view = self.schematic().view::<V>()?;
12
        self.reduce_grouped_by_name(
12
            &view.view_name(),
12
            key.map(|key| key.serialized()).transpose()?,
12
            access_policy,
        )
14
        .await?
12
        .into_iter()
31
        .map(|map| {
31
            Ok(MappedValue::new(
31
                V::Key::from_ord_bytes(ByteSource::Borrowed(&map.key))
31
                    .map_err(view::Error::key_serialization)?,
31
                V::deserialize(&map.value)?,
            ))
31
        })
12
        .collect::<Result<Vec<_>, Error>>()
24
    }

            
    /// Deletes all of the documents associated with this view.
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using
    /// [`View::entries(self).delete_docs()`](super::AsyncView::delete_docs)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    #[must_use]
13
    async fn delete_docs<V: schema::SerializedView, Key>(
13
        &self,
13
        key: Option<QueryKey<'_, V::Key, Key>>,
13
        access_policy: AccessPolicy,
13
    ) -> Result<u64, Error>
13
    where
13
        Key: KeyEncoding<V::Key> + PartialEq + ?Sized,
13
        V::Key: Borrow<Key> + PartialEq<Key>,
13
    {
13
        let view = self.schematic().view::<V>()?;
13
        self.delete_docs_by_name(
13
            &view.view_name(),
13
            key.map(|key| key.serialized()).transpose()?,
13
            access_policy,
        )
13
        .await
26
    }

            
    /// Applies a [`Transaction`] to the [`Schema`](schema::Schema). If any
    /// operation in the [`Transaction`] fails, none of the operations will be
    /// applied to the [`Schema`](schema::Schema).
    ///
    /// This is a required method. The provided [`insert()`](Self::insert),
    /// [`update()`](Self::update), [`overwrite()`](Self::overwrite) and
    /// [`delete()`](Self::delete) methods each submit a single-operation
    /// transaction through it and inspect the first returned
    /// [`OperationResult`].
    async fn apply_transaction(
        &self,
        transaction: Transaction,
    ) -> Result<Vec<OperationResult>, Error>;

            
    /// Retrieves the document with `id` stored within the named `collection`.
    ///
    /// This is a required method; the provided [`get()`](Self::get) forwards to
    /// it after encoding its id into a [`DocumentId`].
    ///
    /// This is a lower-level API. For better ergonomics, consider using one of:
    ///
    /// - [`SerializedCollection::get_async()`]
    /// - [`self.collection::<Collection>().get()`](super::AsyncCollection::get)
    async fn get_from_collection(
        &self,
        id: DocumentId,
        collection: &CollectionName,
    ) -> Result<Option<OwnedDocument>, Error>;

            
    /// Retrieves all documents matching `ids` from the named `collection`.
    /// Documents that are not found are not returned, but no error will be
    /// generated.
    ///
    /// This is a required method; the provided
    /// [`get_multiple()`](Self::get_multiple) forwards to it after encoding its
    /// ids into [`DocumentId`]s.
    ///
    /// This is a lower-level API. For better ergonomics, consider using one of:
    ///
    /// - [`SerializedCollection::get_multiple_async()`]
    /// - [`self.collection::<Collection>().get_multiple()`](super::AsyncCollection::get_multiple)
    async fn get_multiple_from_collection(
        &self,
        ids: &[DocumentId],
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, Error>;

            
    /// Retrieves all documents within the range of `ids` from the named
    /// `collection`. To retrieve all documents, pass in `..` for `ids`.
    ///
    /// This is a required method; the provided [`list()`](Self::list) forwards
    /// to it after encoding its range bounds into [`DocumentId`]s.
    ///
    /// This is a lower-level API. For better ergonomics, consider using one of:
    ///
    /// - [`SerializedCollection::all_async()`]
    /// - [`self.collection::<Collection>().all()`](super::AsyncCollection::all)
    /// - [`SerializedCollection::list_async()`]
    /// - [`self.collection::<Collection>().list()`](super::AsyncCollection::list)
    async fn list_from_collection(
        &self,
        ids: Range<DocumentId>,
        order: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<OwnedDocument>, Error>;

            
    /// Retrieves all headers within the range of `ids` from the named
    /// `collection`. To retrieve all headers, pass in `..` for `ids`.
    ///
    /// This is a required method; the provided
    /// [`list_headers()`](Self::list_headers) forwards to it after encoding its
    /// range bounds into [`DocumentId`]s.
    ///
    /// This is a lower-level API. For better ergonomics, consider using one of:
    ///
    /// - [`SerializedCollection::all_async().headers()`](schema::AsyncList::headers)
    /// - [`self.collection::<Collection>().all().headers()`](super::AsyncList::headers)
    /// - [`SerializedCollection::list_async().headers()`](schema::AsyncList::headers)
    /// - [`self.collection::<Collection>().list().headers()`](super::AsyncList::headers)
    async fn list_headers_from_collection(
        &self,
        ids: Range<DocumentId>,
        order: Sort,
        limit: Option<u32>,
        collection: &CollectionName,
    ) -> Result<Vec<Header>, Error>;

            
    /// Counts the number of documents within the range of `ids` from the named
    /// `collection`.
    ///
    /// This is a required method; the provided [`count()`](Self::count)
    /// forwards to it after encoding its range bounds into [`DocumentId`]s.
    ///
    /// This is a lower-level API. For better ergonomics, consider using one of:
    ///
    /// - [`SerializedCollection::all_async().count()`](schema::AsyncList::count)
    /// - [`self.collection::<Collection>().all().count()`](super::AsyncList::count)
    /// - [`SerializedCollection::list_async().count()`](schema::AsyncList::count)
    /// - [`self.collection::<Collection>().list().count()`](super::AsyncList::count)
    async fn count_from_collection(
        &self,
        ids: Range<DocumentId>,
        collection: &CollectionName,
    ) -> Result<u64, Error>;

            
    /// Compacts the collection to reclaim unused disk space.
    ///
    /// This process is done by writing data to a new file and swapping the file
    /// once the process completes. This ensures that if a hardware failure,
    /// power outage, or crash occurs that the original collection data is left
    /// untouched.
    ///
    /// ## Errors
    ///
    /// * [`Error::CollectionNotFound`]: the named `collection` does not exist.
    /// * [`Error::Other`]: an error occurred while compacting the database.
    async fn compact_collection_by_name(&self, collection: CollectionName) -> Result<(), Error>;

            
    /// Queries for view entries from the named `view`.
    ///
    /// This is a required method. It works on serialized data: `key` is a
    /// [`SerializedQueryKey`] and the results are serialized mappings. The
    /// provided [`query()`](Self::query) forwards to it and decodes each
    /// returned mapping for the typed view.
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using [`View::entries(self).query()`](super::AsyncView::query)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    async fn query_by_name(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<schema::view::map::Serialized>, Error>;

            
    /// Queries for view entries from the named `view` with their source
    /// documents.
    ///
    /// This is a required method. It works on serialized data: `key` is a
    /// [`SerializedQueryKey`] and the result is a
    /// [`MappedSerializedDocuments`](schema::view::map::MappedSerializedDocuments).
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using [`View::entries(self).query_with_docs()`](super::AsyncView::query_with_docs) instead.
    /// The parameters for the query can be customized on the builder returned
    /// from [`AsyncConnection::view()`](super::AsyncConnection::view).
    async fn query_by_name_with_docs(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        order: Sort,
        limit: Option<u32>,
        access_policy: AccessPolicy,
    ) -> Result<schema::view::map::MappedSerializedDocuments, Error>;

            
    /// Reduces the view entries from the named `view`.
    ///
    /// This is a required method. The reduced value is returned as raw bytes;
    /// the provided [`reduce()`](Self::reduce) forwards to this method and
    /// decodes those bytes with the view's `deserialize` function.
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using
    /// [`View::entries(self).reduce()`](super::AsyncView::reduce)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    async fn reduce_by_name(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<u8>, Error>;

            
    /// Reduces the view entries from the named `view`, reducing the values by each
    /// unique key.
    ///
    /// This is a required method. Keys and values are returned in serialized
    /// form as [`MappedSerializedValue`]s; the provided
    /// [`reduce_grouped()`](Self::reduce_grouped) forwards to this method and
    /// decodes each entry for the typed view.
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using
    /// [`View::entries(self).reduce_grouped()`](super::AsyncView::reduce_grouped)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    async fn reduce_grouped_by_name(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        access_policy: AccessPolicy,
    ) -> Result<Vec<MappedSerializedValue>, Error>;

            
    /// Deletes all source documents for entries that match within the named
    /// `view`.
    ///
    /// This is a required method; the provided
    /// [`delete_docs()`](Self::delete_docs) forwards to it and returns its
    /// result unchanged. The returned `u64` is presumably the number of
    /// documents deleted -- confirm against the implementation.
    ///
    /// This is the lower-level API. For better ergonomics, consider querying
    /// the view using
    /// [`View::entries(self).delete_docs()`](super::AsyncView::delete_docs)
    /// instead. The parameters for the query can be customized on the builder
    /// returned from [`AsyncConnection::view()`](super::AsyncConnection::view).
    async fn delete_docs_by_name(
        &self,
        view: &ViewName,
        key: Option<SerializedQueryKey>,
        access_policy: AccessPolicy,
    ) -> Result<u64, Error>;
}

            
/// Access to a connection's schema.
///
/// This is a supertrait of [`AsyncLowLevelConnection`], whose provided
/// view methods (for example [`AsyncLowLevelConnection::query`]) call
/// [`schematic()`](Self::schematic) to look up a view before forwarding to the
/// `*_by_name` methods.
pub trait HasSchema {
    /// Returns the [`Schematic`] describing the schema for the database.
    fn schematic(&self) -> &Schematic;
}