Lines
92.01 %
Functions
13.22 %
Branches
100 %
use bonsaidb_core::api::ApiName;
use bonsaidb_core::arc_bytes::serde::Bytes;
use bonsaidb_core::async_trait::async_trait;
use bonsaidb_core::connection::{
AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection, HasSession,
};
use bonsaidb_core::keyvalue::AsyncKeyValue;
use bonsaidb_core::networking::{
AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
ListHeaders, LogOutSession, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped,
SubscribeTo, UnregisterSubscriber, UnsubscribeFrom,
#[cfg(feature = "password-hashing")]
use bonsaidb_core::networking::{Authenticate, SetUserPassword};
use bonsaidb_core::pubsub::AsyncPubSub;
use crate::api::{Handler, HandlerError, HandlerResult, HandlerSession};
use crate::{Backend, Error, ServerConfiguration};
#[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
pub fn register_api_handlers<B: Backend>(
config: ServerConfiguration<B>,
) -> Result<ServerConfiguration<B>, Error> {
let mut config = config
.with_api::<ServerDispatcher, AlterUserPermissionGroupMembership>()?
.with_api::<ServerDispatcher, AlterUserRoleMembership>()?
.with_api::<ServerDispatcher, ApplyTransaction>()?
.with_api::<ServerDispatcher, AssumeIdentity>()?
.with_api::<ServerDispatcher, Compact>()?
.with_api::<ServerDispatcher, CompactCollection>()?
.with_api::<ServerDispatcher, CompactKeyValueStore>()?
.with_api::<ServerDispatcher, Count>()?
.with_api::<ServerDispatcher, CreateDatabase>()?
.with_api::<ServerDispatcher, CreateSubscriber>()?
.with_api::<ServerDispatcher, CreateUser>()?
.with_api::<ServerDispatcher, DeleteDatabase>()?
.with_api::<ServerDispatcher, DeleteDocs>()?
.with_api::<ServerDispatcher, DeleteUser>()?
.with_api::<ServerDispatcher, ExecuteKeyOperation>()?
.with_api::<ServerDispatcher, Get>()?
.with_api::<ServerDispatcher, GetMultiple>()?
.with_api::<ServerDispatcher, LastTransactionId>()?
.with_api::<ServerDispatcher, List>()?
.with_api::<ServerDispatcher, ListHeaders>()?
.with_api::<ServerDispatcher, ListAvailableSchemas>()?
.with_api::<ServerDispatcher, ListDatabases>()?
.with_api::<ServerDispatcher, ListExecutedTransactions>()?
.with_api::<ServerDispatcher, LogOutSession>()?
.with_api::<ServerDispatcher, Publish>()?
.with_api::<ServerDispatcher, PublishToAll>()?
.with_api::<ServerDispatcher, Query>()?
.with_api::<ServerDispatcher, QueryWithDocs>()?
.with_api::<ServerDispatcher, Reduce>()?
.with_api::<ServerDispatcher, ReduceGrouped>()?
.with_api::<ServerDispatcher, SubscribeTo>()?
.with_api::<ServerDispatcher, UnregisterSubscriber>()?
.with_api::<ServerDispatcher, UnsubscribeFrom>()?;
{
config = config
.with_api::<ServerDispatcher, Authenticate>()?
.with_api::<ServerDispatcher, SetUserPassword>()?;
}
Ok(config)
/// Unit type that carries the `Handler` implementations for the built-in
/// networking APIs registered by `register_api_handlers`.
#[derive(Debug)]
pub struct ServerDispatcher;
impl ServerDispatcher {
pub async fn dispatch_api_request<B: Backend>(
session: HandlerSession<'_, B>,
name: &ApiName,
request: Bytes,
) -> Result<Bytes, Error> {
if let Some(dispatcher) = session.server.custom_api_dispatcher(name) {
dispatcher.handle(session, &request).await
} else {
Err(Error::from(bonsaidb_core::Error::ApiNotFound(name.clone())))
#[async_trait]
impl<B: Backend> Handler<CreateDatabase, B> for ServerDispatcher {
async fn handle(
request: CreateDatabase,
) -> HandlerResult<CreateDatabase> {
session
.as_client
.create_database_with_schema(
&request.database.name,
request.database.schema,
request.only_if_needed,
)
.await?;
Ok(())
// Handles `DeleteDatabase`: deletes the database named `command.name` through
// `session.as_client`, propagating any error with `?`.
// NOTE(review): the `async fn handle(` header, the `session` parameter, the
// final return expression and the closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<DeleteDatabase, B> for ServerDispatcher {
command: DeleteDatabase,
) -> HandlerResult<DeleteDatabase> {
session.as_client.delete_database(&command.name).await?;
// Handles `ListDatabases`: the request value itself is unused (`_command`).
// The awaited result of `list_databases()` is returned with its error
// converted through `HandlerError::from`.
// NOTE(review): the `async fn handle(` header, the receiver of
// `.list_databases()` and the closing braces are not visible in this extract;
// confirm against the original file.
impl<B: Backend> Handler<ListDatabases, B> for ServerDispatcher {
_command: ListDatabases,
) -> HandlerResult<ListDatabases> {
.list_databases()
.await
.map_err(HandlerError::from)
// Handles `ListAvailableSchemas`: the request value itself is unused
// (`_command`); the visible body calls `list_available_schemas()`.
// NOTE(review): the `async fn handle(` header, the call's receiver, how its
// result is awaited/converted, and the closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<ListAvailableSchemas, B> for ServerDispatcher {
_command: ListAvailableSchemas,
) -> HandlerResult<ListAvailableSchemas> {
.list_available_schemas()
// Handles `CreateUser`: calls `create_user` with a borrow of
// `command.username`.
// NOTE(review): the `async fn handle(` header, the call's receiver, how its
// result is awaited/converted, and the closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<CreateUser, B> for ServerDispatcher {
command: CreateUser,
) -> HandlerResult<CreateUser> {
.create_user(&command.username)
// Handles `DeleteUser`: calls `delete_user` with `command.user` (moved).
// NOTE(review): the `async fn handle(` header, the call's receiver, how its
// result is awaited/converted, and the closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<DeleteUser, B> for ServerDispatcher {
command: DeleteUser,
) -> HandlerResult<DeleteUser> {
.delete_user(command.user)
// Handles `SetUserPassword`: calls `set_user_password` with `command.user`
// and `command.password`.
// `SetUserPassword` is imported only under the `password-hashing` feature.
// NOTE(review): presumably this impl is gated on the same feature — no `cfg`
// attribute is visible here. The `async fn handle(` header, the call's
// receiver, result handling and closing braces are also not visible; confirm
// against the original file.
impl<B: Backend> Handler<SetUserPassword, B> for ServerDispatcher {
command: SetUserPassword,
) -> HandlerResult<SetUserPassword> {
.set_user_password(command.user, command.password)
// Handles `Authenticate`: authenticates with `command.authentication`, records
// the resulting session on the connected client, notifies the backend, and
// returns the new session.
// `Authenticate` is imported only under the `password-hashing` feature.
// NOTE(review): presumably this impl is gated on the same feature — no `cfg`
// attribute is visible here. The `async fn handle(` header, parts of the call
// chains and the closing braces are also not visible; confirm against the
// original file.
impl<B: Backend> Handler<Authenticate, B> for ServerDispatcher {
command: Authenticate,
) -> HandlerResult<Authenticate> {
let authenticated = session
.authenticate(command.authentication)
// `unwrap` panics if the authenticated connection reports no session.
let new_session = authenticated.session().cloned().unwrap();
// Associate the new session with this client before notifying the backend.
session.client.logged_in_as(new_session.clone());
// A failure in the backend's `client_authenticated` callback is only logged;
// the handler still returns the new session below.
if let Err(err) = session
.server
.backend()
.client_authenticated(session.client.clone(), &new_session, session.server)
log::error!("[server] Error in `client_authenticated`: {err:?}");
Ok(new_session)
// Handles `AssumeIdentity`: asks `session.as_client` to assume the identity in
// `command.0`, propagating any error with `?`.
// NOTE(review): everything after obtaining `authenticated` (and the
// `async fn handle(` header and closing braces) is not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<AssumeIdentity, B> for ServerDispatcher {
command: AssumeIdentity,
) -> HandlerResult<AssumeIdentity> {
let authenticated = session.as_client.assume_identity(command.0).await?;
// Handles `LogOutSession`: logs the session identified by `command.0` out of
// the connected client and, if a session was actually removed, reports it via
// `client_session_ended`.
// NOTE(review): the `async fn handle(` header, the receiver of
// `.client_session_ended(...)`, the error-matching line that binds `err`, the
// return expression and the closing braces are not visible in this extract;
// confirm against the original file.
impl<B: Backend> Handler<LogOutSession, B> for ServerDispatcher {
command: LogOutSession,
) -> HandlerResult<LogOutSession> {
// `log_out` yields `Some` only when there was a session to end.
if let Some(logged_out) = session.client.log_out(command.0) {
.client_session_ended(logged_out, session.client, false, session.server)
log::error!("[server] Error in `client_session_ended`: {err:?}");
// Handles `AlterUserPermissionGroupMembership`: `command.should_be_member`
// selects between adding `command.group` to `command.user` and removing it.
// NOTE(review): the `async fn handle(` header, the receivers of both calls,
// the `else` line, result handling and closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<AlterUserPermissionGroupMembership, B> for ServerDispatcher {
command: AlterUserPermissionGroupMembership,
) -> HandlerResult<AlterUserPermissionGroupMembership> {
if command.should_be_member {
.add_permission_group_to_user(command.user, command.group)
.remove_permission_group_from_user(command.user, command.group)
// Handles `AlterUserRoleMembership`: the visible body contains both an
// `add_role_to_user` and a `remove_role_from_user` call for
// `command.user` / `command.role`.
// NOTE(review): the condition choosing between the two calls is not visible
// here (presumably `command.should_be_member`, as in the permission-group
// handler — confirm). The `async fn handle(` header, call receivers, result
// handling and closing braces are also missing from this extract.
impl<B: Backend> Handler<AlterUserRoleMembership, B> for ServerDispatcher {
command: AlterUserRoleMembership,
) -> HandlerResult<AlterUserRoleMembership> {
.add_role_to_user(command.user, command.role)
.remove_role_from_user(command.user, command.role)
// Handles `Get`: opens `command.database` without a schema, then fetches the
// document `command.id` from `command.collection`.
// NOTE(review): parts of both call chains (intermediate receiver, `.await`,
// error conversion) and the closing braces are not visible in this extract;
// confirm against the original file.
impl<B: Backend> Handler<Get, B> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Get) -> HandlerResult<Get> {
let database = session
.database_without_schema(&command.database)
database
.get_from_collection(command.id, &command.collection)
// Handles `GetMultiple`: calls `get_multiple_from_collection` with borrows of
// `command.ids` and `command.collection`.
// NOTE(review): the `async fn handle(` header, the database lookup that
// provides the call's receiver, result handling and closing braces are not
// visible in this extract; confirm against the original file.
impl<B: Backend> Handler<GetMultiple, B> for ServerDispatcher {
command: GetMultiple,
) -> HandlerResult<GetMultiple> {
.get_multiple_from_collection(&command.ids, &command.collection)
// Handles `List`: calls `list_from_collection` with the request's `ids`,
// `order`, `limit` and a borrow of `collection`.
// NOTE(review): the receiver of `.list_from_collection(`, the end of the call,
// result handling and closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<List, B> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: List) -> HandlerResult<List> {
.list_from_collection(
command.ids,
command.order,
command.limit,
&command.collection,
// Handles `ListHeaders`: the request wraps its fields in `command.0`. The
// visible body opens `command.0.database` without a schema and calls
// `list_headers_from_collection` with the wrapped `ids`, `order`, `limit` and
// a borrow of `collection`.
// NOTE(review): the `async fn handle(` header, call receivers, the end of the
// call, result handling and closing braces are not visible in this extract;
// confirm against the original file.
impl<B: Backend> Handler<ListHeaders, B> for ServerDispatcher {
command: ListHeaders,
) -> HandlerResult<ListHeaders> {
.database_without_schema(&command.0.database)
.list_headers_from_collection(
command.0.ids,
command.0.order,
command.0.limit,
&command.0.collection,
// Handles `Count`: calls `count_from_collection` with `command.ids` and a
// borrow of `command.collection`.
// NOTE(review): the database lookup that provides the call's receiver, result
// handling and closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<Count, B> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Count) -> HandlerResult<Count> {
.count_from_collection(command.ids, &command.collection)
// Handles `Query`: calls `query_by_name` for `command.view`, passing at least
// `command.key` and `command.access_policy`.
// NOTE(review): the argument list may be incomplete here; the call's
// receiver, the end of the call, result handling and closing braces are not
// visible in this extract. Confirm against the original file.
impl<B: Backend> Handler<Query, B> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Query) -> HandlerResult<Query> {
.query_by_name(
&command.view,
command.key,
command.access_policy,
// Handles `QueryWithDocs`: the request wraps its fields in `command.0`. The
// visible body calls `query_by_name_with_docs` for `command.0.view`, passing
// at least `command.0.key` and `command.0.access_policy`.
// NOTE(review): the argument list may be incomplete here; the
// `async fn handle(` header, the call's receiver, the end of the call, result
// handling and closing braces are not visible in this extract. Confirm
// against the original file.
impl<B: Backend> Handler<QueryWithDocs, B> for ServerDispatcher {
command: QueryWithDocs,
) -> HandlerResult<QueryWithDocs> {
.query_by_name_with_docs(
&command.0.view,
command.0.key,
command.0.access_policy,
// Handles `Reduce`: calls `reduce_by_name` for `command.view` with
// `command.key` and `command.access_policy`, and converts the successful
// value with `Bytes::from`.
// NOTE(review): the call's receiver, the `.await`, error conversion and
// closing braces are not visible in this extract; confirm against the
// original file.
impl<B: Backend> Handler<Reduce, B> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Reduce) -> HandlerResult<Reduce> {
.reduce_by_name(&command.view, command.key, command.access_policy)
.map(Bytes::from)
// Handles `ReduceGrouped`: the request wraps its fields in `command.0`. The
// visible body calls `reduce_grouped_by_name` with the wrapped `view`, `key`
// and `access_policy`.
// NOTE(review): the `async fn handle(` header, the call's receiver, result
// handling and closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<ReduceGrouped, B> for ServerDispatcher {
command: ReduceGrouped,
) -> HandlerResult<ReduceGrouped> {
.reduce_grouped_by_name(&command.0.view, command.0.key, command.0.access_policy)
// Handles `ApplyTransaction`: calls `apply_transaction` with
// `command.transaction`.
// NOTE(review): the `async fn handle(` header, the database lookup that
// provides the call's receiver, result handling and closing braces are not
// visible in this extract; confirm against the original file.
impl<B: Backend> Handler<ApplyTransaction, B> for ServerDispatcher {
command: ApplyTransaction,
) -> HandlerResult<ApplyTransaction> {
.apply_transaction(command.transaction)
// Handles `DeleteDocs`: calls `delete_docs_by_name` for `command.view` with
// `command.key` and `command.access_policy`.
// NOTE(review): the `async fn handle(` header, the call's receiver, result
// handling and closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<DeleteDocs, B> for ServerDispatcher {
command: DeleteDocs,
) -> HandlerResult<DeleteDocs> {
.delete_docs_by_name(&command.view, command.key, command.access_policy)
// Handles `ListExecutedTransactions`: calls `list_executed_transactions` with
// `command.starting_id` and `command.result_limit`.
// NOTE(review): the `async fn handle(` header, the call's receiver, result
// handling and closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<ListExecutedTransactions, B> for ServerDispatcher {
command: ListExecutedTransactions,
) -> HandlerResult<ListExecutedTransactions> {
.list_executed_transactions(command.starting_id, command.result_limit)
// Handles `LastTransactionId`: the visible body calls `last_transaction_id()`.
// NOTE(review): the `async fn handle(` header, the database lookup that
// provides the call's receiver, result handling and closing braces are not
// visible in this extract; confirm against the original file.
impl<B: Backend> Handler<LastTransactionId, B> for ServerDispatcher {
command: LastTransactionId,
) -> HandlerResult<LastTransactionId> {
.last_transaction_id()
// Handles `CreateSubscriber`: creates a subscriber on `database`, registers it
// with the connected client, and returns the subscriber's id.
// NOTE(review): the `async fn handle(` header, the lookup that defines
// `database`, and the closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<CreateSubscriber, B> for ServerDispatcher {
command: CreateSubscriber,
) -> HandlerResult<CreateSubscriber> {
let subscriber = database.create_subscriber().await?;
// Read the id first: `subscriber` is moved into `register_subscriber` below.
let subscriber_id = subscriber.id();
session.client.register_subscriber(
subscriber,
// Pass along the id of the current session, if there is one.
session.as_client.session().and_then(|session| session.id),
);
Ok(subscriber_id)
// Handles `Publish`: converts `command.topic` and `command.payload` to `Vec`s
// and passes them to `publish_bytes`.
// NOTE(review): the database lookup that provides the call's receiver, result
// handling and closing braces are not visible in this extract; confirm
// against the original file.
impl<B: Backend> Handler<Publish, B> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Publish) -> HandlerResult<Publish> {
.publish_bytes(command.topic.into_vec(), command.payload.into_vec())
// Handles `PublishToAll`: passes every topic in `command.topics` (each
// converted with `Bytes::into_vec`) and the payload (as a `Vec`) to
// `publish_bytes_to_all`.
// NOTE(review): the `async fn handle(` header, the call's receiver, the end of
// the call, result handling and closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<PublishToAll, B> for ServerDispatcher {
command: PublishToAll,
) -> HandlerResult<PublishToAll> {
.publish_bytes_to_all(
command.topics.into_iter().map(Bytes::into_vec),
command.payload.into_vec(),
// Handles `SubscribeTo`: calls `subscribe_by_id` on a `.client` field with
// `command.subscriber_id` and `command.topic`.
// NOTE(review): the `async fn handle(` header, the owner of `.client`
// (presumably `session` — confirm), any further arguments, result handling
// and closing braces are not visible in this extract.
impl<B: Backend> Handler<SubscribeTo, B> for ServerDispatcher {
command: SubscribeTo,
) -> HandlerResult<SubscribeTo> {
.client
.subscribe_by_id(
command.subscriber_id,
command.topic,
// Handles `UnsubscribeFrom`: calls `unsubscribe_by_id`; the only argument
// visible here is a borrow of `command.topic`.
// NOTE(review): the `async fn handle(` header, the call's receiver, the
// remaining arguments, result handling and closing braces are not visible in
// this extract; confirm against the original file.
impl<B: Backend> Handler<UnsubscribeFrom, B> for ServerDispatcher {
command: UnsubscribeFrom,
) -> HandlerResult<UnsubscribeFrom> {
.unsubscribe_by_id(
&command.topic,
// Handles `UnregisterSubscriber`: calls `unregister_subscriber_by_id`.
// NOTE(review): the `async fn handle(` header, the call's receiver, all of its
// arguments, result handling and closing braces are not visible in this
// extract; confirm against the original file.
impl<B: Backend> Handler<UnregisterSubscriber, B> for ServerDispatcher {
command: UnregisterSubscriber,
) -> HandlerResult<UnregisterSubscriber> {
.unregister_subscriber_by_id(
// Handles `ExecuteKeyOperation`: calls `execute_key_operation` with
// `command.op`.
// NOTE(review): the `async fn handle(` header, the database lookup that
// provides the call's receiver, result handling and closing braces are not
// visible in this extract; confirm against the original file.
impl<B: Backend> Handler<ExecuteKeyOperation, B> for ServerDispatcher {
command: ExecuteKeyOperation,
) -> HandlerResult<ExecuteKeyOperation> {
.execute_key_operation(command.op)
// Handles `CompactCollection`: calls `compact_collection_by_name` with
// `command.name`.
// NOTE(review): the `async fn handle(` header, the database lookup that
// provides the call's receiver, result handling and closing braces are not
// visible in this extract; confirm against the original file.
impl<B: Backend> Handler<CompactCollection, B> for ServerDispatcher {
command: CompactCollection,
) -> HandlerResult<CompactCollection> {
.compact_collection_by_name(command.name)
// Handles `CompactKeyValueStore`: the visible body calls
// `compact_key_value_store()`.
// NOTE(review): the `async fn handle(` header, the database lookup that
// provides the call's receiver, result handling and closing braces are not
// visible in this extract; confirm against the original file.
impl<B: Backend> Handler<CompactKeyValueStore, B> for ServerDispatcher {
command: CompactKeyValueStore,
) -> HandlerResult<CompactKeyValueStore> {
.compact_key_value_store()
// Handles `Compact`: compacts `database` and converts any error through
// `HandlerError::from`. Unlike the other handlers visible here, the session
// parameter is named `client`.
// NOTE(review): the rest of the `let database = client ...` lookup chain and
// the closing braces are not visible in this extract; confirm against the
// original file.
impl<B: Backend> Handler<Compact, B> for ServerDispatcher {
async fn handle(client: HandlerSession<'_, B>, command: Compact) -> HandlerResult<Compact> {
let database = client
database.compact().await.map_err(HandlerError::from)