Lines
84.95 %
Functions
41.51 %
Branches
100 %
use std::sync::Arc;
use bonsaidb_core::{
admin::{Admin, ADMIN_DATABASE_NAME},
arc_bytes::serde::Bytes,
connection::{
Connection, Database, IdentityReference, LowLevelConnection, Range, Sort, StorageConnection,
},
document::{DocumentId, Header, OwnedDocument},
keyvalue::KeyValue,
networking::{
AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction,
AssumeIdentity, Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase,
CreateSubscriber, CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation,
Get, GetMultiple, LastTransactionId, List, ListAvailableSchemas, ListDatabases,
ListExecutedTransactions, ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce,
ReduceGrouped, SubscribeTo, UnsubscribeFrom,
pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber},
schema::{CollectionName, Schematic},
};
use futures::Future;
use tokio::{
runtime::{Handle, Runtime},
sync::oneshot,
task::JoinHandle,
use crate::{Client, RemoteDatabase, RemoteSubscriber};
// Blocking `StorageConnection` implementation for the networked `Client`.
// Every operation visible here is expressed as a request value handed to
// `send_api_request`.
// NOTE(review): this listing appears to be missing lines (closing braces,
// `&self,` receivers, and other repeated lines). The comments below describe
// only what is visible; confirm against the full file.
impl StorageConnection for Client {
type Database = RemoteDatabase;
type Authenticated = Self;
// Returns a handle to the admin database.
// Panics (via `unwrap`) if opening `ADMIN_DATABASE_NAME` returns an error.
fn admin(&self) -> Self::Database {
self.database::<Admin>(ADMIN_DATABASE_NAME).unwrap()
}
// Returns a handle to the database `name` using schema `DB`.
// NOTE(review): presumably this resolves to an inherent `Client::database`
// rather than recursing into this trait method -- confirm.
fn database<DB: bonsaidb_core::schema::Schema>(
&self,
name: &str,
) -> Result<Self::Database, bonsaidb_core::Error> {
self.database::<DB>(name)
// Sends a `CreateDatabase` request built from `name`, `schema` and
// `only_if_needed`; the response is discarded and `Ok(())` is returned.
fn create_database_with_schema(
schema: bonsaidb_core::schema::SchemaName,
only_if_needed: bool,
) -> Result<(), bonsaidb_core::Error> {
self.send_api_request(&CreateDatabase {
database: Database {
name: name.to_string(),
schema,
only_if_needed,
})?;
Ok(())
// Sends a `DeleteDatabase` request (the request fields are not visible here).
fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
self.send_api_request(&DeleteDatabase {
// Returns the response to a `ListDatabases` request.
fn list_databases(
) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
Ok(self.send_api_request(&ListDatabases)?)
// Returns the response to a `ListAvailableSchemas` request.
fn list_available_schemas(
) -> Result<Vec<bonsaidb_core::schema::SchemaName>, bonsaidb_core::Error> {
Ok(self.send_api_request(&ListAvailableSchemas)?)
// Sends a `CreateUser` request for `username` and returns the `u64` from
// the response.
fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
Ok(self.send_api_request(&CreateUser {
username: username.to_string(),
})?)
// Sends a `DeleteUser` request. `user.name()?` resolves the user reference
// and propagates its error before anything is sent.
fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
user: U,
Ok(self.send_api_request(&DeleteUser {
user: user.name()?.into_owned(),
// Sends a `SetUserPassword` request carrying `password`.
// Only compiled when the `password-hashing` feature is enabled.
#[cfg(feature = "password-hashing")]
fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
password: bonsaidb_core::connection::SensitiveString,
use bonsaidb_core::networking::SetUserPassword;
Ok(self.send_api_request(&SetUserPassword {
password,
// Sends an `Authenticate` request and, on success, returns a new `Client`
// value built from a clone of `self.data` and the returned session wrapped
// in an `Arc`. `self` itself is not modified.
fn authenticate<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
authentication: bonsaidb_core::connection::Authentication,
) -> Result<Self::Authenticated, bonsaidb_core::Error> {
let session = self.send_api_request(&bonsaidb_core::networking::Authenticate {
authentication,
Ok(Self {
data: self.data.clone(),
session: Arc::new(session),
})
// Sends an `AssumeIdentity` request with an owned copy of `identity` and
// binds the response as `session`.
fn assume_identity(
identity: IdentityReference<'_>,
let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
// Sends an `AlterUserPermissionGroupMembership` request with
// `should_be_member: true`, i.e. adds the user to `permission_group`.
fn add_permission_group_to_user<
'user,
'group,
U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
>(
permission_group: G,
self.send_api_request(&AlterUserPermissionGroupMembership {
group: permission_group.name()?.into_owned(),
should_be_member: true,
// Counterpart of `add_permission_group_to_user`; only the
// `should_be_member: false` field is visible in this listing.
fn remove_permission_group_from_user<
should_be_member: false,
// Sends an `AlterUserRoleMembership` request naming `role`.
fn add_role_to_user<
'role,
R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
role: R,
self.send_api_request(&AlterUserRoleMembership {
role: role.name()?.into_owned(),
// NOTE(review): the body is not visible in this listing; presumably mirrors
// `add_role_to_user` with `should_be_member: false` -- confirm.
fn remove_role_from_user<
// Blocking `Connection` implementation for `RemoteDatabase`. The visible
// requests carry this database's name (`self.name`) where their fields are
// shown.
// NOTE(review): this listing appears to be missing lines (closing braces,
// `&self,` receivers, repeated fields); comments describe only what is visible.
impl Connection for RemoteDatabase {
type Storage = Client;
// Returns a clone of the client this database handle was created from.
fn storage(&self) -> Self::Storage {
self.client.clone()
// Sends a `ListExecutedTransactions` request for this database, forwarding
// `starting_id` and `result_limit` unchanged.
fn list_executed_transactions(
starting_id: Option<u64>,
result_limit: Option<u32>,
) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&ListExecutedTransactions {
database: self.name.to_string(),
starting_id,
result_limit,
// Returns the response to a `LastTransactionId` request.
fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&LastTransactionId {
// Sends a `Compact` request.
// NOTE(review): called as `self.send_api_request` rather than through
// `self.client`; presumably `RemoteDatabase` derefs to `Client` -- confirm.
fn compact(&self) -> Result<(), bonsaidb_core::Error> {
self.send_api_request(&Compact {
// Sends a `CompactKeyValueStore` request.
fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
self.send_api_request(&CompactKeyValueStore {
// Blocking `LowLevelConnection` implementation for `RemoteDatabase`. Each
// method wraps its arguments in the matching networking request type and
// returns the response from `send_api_request`.
// NOTE(review): this listing appears to be missing lines (closing braces,
// `&self,` receivers, repeated parameters and fields); comments describe only
// what is visible.
impl LowLevelConnection for RemoteDatabase {
// Returns a reference to the schematic stored on this handle.
fn schematic(&self) -> &Schematic {
&self.schema
// Sends `transaction` in an `ApplyTransaction` request and returns the
// operation results from the response.
fn apply_transaction(
transaction: bonsaidb_core::transaction::Transaction,
) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&ApplyTransaction {
transaction,
// Sends a `Get` request for document `id` in `collection`; the collection
// name is cloned because the request owns its fields.
fn get_from_collection(
id: bonsaidb_core::document::DocumentId,
collection: &CollectionName,
) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&Get {
collection: collection.clone(),
id,
// Sends a `GetMultiple` request; `ids` is copied into an owned `Vec`.
fn get_multiple_from_collection(
ids: &[bonsaidb_core::document::DocumentId],
) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&GetMultiple {
ids: ids.to_vec(),
// Sends a `List` request, forwarding the id range, sort order and optional
// limit unchanged.
fn list_from_collection(
ids: Range<bonsaidb_core::document::DocumentId>,
order: Sort,
limit: Option<u32>,
Ok(self.client.send_api_request(&List {
ids,
order,
limit,
// Sends a `ListHeaders` request, which wraps a `List` request, and returns
// the headers from the response.
fn list_headers_from_collection(
ids: Range<DocumentId>,
) -> Result<Vec<Header>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&ListHeaders(List {
}))?)
// Returns the `u64` from the response to a `Count` request.
fn count_from_collection(
) -> Result<u64, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&Count {
// Sends a `CompactCollection` request whose `name` field is `collection`.
fn compact_collection_by_name(
collection: CollectionName,
self.send_api_request(&CompactCollection {
name: collection,
// Sends a `Query` request for `view`, forwarding `key` and `access_policy`,
// and returns the serialized mappings from the response.
fn query_by_name(
view: &bonsaidb_core::schema::ViewName,
key: Option<bonsaidb_core::connection::QueryKey<bonsaidb_core::arc_bytes::serde::Bytes>>,
access_policy: bonsaidb_core::connection::AccessPolicy,
) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&Query {
view: view.clone(),
key,
access_policy,
// Sends a `QueryWithDocs` request, which wraps a `Query` request.
fn query_by_name_with_docs(
) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
{
Ok(self.client.send_api_request(&QueryWithDocs(Query {
// Sends a `Reduce` request and converts the response into a `Vec<u8>` with
// `into_vec()`.
fn reduce_by_name(
) -> Result<Vec<u8>, bonsaidb_core::Error> {
Ok(self
.client
.send_api_request(&Reduce {
})?
.into_vec())
// Sends a `ReduceGrouped` request, which wraps a `Reduce` request.
fn reduce_grouped_by_name(
) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
Ok(self.client.send_api_request(&ReduceGrouped(Reduce {
// Returns the response to a `DeleteDocs` request.
fn delete_docs_by_name(
Ok(self.client.send_api_request(&DeleteDocs {
// Blocking `PubSub` implementation for `RemoteDatabase`.
// NOTE(review): this listing appears to be missing lines (closing braces,
// `&self,` receivers, repeated fields); comments describe only what is visible.
impl PubSub for RemoteDatabase {
type Subscriber = RemoteSubscriber;
// Creates a subscriber: sends a `CreateSubscriber` request to obtain an id,
// creates an unbounded `flume` channel, registers the sending half with the
// client under that id, and returns a `RemoteSubscriber` holding the
// receiving half.
fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
let subscriber_id = self.client.send_api_request(&CreateSubscriber {
let (sender, receiver) = flume::unbounded();
self.client.register_subscriber(subscriber_id, sender);
Ok(RemoteSubscriber {
client: self.client.clone(),
database: self.name.clone(),
id: subscriber_id,
receiver: Receiver::new(receiver),
tokio: None,
// Sends a `Publish` request with `topic` and `payload` converted to `Bytes`.
fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
self.client.send_api_request(&Publish {
topic: Bytes::from(topic),
payload: Bytes::from(payload),
// Sends a `PublishToAll` request; each topic is converted to `Bytes` and the
// results are collected before the request is built.
fn publish_bytes_to_all(
topics: impl IntoIterator<Item = Vec<u8>> + Send,
payload: Vec<u8>,
let topics = topics.into_iter().map(Bytes::from).collect();
self.client.send_api_request(&PublishToAll {
topics,
// Blocking `Subscriber` implementation for `RemoteSubscriber`.
// NOTE(review): this listing appears to be missing lines (closing braces,
// repeated fields); comments describe only what is visible.
impl Subscriber for RemoteSubscriber {
// Sends a `SubscribeTo` request identifying this subscriber by its database
// name and id.
fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
self.client.send_api_request(&SubscribeTo {
database: self.database.to_string(),
subscriber_id: self.id,
// Sends an `UnsubscribeFrom` request (the request fields are not visible here).
fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
self.client.send_api_request(&UnsubscribeFrom {
// Delegates to the `AsyncSubscriber::receiver` implementation for the same
// value.
fn receiver(&self) -> &Receiver {
AsyncSubscriber::receiver(self)
// Blocking `KeyValue` implementation for `RemoteDatabase`.
// NOTE(review): this listing appears to be missing lines (closing braces, the
// `&self,` receiver, repeated fields); comments describe only what is visible.
impl KeyValue for RemoteDatabase {
// Sends `op` in an `ExecuteKeyOperation` request and returns the output from
// the response.
fn execute_key_operation(
op: bonsaidb_core::keyvalue::KeyOperation,
) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
Ok(self.client.send_api_request(&ExecuteKeyOperation {
op,
pub enum Tokio {
Runtime(Runtime),
Handle(Handle),
impl Tokio {
pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
self,
task: F,
) -> JoinHandle<Result<(), crate::Error>> {
match self {
Self::Runtime(tokio) => {
// When we have an owned runtime, we must have a thread driving
// the runtime. To keep the interface to `Client` simple, we are
// going to spawn the task and let the main block_on task simply
// wait for the completion event. If the JoinHandle is
// cancelled, the sender will be dropped and everything will
// clean up.
let (completion_sender, completion_receiver) = oneshot::channel();
let task = async move {
task.await?;
let _ = completion_sender.send(());
let task = tokio.spawn(task);
std::thread::spawn(move || {
tokio.block_on(async move {
let _ = completion_receiver.await;
});
task
Self::Handle(tokio) => tokio.spawn(task),
pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
handle: Option<Handle>,
// We need to spawn a runtime or
let tokio = if let Some(handle) = handle {
Tokio::Handle(handle)
} else {
Tokio::Runtime(Runtime::new().unwrap())
tokio.spawn(task)