Lines
84.68 %
Functions
58.98 %
Branches
100 %
use std::borrow::{Borrow, Cow};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::task::Poll;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{ready, Future, FutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use transmog::{Format, OwnedDeserializer};
use transmog_pot::Pot;
use crate::connection::{self, AsyncConnection, Connection, RangeRef};
use crate::document::{
BorrowedDocument, CollectionDocument, CollectionHeader, Document, DocumentId, Header, KeyId,
OwnedDocument, OwnedDocuments, Revision,
};
use crate::key::{IntoPrefixRange, Key, KeyEncoding};
use crate::schema::{CollectionName, Schematic};
use crate::transaction::{Operation, OperationResult, Transaction};
use crate::Error;
/// A namespaced collection of `Document<Self>` items and views.
///
/// ## Deriving this trait
/// This trait can be derived instead of manually implemented:
/// ```rust
/// use bonsaidb_core::schema::Collection;
/// use serde::{Deserialize, Serialize};
/// #[derive(Serialize, Deserialize, Default, Collection)]
/// #[collection(name = "MyCollection")]
/// # #[collection(core = bonsaidb_core)]
/// pub struct MyCollection;
/// ```
/// If you're publishing a collection for use in multiple projects, consider
/// giving the collection an `authority`, which gives your collection a
/// namespace:
/// #[collection(name = "MyCollection", authority = "khonsulabs")]
/// The list of views can be specified using the `views` parameter:
/// use bonsaidb_core::schema::{Collection, View, ViewSchema};
/// #[collection(name = "MyCollection", views = [ScoresByRank])]
/// #[derive(Clone, View, ViewSchema)]
/// #[view(collection = MyCollection, key = u32, value = f32, name = "scores-by-rank")]
/// # #[view(core = bonsaidb_core)]
/// # #[view_schema(core = bonsaidb_core)]
/// pub struct ScoresByRank;
/// #
/// # use bonsaidb_core::{
/// # document::CollectionDocument,
/// # schema::{
/// # ReduceResult, CollectionMapReduce,
/// # ViewMapResult, ViewMappedValue,
/// # },
/// # };
/// # impl CollectionMapReduce for ScoresByRank {
/// # fn map<'doc>(
/// # &self,
/// # _document: CollectionDocument<<Self::View as View>::Collection>,
/// # ) -> ViewMapResult<'doc, Self::View> {
/// # todo!()
/// # }
/// # fn reduce(
/// # _mappings: &[ViewMappedValue<'_, Self::View>],
/// # _rereduce: bool,
/// # ) -> ReduceResult<Self::View> {
/// ### Selecting a Primary Key type
/// By default, the `#[collection]` macro will use `u64` for the
/// [`Self::PrimaryKey`] type. Collections can use any type that implements the
/// [`Key`] trait:
/// #[collection(name = "MyCollection", primary_key = u128)]
/// If the data being stored has a ["natural key"][natural-key], the field can
/// be annotated with `#[natural_id]` to use the field's contents as the primary
/// key when doing a push operation:
/// pub struct MyCollection {
/// #[natural_id]
/// pub external_id: u64,
/// }
/// Alternatively, this can be accomplished with an expression using the
/// `natural_id` attribute:
/// #[collection(name = "MyCollection", natural_id = Some(self.external_id))]
/// Primary keys are not able to be updated. To update a document's primary key,
/// the contents must be inserted at the new id and deleted from the previous
/// id.
/// [natural-key]: https://en.wikipedia.org/wiki/Natural_key
/// ### Specifying a Collection Encryption Key
/// By default, encryption will be required if an `encryption_key` is provided:
/// use bonsaidb_core::document::KeyId;
/// #[collection(name = "MyCollection", encryption_key = Some(KeyId::Master))]
/// The `encryption_required` parameter can be provided if you wish to be
/// explicit:
/// #[collection(encryption_key = Some(KeyId::Master), encryption_required)]
/// Or, if you wish your collection to be encrypted when encryption is
/// available, but not cause errors when being stored without encryption, you
/// can provide the `encryption_optional` parameter:
/// #[collection(encryption_key = Some(KeyId::Master), encryption_optional)]
/// ### Changing the serialization strategy
/// BonsaiDb uses [`transmog`](https://github.com/khonsulabs/transmog) to allow
/// customizing serialization formats. To use one of the formats Transmog
/// already supports, add its crate to your Cargo.toml and use it like this
/// example using `transmog_bincode`:
/// #[collection(serialization = transmog_bincode::Bincode)]
/// To manually implement `SerializedCollection` you can pass `None` to
/// `serialization`:
/// #[derive(Default, Collection)]
/// #[collection(serialization = None)]
/// If the collection type implements or derives the [`Key`](crate::key::Key)
/// trait, `serialization = Key` can be passed to serialize using the [key
/// format](crate::key::KeyFormat).
pub trait Collection: Send + Sync {
/// The unique id type. Each document stored in a collection will be
/// uniquely identified by this type.
/// ## Primary Key Limits
/// The result of [`KeyEncoding::as_ord_bytes()`] must be less than or equal
/// to [`DocumentId::MAX_LENGTH`].
type PrimaryKey: for<'k> Key<'k> + Eq + Ord;
/// The unique name of this collection. Each collection must be uniquely
/// named within the [`Schema`](crate::schema::Schema) it is registered
/// within.
fn collection_name() -> CollectionName;
/// Defines all `View`s in this collection in `schema`.
fn define_views(schema: &mut Schematic) -> Result<(), Error>;
/// Returns the key used to encrypt this collection at rest, if any.
///
/// When `Some(KeyId)` is returned, this collection will be stored encrypted
/// at-rest using that key. The default implementation returns `None`, meaning
/// no encryption key is specified for the collection.
#[must_use]
fn encryption_key() -> Option<KeyId> {
    Option::None
}
/// A collection that knows how to serialize and deserialize documents to an associated type.
/// The examples for this type use the following basic collection definition:
/// use bonsaidb_core::schema::{Collection, DefaultSerialization, Schematic};
/// use bonsaidb_core::Error;
/// #[derive(Debug, Serialize, Deserialize, Default, Collection)]
/// pub rank: u32,
/// pub score: f32,
#[async_trait]
pub trait SerializedCollection: Collection {
/// The type of the contents stored in documents in this collection.
type Contents: Send + Sync;
/// The serialization format for this collection.
type Format: OwnedDeserializer<Self::Contents>;
/// Returns the natural identifier of `contents`. This is called when
/// pushing values into a collection, before attempting to automatically
/// assign a unique id.
#[allow(unused_variables)]
fn natural_id(contents: &Self::Contents) -> Option<Self::PrimaryKey>
where
Self: Sized,
{
/// Returns the configured instance of [`Self::Format`].
// TODO allow configuration to be passed here, such as max allocation bytes.
fn format() -> Self::Format;
/// Deserialize `data` as `Self::Contents` using this collection's format.
///
/// # Errors
///
/// Any error returned by the format's `deserialize_owned` is converted into
/// this crate's [`Error`] via `Error::other("serialization", ..)`.
fn deserialize(data: &[u8]) -> Result<Self::Contents, Error> {
Self::format()
.deserialize_owned(data)
// Wrap the format-specific error in the crate-wide error type.
.map_err(|err| crate::Error::other("serialization", err))
/// Returns the deserialized contents of `doc`.
fn document_contents<D: Document<Self>>(doc: &D) -> Result<Self::Contents, Error>
doc.contents()
/// Sets the contents of `doc` to `contents`.
fn set_document_contents<D: Document<Self>>(
doc: &mut D,
contents: Self::Contents,
) -> Result<(), Error>
doc.set_contents(contents)
/// Serialize `item` using this collection's format.
fn serialize(item: &Self::Contents) -> Result<Vec<u8>, Error> {
.serialize(item)
/// Gets a [`CollectionDocument`] with `id` from `connection`.
/// # bonsaidb_core::__doctest_prelude!();
/// # use bonsaidb_core::connection::Connection;
/// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
/// if let Some(doc) = MyCollection::get(&42, &db)? {
/// println!(
/// "Retrieved revision {} with deserialized contents: {:?}",
/// doc.header.revision, doc.contents
/// );
/// # Ok(())
fn get<C, PrimaryKey>(
id: &PrimaryKey,
connection: &C,
) -> Result<Option<CollectionDocument<Self>>, Error>
C: Connection,
PrimaryKey: KeyEncoding<Self::PrimaryKey>,
// Fetch the stored document for `id`, if any; connection errors are
// propagated to the caller.
let possible_doc = connection.get::<Self, _>(id)?;
// Convert the stored document into a `CollectionDocument`. `transpose`
// turns the `Option<Result<_, _>>` into `Result<Option<_>, _>` so that a
// failed conversion is reported as an error rather than hidden in the option.
possible_doc.as_ref().map(TryInto::try_into).transpose()
/// # use bonsaidb_core::connection::AsyncConnection;
/// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// if let Some(doc) = MyCollection::get_async(&42, &db).await? {
/// # })
async fn get_async<C, PrimaryKey>(
C: AsyncConnection,
let possible_doc = connection.get::<Self, _>(id).await?;
Ok(possible_doc.as_ref().map(TryInto::try_into).transpose()?)
/// Retrieves all documents matching `ids`. Documents that are not found
/// are not returned, but no error will be generated.
/// for doc in MyCollection::get_multiple(&[42, 43], &db)? {
/// "Retrieved #{} with deserialized contents: {:?}",
/// doc.header.id, doc.contents
fn get_multiple<'id, C, DocumentIds, PrimaryKey, I>(
ids: DocumentIds,
) -> Result<Vec<CollectionDocument<Self>>, Error>
DocumentIds: IntoIterator<Item = &'id PrimaryKey, IntoIter = I> + Send + Sync,
I: Iterator<Item = &'id PrimaryKey> + Send + Sync,
PrimaryKey: KeyEncoding<Self::PrimaryKey> + 'id,
connection
.collection::<Self>()
.get_multiple(ids)
.and_then(|docs| docs.collection_documents())
/// for doc in MyCollection::get_multiple_async(&[42, 43], &db).await? {
async fn get_multiple_async<'id, C, DocumentIds, PrimaryKey, I>(
.await
/// Retrieves all documents matching the range of `ids`.
/// for doc in MyCollection::list(42.., &db)
/// .descending()
/// .limit(20)
/// .query()?
/// {
fn list<'id, R, PrimaryKey, C>(ids: R, connection: &'id C) -> List<'id, C, Self, PrimaryKey>
R: Into<RangeRef<'id, Self::PrimaryKey, PrimaryKey>>,
PrimaryKey: KeyEncoding<Self::PrimaryKey> + PartialEq + 'id,
Self::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
List(connection::List::new(
connection::MaybeOwned::Owned(connection.collection::<Self>()),
ids.into(),
))
/// for doc in MyCollection::list_async(42.., &db)
/// .await?
fn list_async<'id, R, PrimaryKey, C>(
ids: R,
connection: &'id C,
) -> AsyncList<'id, C, Self, PrimaryKey>
PrimaryKey: KeyEncoding<Self::PrimaryKey> + PartialEq + 'id + ?Sized,
AsyncList(connection::AsyncList::new(
/// Retrieves all documents with ids that start with `prefix`.
/// use bonsaidb_core::connection::Connection;
/// use bonsaidb_core::document::CollectionDocument;
/// use bonsaidb_core::schema::{Collection, Schematic, SerializedCollection};
/// #[collection(name = "MyCollection", primary_key = String)]
/// async fn starts_with_a<C: Connection>(
/// db: &C,
/// ) -> Result<Vec<CollectionDocument<MyCollection>>, Error> {
/// MyCollection::list_with_prefix("a", db).query()
fn list_with_prefix<'a, PrimaryKey, C>(
prefix: &'a PrimaryKey,
connection: &'a C,
) -> List<'a, C, Self, PrimaryKey>
PrimaryKey: IntoPrefixRange<'a, Self::PrimaryKey>
+ KeyEncoding<Self::PrimaryKey>
+ PartialEq
+ ?Sized,
prefix.to_prefix_range(),
/// use bonsaidb_core::connection::AsyncConnection;
/// async fn starts_with_a<C: AsyncConnection>(
/// MyCollection::list_with_prefix_async("a", db).await
fn list_with_prefix_async<'a, PrimaryKey, C>(
) -> AsyncList<'a, C, Self, PrimaryKey>
/// Retrieves all documents.
/// for doc in MyCollection::all(&db).query()? {
fn all<C: Connection>(connection: &C) -> List<'_, C, Self, Self::PrimaryKey>
RangeRef::from(..),
/// for doc in MyCollection::all_async(&db).await? {
fn all_async<C: AsyncConnection>(connection: &C) -> AsyncList<'_, C, Self, Self::PrimaryKey>
/// Pushes this value into the collection, returning the created document.
/// This function is useful when `Self != Self::Contents`.
/// ## Automatic ID Assignment
/// This function calls [`Self::natural_id()`] to try to retrieve a primary
/// key value from `contents`. If an id is returned, the item is inserted
/// with that id. If an id is not returned, an id will be automatically
/// assigned, if possible, by the storage backend, which uses the [`Key`]
/// trait to assign ids.
/// let document = MyCollection::push(MyCollection::default(), &db)?;
/// "Inserted {:?} with id {} with revision {}",
/// document.contents, document.header.id, document.header.revision
fn push<Cn: Connection>(
connection: &Cn,
) -> Result<CollectionDocument<Self>, InsertError<Self::Contents>>
Self: Sized + 'static,
let header = match connection.collection::<Self>().push(&contents) {
Ok(header) => header,
Err(error) => return Err(InsertError { contents, error }),
Ok(CollectionDocument { header, contents })
/// let document = MyCollection::push_async(MyCollection::default(), &db).await?;
async fn push_async<Cn: AsyncConnection>(
Self::Contents: 'async_trait,
let header = match connection.collection::<Self>().push(&contents).await {
/// Pushes all `contents` in a single transaction. If successful, all
/// collection documents will be returned. If an error occurs during this
/// operation, no documents will be pushed.
/// key value from each instance of `contents`. If an id is returned, the
/// item is inserted with that id. If an id is not returned, an id will be
/// automatically assigned, if possible, by the storage backend, which uses
/// the [`Key`] trait to assign ids.
/// let documents = MyCollection::push_all(
/// [
/// MyCollection::default(),
/// ],
/// &db,
/// )?;
/// for document in documents {
fn push_all<Contents: IntoIterator<Item = Self::Contents>, Cn: Connection>(
contents: Contents,
Self::PrimaryKey: Default,
// All pushes are queued on a single transaction.
let mut tx = Transaction::new();
let contents = contents.into_iter();
// Pre-size the output using the iterator's lower size bound.
let mut results = Vec::with_capacity(contents.size_hint().0);
for contents in contents {
// Serialization failures abort before anything is applied.
tx.push(Operation::push_serialized::<Self>(&contents)?);
// Record the contents with a placeholder header (default id, zeroed
// revision); the real header is filled in after the transaction is applied.
results.push(CollectionDocument {
header: CollectionHeader {
id: <<Self as Collection>::PrimaryKey as Default>::default(),
revision: Revision {
id: 0,
sha256: [0; 32],
},
contents,
});
// Pair each operation result with the document queued at the same position
// and overwrite its placeholder header.
for (result, document) in tx.apply(connection)?.into_iter().zip(&mut results) {
match result {
OperationResult::DocumentUpdated { header, .. } => {
document.header = CollectionHeader::try_from(header)?;
// Any other result kind is treated as impossible for the operations
// queued above and panics.
_ => unreachable!("invalid result from transaction"),
Ok(results)
/// let documents = MyCollection::push_all_async(
/// )
/// .await?;
/// # })}
async fn push_all_async<
Contents: IntoIterator<Item = Self::Contents> + Send,
Cn: AsyncConnection,
>(
Contents::IntoIter: Send,
for (result, document) in tx
.apply_async(connection)
.await?
.into_iter()
.zip(&mut results)
/// key value from `self`. If an id is returned, the item is inserted with
/// that id. If an id is not returned, an id will be automatically assigned,
/// if possible, by the storage backend, which uses the [`Key`] trait to
/// assign ids.
/// let document = MyCollection::default().push_into(&db)?;
fn push_into<Cn: Connection>(
self,
) -> Result<CollectionDocument<Self>, InsertError<Self>>
Self: SerializedCollection<Contents = Self> + Sized + 'static,
Self::push(self, connection)
/// let document = MyCollection::default().push_into_async(&db).await?;
async fn push_into_async<Cn: AsyncConnection>(
Self::push_async(self, connection).await
/// Pushes an insert [`Operation`] without a key to the transaction for this
/// document, allowing the database to generate the primary key for the
/// document.
///
/// The document will be inserted once the transaction is applied.
///
/// # Errors
///
/// Returns an error if `Operation::push_serialized` fails for this value; in
/// that case nothing is added to `transaction`.
fn push_in_transaction(&self, transaction: &mut Transaction) -> Result<(), Error>
transaction.push(Operation::push_serialized::<Self>(self)?);
Ok(())
/// Inserts this value into the collection with the specified id, returning
/// the created document.
/// let document = MyCollection::insert(&42, MyCollection::default(), &db)?;
/// assert_eq!(document.header.id, 42);
/// "Inserted {:?} with revision {}",
/// document.contents, document.header.revision
fn insert<PrimaryKey, Cn>(
Cn: Connection,
let header = match connection.collection::<Self>().insert(id, &contents) {
/// let document = MyCollection::insert_async(&42, MyCollection::default(), &db).await?;
async fn insert_async<PrimaryKey, Cn>(
let header = match connection.collection::<Self>().insert(id, &contents).await {
/// Inserts this value into the collection with the given `id`, returning
/// let document = MyCollection::default().insert_into(&42, &db)?;
fn insert_into<PrimaryKey, Cn>(
Self::insert(id, self, connection)
/// let document = MyCollection::default().insert_into_async(&42, &db).await?;
async fn insert_into_async<PrimaryKey, Cn>(
Self::insert_async(id, self, connection).await
/// Pushes an insert [`Operation`] to the transaction for this document.
fn insert_in_transaction(
&self,
key: &Self::PrimaryKey,
transaction: &mut Transaction,
transaction.push(Operation::insert_serialized::<Self>(Some(key), self)?);
/// Overwrites this value into the collection with the specified id, returning
/// the created or updated document.
/// let document = MyCollection::overwrite(&42, MyCollection::default(), &db)?;
/// "Overwrote {:?} with revision {}",
fn overwrite<PrimaryKey, Cn>(
let header = match Self::serialize(&contents) {
Ok(serialized) => match connection.overwrite::<Self, _>(id, serialized) {
/// let document = MyCollection::overwrite_async(&42, MyCollection::default(), &db).await?;
async fn overwrite_async<PrimaryKey, Cn>(
Ok(serialized) => match connection.overwrite::<Self, _>(id, serialized).await {
/// Overwrites this value into the collection with the given `id`, returning
/// let document = MyCollection::default().overwrite_into(&42, &db)?;
fn overwrite_into<Cn: Connection, PrimaryKey>(
Self::overwrite(id, self, connection)
/// Pushes an overwrite [`Operation`] to the transaction for this document.
/// The document will be overwritten once the transaction is applied.
fn overwrite_in_transaction<PrimaryKey>(
transaction.push(Operation::overwrite_serialized::<Self, PrimaryKey>(
id, self,
)?);
/// let document = MyCollection::default()
/// .overwrite_into_async(&42, &db)
async fn overwrite_into_async<Cn: AsyncConnection, PrimaryKey>(
Self::overwrite_async(id, self, connection).await
/// A convenience trait for easily storing Serde-compatible types in documents.
pub trait DefaultSerialization: Collection {
fn natural_id(&self) -> Option<Self::PrimaryKey> {
impl<T> SerializedCollection for T
T: DefaultSerialization + Serialize + DeserializeOwned,
type Contents = Self;
type Format = Pot;
fn format() -> Self::Format {
Pot::default()
fn natural_id(contents: &Self::Contents) -> Option<Self::PrimaryKey> {
T::natural_id(contents)
/// An error from inserting a [`CollectionDocument`].
#[derive(thiserror::Error, Debug)]
#[error("{error}")]
pub struct InsertError<T> {
/// The original value being inserted.
pub contents: T,
/// The error that occurred while inserting.
pub error: Error,
/// A collection with a unique name column.
/// ## Finding a document by unique name
/// if let Some(doc) = MyCollection::load("unique name", &db)? {
/// Load accepts either a string or a [`DocumentId`]. This enables building
/// methods that accept either the unique ID or the unique name:
/// if let Some(doc) = MyCollection::load(42, &db)? {
/// ## Executing an insert or update
/// let upserted = MyCollection::entry("unique name", &db)
/// .update_with(|existing: &mut MyCollection| {
/// existing.rank += 1;
/// })
/// .or_insert_with(MyCollection::default)
/// .execute()?
/// .unwrap();
/// println!("Rank: {:?}", upserted.contents.rank);
pub trait NamedCollection: Collection + Unpin {
/// The name view defined for the collection.
type ByNameView: crate::schema::SerializedView<Key = String, Collection = Self>;
/// Loads the document referenced by `id` through `Self::load_document` and
/// converts it into a [`CollectionDocument`]. Returns `Ok(None)` when
/// `load_document` finds no document.
fn load<'name, N: Nameable<'name, Self::PrimaryKey> + Send + Sync, C: Connection>(
id: N,
Self: SerializedCollection + Sized + 'static,
let possible_doc = Self::load_document(id, connection)?;
// Convert the stored document, surfacing a conversion failure as an error
// instead of leaving it nested inside the `Option`.
possible_doc
.as_ref()
.map(CollectionDocument::try_from)
.transpose()
async fn load_async<
'name,
N: Nameable<'name, Self::PrimaryKey> + Send + Sync,
let possible_doc = Self::load_document_async(id, connection).await?;
Ok(possible_doc
.transpose()?)
/// Begins building an [`Entry`] for the document referenced by `id`.
///
/// The returned entry starts with no insert callback, no update callback and
/// a retry limit of 0. Configure it with `or_insert_with`, `update_with` and
/// `retry_limit`, then call `execute` to run it.
fn entry<
'connection,
N: Into<NamedReference<'name, Self::PrimaryKey>> + Send + Sync,
connection: &'connection C,
) -> Entry<'connection, 'name, C, Self, (), ()>
Self: SerializedCollection + Sized,
let name = id.into();
Entry {
name,
connection,
// `()` stands in for both callbacks until the caller supplies them.
insert: None,
update: None,
retry_limit: 0,
_collection: PhantomData,
fn entry_async<
) -> AsyncEntry<'connection, 'name, C, Self, (), ()>
AsyncEntry {
state: EntryState::Pending(Some(EntryBuilder {
})),
/// Loads a document from this collection by name, if applicable. Returns
/// `Ok(None)` if unsupported.
fn load_document<'name, N: Nameable<'name, Self::PrimaryKey> + Send + Sync, C: Connection>(
name: N,
) -> Result<Option<OwnedDocument>, Error>
match name.name()? {
// Document ids and primary keys are fetched directly from the collection.
NamedReference::Id(id) => connection.collection::<Self>().get(&id),
NamedReference::Key(id) => connection.collection::<Self>().get(&id),
// Names are resolved through the `ByNameView`; the first document yielded
// by the query, if any, is returned.
NamedReference::Name(name) => Ok(connection
.view::<Self::ByNameView>()
.with_key(name.as_ref())
.query_with_docs()?
.documents
.next()
.map(|(_, document)| document)),
async fn load_document_async<
NamedReference::Id(id) => connection.collection::<Self>().get(&id).await,
NamedReference::Key(id) => connection.collection::<Self>().get(&id).await,
.query_with_docs()
/// Deletes a document by its name. Returns `true` if a document was deleted.
fn delete_by_name<C: Connection>(name: &str, connection: &C) -> Result<bool, Error>
// The value returned by `delete_docs` is compared against zero to produce the
// boolean result.
Ok(connection
.with_key(name)
.delete_docs()?
> 0)
async fn delete_by_name_async<C: AsyncConnection>(
name: &str,
) -> Result<bool, Error>
.delete_docs()
/// A reference to a collection that has a unique name view.
#[derive(Clone, Eq, PartialEq, Deserialize, Serialize, Debug)]
pub enum NamedReference<'a, Id> {
/// An entity's name.
Name(Cow<'a, str>),
/// A document id.
Id(DocumentId),
/// A document's primary key, in the collection's primary key type.
Key(Id),
impl<'a, Id> From<&'a str> for NamedReference<'a, Id> {
fn from(name: &'a str) -> Self {
Self::Name(Cow::Borrowed(name))
/// A type that can be used as a unique reference for a collection that
/// implements [`NamedCollection`].
pub trait Nameable<'a, Id> {
/// Returns this name as a [`NamedReference`].
fn name(self) -> Result<NamedReference<'a, Id>, crate::Error>;
impl<'a, Id> Nameable<'a, Id> for NamedReference<'a, Id> {
fn name(self) -> Result<NamedReference<'a, Id>, crate::Error> {
Ok(self)
impl<'a, Id> Nameable<'a, Id> for &'a NamedReference<'a, Id>
Id: Clone,
Ok(match self {
NamedReference::Name(name) => NamedReference::Name(name.clone()),
NamedReference::Id(id) => NamedReference::Id(id.clone()),
NamedReference::Key(key) => NamedReference::Key(key.clone()),
})
impl<'a, Id> Nameable<'a, Id> for &'a str {
Ok(NamedReference::from(self))
impl<'a, Id> From<&'a String> for NamedReference<'a, Id> {
fn from(name: &'a String) -> Self {
Self::Name(Cow::Borrowed(name.as_str()))
impl<'a, Id> Nameable<'a, Id> for &'a String {
impl<'a, 'b, Id> From<&'b BorrowedDocument<'b>> for NamedReference<'a, Id> {
fn from(doc: &'b BorrowedDocument<'b>) -> Self {
Self::Id(doc.header.id.clone())
impl<'a, 'b, Id> Nameable<'a, Id> for &'a BorrowedDocument<'b> {
impl<'a, 'c, C> TryFrom<&'c CollectionDocument<C>> for NamedReference<'a, C::PrimaryKey>
C: SerializedCollection,
type Error = crate::Error;
fn try_from(doc: &'c CollectionDocument<C>) -> Result<Self, crate::Error> {
DocumentId::new(&doc.header.id).map(Self::Id)
impl<'a, C> Nameable<'a, C::PrimaryKey> for &'a CollectionDocument<C>
fn name(self) -> Result<NamedReference<'a, C::PrimaryKey>, crate::Error> {
NamedReference::try_from(self)
impl<'a, Id> From<String> for NamedReference<'a, Id> {
fn from(name: String) -> Self {
Self::Name(Cow::Owned(name))
impl<'a, Id> Nameable<'a, Id> for String {
impl<'a, Id> From<DocumentId> for NamedReference<'a, Id> {
fn from(id: DocumentId) -> Self {
Self::Id(id)
impl<'a, Id> Nameable<'a, Id> for DocumentId {
impl<'a> Nameable<'a, Self> for u64 {
fn name(self) -> Result<NamedReference<'a, Self>, crate::Error> {
Ok(NamedReference::Key(self))
impl<'a, Id> NamedReference<'a, Id>
Id: for<'k> Key<'k>,
/// Converts this reference to an owned reference with a `'static` lifetime.
pub fn into_owned(self) -> NamedReference<'static, Id> {
match self {
// A borrowed name is copied into an owned `String`; an already-owned name
// is moved over without copying.
Self::Name(name) => NamedReference::Name(match name {
Cow::Owned(string) => Cow::Owned(string),
Cow::Borrowed(borrowed) => Cow::Owned(borrowed.to_owned()),
}),
// Ids and keys do not borrow from `'a`, so they are moved over unchanged.
Self::Id(id) => NamedReference::Id(id),
Self::Key(key) => NamedReference::Key(key),
/// Returns this reference's id. If the reference is a name, the
/// [`NamedCollection::ByNameView`] is queried for the id.
pub fn id<Col: NamedCollection<PrimaryKey = Id>, Cn: Connection>(
) -> Result<Option<Col::PrimaryKey>, Error> {
// A name is resolved by querying the by-name view and taking the id of the
// source document of the matching entry.
Self::Name(name) => connection
.view::<Col::ByNameView>()
.query()?
.map(|e| Ok(e.source.id))
.transpose(),
// A `DocumentId` is decoded back into the collection's primary key type.
Self::Id(id) => Ok(Some(id.deserialize()?)),
// A primary key is returned directly (cloned).
Self::Key(id) => Ok(Some(id.clone())),
pub async fn id_async<Col: NamedCollection<PrimaryKey = Id>, Cn: AsyncConnection>(
.query()
/// A future that resolves to an entry in a [`NamedCollection`].
pub struct Entry<'a, 'name, Connection, Col, EI, EU>
Col: NamedCollection + SerializedCollection,
EI: EntryInsert<Col>,
EU: EntryUpdate<Col>,
name: NamedReference<'name, Col::PrimaryKey>,
connection: &'a Connection,
insert: Option<EI>,
update: Option<EU>,
retry_limit: usize,
_collection: PhantomData<Col>,
impl<'a, 'name, Connection, Col, EI, EU> Entry<'a, 'name, Connection, Col, EI, EU>
Col: NamedCollection + SerializedCollection + 'static + Unpin,
Connection: crate::connection::Connection,
EI: EntryInsert<Col> + 'a + Unpin,
EU: EntryUpdate<Col> + 'a + Unpin,
'name: 'a,
/// Retrieves the document, if found/inserted.
///
/// - If the document exists and an update callback was provided, the callback
///   is applied to the contents and the document is saved. On a document
///   conflict the document is reloaded and the callback is applied again,
///   as long as the retry limit has not been exhausted.
/// - If the document exists and no update callback was provided, it is
///   returned unchanged.
/// - If the document does not exist and an insert callback was provided, the
///   callback's value is pushed as a new document.
/// - Otherwise `Ok(None)` is returned.
pub fn execute(self) -> Result<Option<CollectionDocument<Col>>, Error> {
let Self {
insert,
update,
mut retry_limit,
..
} = self;
if let Some(mut existing) = Col::load(name, connection)? {
if let Some(mut update) = update {
loop {
// Apply the caller's changes, then try to save them.
update.update(&mut existing.contents);
match existing.update(connection) {
Ok(()) => return Ok(Some(existing)),
Err(Error::DocumentConflict(collection, header)) => {
// Another client has updated the document underneath us.
if retry_limit > 0 {
// Spend one retry and reload the latest stored version so the
// callback is re-applied to fresh contents on the next iteration.
retry_limit -= 1;
existing = match Col::load(header.id, connection)? {
Some(doc) => doc,
// Another client deleted the document before we could reload it.
None => break Ok(None),
} else {
// Out of retries: report the conflict to the caller.
break Err(Error::DocumentConflict(collection, header));
// Errors other than a conflict are never retried.
Err(other) => break Err(other),
Ok(Some(existing))
} else if let Some(insert) = insert {
let new_document = insert.call();
Ok(Some(Col::push(new_document, connection)?))
Ok(None)
/// If an entry with the key doesn't exist, `cb` will be executed to provide
/// an initial document. This document will be saved before being returned.
#[allow(clippy::missing_const_for_fn)] // false positive, destructors
pub fn or_insert_with<F: EntryInsert<Col> + 'a + Unpin>(
cb: F,
) -> Entry<'a, 'name, Connection, Col, F, EU> {
name: self.name,
connection: self.connection,
insert: Some(cb),
update: self.update,
retry_limit: self.retry_limit,
/// If an entry with the key exists, `cb` will be executed with the stored
/// value, allowing an opportunity to update the value. This new value will
/// be saved to the database before returning. If a document conflict occurs
/// while saving, `cb` may be invoked multiple times, up to the
/// [`retry_limit`](Self::retry_limit()).
pub fn update_with<F: EntryUpdate<Col> + 'a + Unpin>(
) -> Entry<'a, 'name, Connection, Col, EI, F> {
update: Some(cb),
insert: self.insert,
/// Sets how many times `execute` will reload the document and re-apply the
/// `update_with` callback after a document conflict before returning the
/// conflict error. The initial attempt does not count against this limit.
pub const fn retry_limit(mut self, attempts: usize) -> Self {
self.retry_limit = attempts;
self
pub struct AsyncEntry<'a, 'name, Connection, Col, EI, EU>
state: EntryState<'a, 'name, Connection, Col, EI, EU>,
struct EntryBuilder<
'a,
Connection,
Col,
EI: EntryInsert<Col> + 'a,
EU: EntryUpdate<Col> + 'a,
> where
Col: SerializedCollection,
impl<'a, 'name, Connection, Col, EI, EU> AsyncEntry<'a, 'name, Connection, Col, EI, EU>
Connection: crate::connection::AsyncConnection,
async fn execute(
mut retry_limit: usize,
) -> Result<Option<CollectionDocument<Col>>, Error> {
if let Some(mut existing) = Col::load_async(name, connection).await? {
match existing.update_async(connection).await {
existing = match Col::load_async(header.id, connection).await? {
Ok(Some(Col::push_async(new_document, connection).await?))
fn pending(&mut self) -> &mut EntryBuilder<'a, 'name, Connection, Col, EI, EU> {
match &mut self.state {
EntryState::Pending(pending) => pending.as_mut().unwrap(),
EntryState::Executing(_) => unreachable!(),
) -> AsyncEntry<'a, 'name, Connection, Col, F, EU> {
state: match self.state {
EntryState::Pending(Some(EntryBuilder {
retry_limit,
})) => EntryState::Pending(Some(EntryBuilder {
_ => {
unreachable!("attempting to modify an already executing future")
) -> AsyncEntry<'a, 'name, Connection, Col, EI, F> {
pub fn retry_limit(mut self, attempts: usize) -> Self {
self.pending().retry_limit = attempts;
/// A function that is invoked when inserting a document using the entry api.
pub trait EntryInsert<Col: SerializedCollection>: Send + Unpin {
/// Returns the contents of the new document.
fn call(self) -> Col::Contents;
// Any `FnOnce() -> Col::Contents` closure can be used as an insert callback.
impl<F, Col> EntryInsert<Col> for F
F: FnOnce() -> Col::Contents + Send + Unpin,
fn call(self) -> Col::Contents {
self()
// `()` is the placeholder callback type of a freshly created entry, which is
// built with `insert: None`; its `call` body is `unreachable!()`.
impl<Col> EntryInsert<Col> for ()
unreachable!()
/// A function that is invoked when updating a document using the entry api.
pub trait EntryUpdate<Col>: Send + Unpin
/// Updates `doc` with modifications to perform before returning the
fn update(&mut self, doc: &mut Col::Contents);
impl<F, Col> EntryUpdate<Col> for F
F: FnMut(&mut Col::Contents) + Send + Unpin,
fn update(&mut self, doc: &mut Col::Contents) {
self(doc);
impl<Col> EntryUpdate<Col> for ()
fn update(&mut self, _doc: &mut Col::Contents) {
unreachable!();
// Awaiting an `AsyncEntry` runs it: the first poll converts the pending builder
// into a boxed `execute` future, and every poll drives that future.
impl<'a, 'name, Conn, Col, EI, EU> Future for AsyncEntry<'a, 'name, Conn, Col, EI, EU>
Col: NamedCollection + SerializedCollection + 'static,
<Col as Collection>::PrimaryKey: Unpin,
Conn: AsyncConnection,
type Output = Result<Option<CollectionDocument<Col>>, Error>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
// Take the builder out of the `Pending` state, if it is still there.
if let Some(EntryBuilder {
}) = match &mut self.state {
EntryState::Executing(_) => None,
EntryState::Pending(builder) => builder.take(),
} {
// Start executing and remember the boxed future for subsequent polls.
let future = Self::execute(name, connection, insert, update, retry_limit).boxed();
self.state = EntryState::Executing(future);
// Drive the boxed future stored in the `Executing` state.
if let EntryState::Executing(future) = &mut self.state {
future.as_mut().poll(cx)
enum EntryState<'a, 'name, Connection, Col, EI, EU>
Pending(Option<EntryBuilder<'a, 'name, Connection, Col, EI, EU>>),
Executing(BoxFuture<'a, Result<Option<CollectionDocument<Col>>, Error>>),
/// Retrieves a list of documents from a collection. This
/// structure also offers functions to customize the options for the operation.
pub struct List<'a, Cn, Cl, PrimaryKey>(connection::List<'a, Cn, Cl, PrimaryKey>)
Cl: Collection,
PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + ?Sized,
Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>;
impl<'a, Cn, Cl, PrimaryKey> List<'a, Cn, Cl, PrimaryKey>
Cl: SerializedCollection,
PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + ?Sized + 'a,
Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey>,
/// Lists documents by id in ascending order.
pub fn ascending(mut self) -> Self {
self.0 = self.0.ascending();
/// Lists documents by id in descending order.
pub fn descending(mut self) -> Self {
self.0 = self.0.descending();
/// Sets the maximum number of results to return.
pub fn limit(mut self, maximum_results: u32) -> Self {
self.0 = self.0.limit(maximum_results);
/// Returns the list of document headers contained within the range.
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
/// "Headers with id 42 or larger: {:?}",
/// MyCollection::list(42.., db).headers()?
/// "Headers in MyCollection: {:?}",
/// MyCollection::all(db).headers()?
pub fn headers(self) -> Result<Vec<Header>, Error> {
self.0.headers()
/// Returns the number of documents contained within the range.
/// Order and limit are ignored if they were set.
/// "Number of documents with id 42 or larger: {}",
/// MyCollection::list(42.., db).count()?
/// "Number of documents in MyCollection: {}",
/// MyCollection::all(db).count()?
pub fn count(self) -> Result<u64, Error> {
self.0.count()
/// Retrieves the list of documents, using the configured options.
pub fn query(self) -> Result<Vec<CollectionDocument<Cl>>, Error> {
self.0.query().and_then(|docs| docs.collection_documents())
/// Retrieves a list of documents from a collection, when awaited. This
pub struct AsyncList<'a, Cn, Cl, PrimaryKey>(connection::AsyncList<'a, Cn, Cl, PrimaryKey>)
impl<'a, Cn, Cl, PrimaryKey> AsyncList<'a, Cn, Cl, PrimaryKey>
/// # fn test_fn<C: AsyncConnection>(db: &C) -> Result<(), Error> {
/// MyCollection::list_async(42.., db).count().await?
/// MyCollection::all_async(db).count().await?
pub async fn count(self) -> Result<u64, Error> {
self.0.count().await
/// MyCollection::list_async(42.., db).headers().await?
/// MyCollection::all_async(db).headers().await?
pub async fn headers(self) -> Result<Vec<Header>, Error> {
self.0.headers().await
// Awaiting an `AsyncList` polls the wrapped `connection::AsyncList` and
// converts the documents it yields into `CollectionDocument`s.
#[allow(clippy::type_repetition_in_bounds)]
impl<'a, Cn, Cl, PrimaryKey> Future for AsyncList<'a, Cn, Cl, PrimaryKey>
Cl: SerializedCollection + Unpin,
PrimaryKey: KeyEncoding<Cl::PrimaryKey> + PartialEq + Unpin + ?Sized + 'a,
Cl::PrimaryKey: Borrow<PrimaryKey> + PartialEq<PrimaryKey> + Unpin,
type Output = Result<Vec<CollectionDocument<Cl>>, Error>;
// `ready!` returns `Poll::Pending` early until the inner future has completed.
let result = ready!(self.0.poll_unpin(cx));
// Convert the fetched documents, passing through any error from the query.
Poll::Ready(result.and_then(|docs| docs.collection_documents()))