Lines
91.48 %
Functions
13.66 %
Branches
100 %
#[cfg(feature = "password-hashing")]
use bonsaidb_core::networking::{Authenticate, SetUserPassword};
use bonsaidb_core::{
arc_bytes::serde::Bytes,
async_trait::async_trait,
connection::{AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection, HasSession},
keyvalue::AsyncKeyValue,
networking::{
AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction,
AssumeIdentity, Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase,
CreateSubscriber, CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation,
Get, GetMultiple, LastTransactionId, List, ListAvailableSchemas, ListDatabases,
ListExecutedTransactions, ListHeaders, LogOutSession, Publish, PublishToAll, Query,
QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo, UnregisterSubscriber, UnsubscribeFrom,
},
pubsub::AsyncPubSub,
schema::ApiName,
};
use crate::{
api::{Handler, HandlerError, HandlerResult, HandlerSession},
Backend, Error, ServerConfiguration,
/// Registers the built-in networking APIs on `config`, each handled by `ServerDispatcher`,
/// and returns the updated configuration.
///
/// # Errors
///
/// Every `with_api` call is propagated with `?`, so the first registration that fails is
/// returned as the error.
#[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
pub fn register_api_handlers<B: Backend>(
config: ServerConfiguration<B>,
) -> Result<ServerConfiguration<B>, Error> {
// `mut` is only needed for the reassignment in the block further down.
let mut config = config
.with_api::<ServerDispatcher, AlterUserPermissionGroupMembership>()?
.with_api::<ServerDispatcher, AlterUserRoleMembership>()?
.with_api::<ServerDispatcher, ApplyTransaction>()?
.with_api::<ServerDispatcher, AssumeIdentity>()?
.with_api::<ServerDispatcher, Compact>()?
.with_api::<ServerDispatcher, CompactCollection>()?
.with_api::<ServerDispatcher, CompactKeyValueStore>()?
.with_api::<ServerDispatcher, Count>()?
.with_api::<ServerDispatcher, CreateDatabase>()?
.with_api::<ServerDispatcher, CreateSubscriber>()?
.with_api::<ServerDispatcher, CreateUser>()?
.with_api::<ServerDispatcher, DeleteDatabase>()?
.with_api::<ServerDispatcher, DeleteDocs>()?
.with_api::<ServerDispatcher, DeleteUser>()?
.with_api::<ServerDispatcher, ExecuteKeyOperation>()?
.with_api::<ServerDispatcher, Get>()?
.with_api::<ServerDispatcher, GetMultiple>()?
.with_api::<ServerDispatcher, LastTransactionId>()?
.with_api::<ServerDispatcher, List>()?
.with_api::<ServerDispatcher, ListHeaders>()?
.with_api::<ServerDispatcher, ListAvailableSchemas>()?
.with_api::<ServerDispatcher, ListDatabases>()?
.with_api::<ServerDispatcher, ListExecutedTransactions>()?
.with_api::<ServerDispatcher, LogOutSession>()?
.with_api::<ServerDispatcher, Publish>()?
.with_api::<ServerDispatcher, PublishToAll>()?
.with_api::<ServerDispatcher, Query>()?
.with_api::<ServerDispatcher, QueryWithDocs>()?
.with_api::<ServerDispatcher, Reduce>()?
.with_api::<ServerDispatcher, ReduceGrouped>()?
.with_api::<ServerDispatcher, SubscribeTo>()?
.with_api::<ServerDispatcher, UnregisterSubscriber>()?
.with_api::<ServerDispatcher, UnsubscribeFrom>()?;
// NOTE(review): `Authenticate` and `SetUserPassword` are imported only when the
// `password-hashing` feature is enabled, yet no `#[cfg]` is visible on this block --
// confirm it is feature-gated in the full file.
{
config = config
.with_api::<ServerDispatcher, Authenticate>()?
.with_api::<ServerDispatcher, SetUserPassword>()?;
}
Ok(config)
/// Field-less type on which the `Handler` implementations for the networking request types
/// are defined.
#[derive(Debug)]
pub struct ServerDispatcher;
impl ServerDispatcher {
/// Dispatches a serialized API request to the dispatcher registered under `name`.
///
/// The dispatcher is looked up with `session.server.custom_api_dispatcher(name)`; when one
/// exists, its `handle` is awaited with the session and the request bytes.
///
/// # Errors
///
/// Returns `bonsaidb_core::Error::ApiNotFound` (converted into `Error`) when no dispatcher is
/// registered for `name`; otherwise returns whatever the dispatcher's `handle` produces.
pub async fn dispatch_api_request<B: Backend>(
session: HandlerSession<'_, B>,
name: &ApiName,
request: Bytes,
) -> Result<Bytes, Error> {
if let Some(dispatcher) = session.server.custom_api_dispatcher(name) {
dispatcher.handle(session, &request).await
} else {
// The name is cloned because the error owns its `ApiName`.
Err(Error::from(bonsaidb_core::Error::ApiNotFound(name.clone())))
// Handles `CreateDatabase` requests: creates the database named `request.database.name` with
// `request.database.schema`, passing `request.only_if_needed` through, and returns `()`.
#[async_trait]
impl<B: Backend> Handler<B, CreateDatabase> for ServerDispatcher {
// NOTE(review): `session` is used below but no such parameter is visible in this signature --
// confirm against the full file.
async fn handle(
request: CreateDatabase,
) -> HandlerResult<CreateDatabase> {
session
.as_client
.create_database_with_schema(
&request.database.name,
request.database.schema,
request.only_if_needed,
)
.await?;
Ok(())
// Handles `DeleteDatabase` requests by deleting the database named `command.name` through
// `session.as_client`; a failure is propagated with `?`.
impl<B: Backend> Handler<B, DeleteDatabase> for ServerDispatcher {
command: DeleteDatabase,
) -> HandlerResult<DeleteDatabase> {
session.as_client.delete_database(&command.name).await?;
// Handles `ListDatabases` requests: awaits `list_databases()` and converts any error with
// `HandlerError::from`. The request value itself is not read (`_command`).
impl<B: Backend> Handler<B, ListDatabases> for ServerDispatcher {
_command: ListDatabases,
) -> HandlerResult<ListDatabases> {
.list_databases()
.await
.map_err(HandlerError::from)
// Handles `ListAvailableSchemas` requests by calling `list_available_schemas()`. The request
// value itself is not read (`_command`).
impl<B: Backend> Handler<B, ListAvailableSchemas> for ServerDispatcher {
_command: ListAvailableSchemas,
) -> HandlerResult<ListAvailableSchemas> {
.list_available_schemas()
// Handles `CreateUser` requests by calling `create_user` with `command.username`.
impl<B: Backend> Handler<B, CreateUser> for ServerDispatcher {
command: CreateUser,
) -> HandlerResult<CreateUser> {
.create_user(&command.username)
// Handles `DeleteUser` requests by calling `delete_user` with `command.user`.
impl<B: Backend> Handler<B, DeleteUser> for ServerDispatcher {
command: DeleteUser,
) -> HandlerResult<DeleteUser> {
.delete_user(command.user)
// Handles `SetUserPassword` requests by calling `set_user_password` with `command.user` and
// `command.password`.
// NOTE(review): `SetUserPassword` is imported only when the `password-hashing` feature is
// enabled -- confirm this impl carries the same `#[cfg]` in the full file.
impl<B: Backend> Handler<B, SetUserPassword> for ServerDispatcher {
command: SetUserPassword,
) -> HandlerResult<SetUserPassword> {
.set_user_password(command.user, command.password)
// Handles `Authenticate` requests: authenticates `command.user` with `command.authentication`,
// records the resulting session on the client via `logged_in_as`, and returns that session.
// NOTE(review): `Authenticate` is imported only when the `password-hashing` feature is
// enabled -- confirm this impl carries the same `#[cfg]` in the full file.
impl<B: Backend> Handler<B, Authenticate> for ServerDispatcher {
command: Authenticate,
) -> HandlerResult<Authenticate> {
let authenticated = session
.authenticate(command.user, command.authentication)
// Panics if `session()` returns `None`.
// NOTE(review): presumably a successful authentication always carries a session -- confirm.
let new_session = authenticated.session().cloned().unwrap();
// One copy goes to the client, the other is returned to the caller.
session.client.logged_in_as(new_session.clone());
Ok(new_session)
// Handles `AssumeIdentity` requests by calling `assume_identity` on `session.as_client` with
// `command.0`; a failure is propagated with `?`.
impl<B: Backend> Handler<B, AssumeIdentity> for ServerDispatcher {
command: AssumeIdentity,
) -> HandlerResult<AssumeIdentity> {
let authenticated = session.as_client.assume_identity(command.0).await?;
// Handles `LogOutSession` requests by calling `log_out(command.0)` on the session's client.
impl<B: Backend> Handler<B, LogOutSession> for ServerDispatcher {
command: LogOutSession,
) -> HandlerResult<LogOutSession> {
session.client.log_out(command.0);
// Handles `AlterUserPermissionGroupMembership` requests: when `command.should_be_member` is
// true, `command.group` is added to `command.user`.
// NOTE(review): the `remove_permission_group_from_user` call is presumably the `else` branch;
// the `else` line is not visible here -- confirm.
impl<B: Backend> Handler<B, AlterUserPermissionGroupMembership> for ServerDispatcher {
command: AlterUserPermissionGroupMembership,
) -> HandlerResult<AlterUserPermissionGroupMembership> {
if command.should_be_member {
.add_permission_group_to_user(command.user, command.group)
.remove_permission_group_from_user(command.user, command.group)
// Handles `AlterUserRoleMembership` requests using `add_role_to_user` and
// `remove_role_from_user`, both called with `command.user` and `command.role`.
// NOTE(review): the condition choosing between the two calls is not visible here; presumably
// it is a membership flag on the command -- TODO confirm.
impl<B: Backend> Handler<B, AlterUserRoleMembership> for ServerDispatcher {
command: AlterUserRoleMembership,
) -> HandlerResult<AlterUserRoleMembership> {
.add_role_to_user(command.user, command.role)
.remove_role_from_user(command.user, command.role)
// Handles `Get` requests: obtains `command.database` via `database_without_schema`, then looks
// up `command.id` in `command.collection` with `get_from_collection`.
impl<B: Backend> Handler<B, Get> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Get) -> HandlerResult<Get> {
let database = session
.database_without_schema(&command.database)
database
.get_from_collection(command.id, &command.collection)
// Handles `GetMultiple` requests by calling `get_multiple_from_collection` with `command.ids`
// and `command.collection`.
impl<B: Backend> Handler<B, GetMultiple> for ServerDispatcher {
command: GetMultiple,
) -> HandlerResult<GetMultiple> {
.get_multiple_from_collection(&command.ids, &command.collection)
// Handles `List` requests by calling `list_from_collection` with the ids, order, limit and
// collection taken from the command.
impl<B: Backend> Handler<B, List> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: List) -> HandlerResult<List> {
.list_from_collection(
command.ids,
command.order,
command.limit,
&command.collection,
// Handles `ListHeaders` requests: obtains `command.0.database` via `database_without_schema`,
// then calls `list_headers_from_collection`. All arguments are read from the wrapped value in
// `command.0`.
impl<B: Backend> Handler<B, ListHeaders> for ServerDispatcher {
command: ListHeaders,
) -> HandlerResult<ListHeaders> {
.database_without_schema(&command.0.database)
.list_headers_from_collection(
command.0.ids,
command.0.order,
command.0.limit,
&command.0.collection,
// Handles `Count` requests by calling `count_from_collection` with `command.ids` and
// `command.collection`.
impl<B: Backend> Handler<B, Count> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Count) -> HandlerResult<Count> {
.count_from_collection(command.ids, &command.collection)
// Handles `Query` requests by calling `query_by_name`. The arguments visible here are
// `command.view`, `command.key` and `command.access_policy`.
impl<B: Backend> Handler<B, Query> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Query) -> HandlerResult<Query> {
.query_by_name(
&command.view,
command.key,
command.access_policy,
// Handles `QueryWithDocs` requests by calling `query_by_name_with_docs`. The arguments visible
// here are read from the wrapped value in `command.0` (`view`, `key`, `access_policy`).
impl<B: Backend> Handler<B, QueryWithDocs> for ServerDispatcher {
command: QueryWithDocs,
) -> HandlerResult<QueryWithDocs> {
.query_by_name_with_docs(
&command.0.view,
command.0.key,
command.0.access_policy,
// Handles `Reduce` requests: calls `reduce_by_name` with the view, key and access policy from
// the command, then maps the result through `Bytes::from`.
impl<B: Backend> Handler<B, Reduce> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Reduce) -> HandlerResult<Reduce> {
.reduce_by_name(&command.view, command.key, command.access_policy)
.map(Bytes::from)
// Handles `ReduceGrouped` requests by calling `reduce_grouped_by_name` with the view, key and
// access policy read from the wrapped value in `command.0`.
impl<B: Backend> Handler<B, ReduceGrouped> for ServerDispatcher {
command: ReduceGrouped,
) -> HandlerResult<ReduceGrouped> {
.reduce_grouped_by_name(&command.0.view, command.0.key, command.0.access_policy)
// Handles `ApplyTransaction` requests by calling `apply_transaction` with
// `command.transaction`.
impl<B: Backend> Handler<B, ApplyTransaction> for ServerDispatcher {
command: ApplyTransaction,
) -> HandlerResult<ApplyTransaction> {
.apply_transaction(command.transaction)
// Handles `DeleteDocs` requests by calling `delete_docs_by_name` with the view, key and access
// policy from the command.
impl<B: Backend> Handler<B, DeleteDocs> for ServerDispatcher {
command: DeleteDocs,
) -> HandlerResult<DeleteDocs> {
.delete_docs_by_name(&command.view, command.key, command.access_policy)
// Handles `ListExecutedTransactions` requests by calling `list_executed_transactions` with
// `command.starting_id` and `command.result_limit`.
impl<B: Backend> Handler<B, ListExecutedTransactions> for ServerDispatcher {
command: ListExecutedTransactions,
) -> HandlerResult<ListExecutedTransactions> {
.list_executed_transactions(command.starting_id, command.result_limit)
// Handles `LastTransactionId` requests by calling `last_transaction_id()`.
impl<B: Backend> Handler<B, LastTransactionId> for ServerDispatcher {
command: LastTransactionId,
) -> HandlerResult<LastTransactionId> {
.last_transaction_id()
// Handles `CreateSubscriber` requests: creates a subscriber on the database, registers it with
// the session's client together with the current session id (if any), and returns the
// subscriber's id.
impl<B: Backend> Handler<B, CreateSubscriber> for ServerDispatcher {
command: CreateSubscriber,
) -> HandlerResult<CreateSubscriber> {
let subscriber = database.create_subscriber().await?;
// Read the id first: `subscriber` is moved into `register_subscriber` below.
let subscriber_id = subscriber.id();
session.client.register_subscriber(
subscriber,
// `None` when there is no session, or when the session has no id.
session.as_client.session().and_then(|session| session.id),
);
Ok(subscriber_id)
// Handles `Publish` requests by calling `publish_bytes` with the topic and payload, each
// converted with `into_vec()`.
impl<B: Backend> Handler<B, Publish> for ServerDispatcher {
async fn handle(session: HandlerSession<'_, B>, command: Publish) -> HandlerResult<Publish> {
.publish_bytes(command.topic.into_vec(), command.payload.into_vec())
// Handles `PublishToAll` requests by calling `publish_bytes_to_all` with every topic converted
// through `Bytes::into_vec` and the payload converted with `into_vec()`.
impl<B: Backend> Handler<B, PublishToAll> for ServerDispatcher {
command: PublishToAll,
) -> HandlerResult<PublishToAll> {
.publish_bytes_to_all(
command.topics.into_iter().map(Bytes::into_vec),
command.payload.into_vec(),
// Handles `SubscribeTo` requests by calling `subscribe_by_id` on `.client`. The arguments
// visible here are `command.subscriber_id` and `command.topic`.
impl<B: Backend> Handler<B, SubscribeTo> for ServerDispatcher {
command: SubscribeTo,
) -> HandlerResult<SubscribeTo> {
.client
.subscribe_by_id(
command.subscriber_id,
command.topic,
// Handles `UnsubscribeFrom` requests by calling `unsubscribe_by_id`. The only argument visible
// here is a borrow of `command.topic`.
impl<B: Backend> Handler<B, UnsubscribeFrom> for ServerDispatcher {
command: UnsubscribeFrom,
) -> HandlerResult<UnsubscribeFrom> {
.unsubscribe_by_id(
&command.topic,
// Handles `UnregisterSubscriber` requests by calling `unregister_subscriber_by_id`.
// NOTE(review): the call's arguments are not visible in this listing -- confirm against the
// full file.
impl<B: Backend> Handler<B, UnregisterSubscriber> for ServerDispatcher {
command: UnregisterSubscriber,
) -> HandlerResult<UnregisterSubscriber> {
.unregister_subscriber_by_id(
// Handles `ExecuteKeyOperation` requests by calling `execute_key_operation` with `command.op`.
impl<B: Backend> Handler<B, ExecuteKeyOperation> for ServerDispatcher {
command: ExecuteKeyOperation,
) -> HandlerResult<ExecuteKeyOperation> {
.execute_key_operation(command.op)
// Handles `CompactCollection` requests by calling `compact_collection_by_name` with
// `command.name`.
impl<B: Backend> Handler<B, CompactCollection> for ServerDispatcher {
command: CompactCollection,
) -> HandlerResult<CompactCollection> {
.compact_collection_by_name(command.name)
// Handles `CompactKeyValueStore` requests by calling `compact_key_value_store()`.
impl<B: Backend> Handler<B, CompactKeyValueStore> for ServerDispatcher {
command: CompactKeyValueStore,
) -> HandlerResult<CompactKeyValueStore> {
.compact_key_value_store()
// Handles `Compact` requests: obtains a database from the handler session (named `client` in
// this impl) and awaits `compact()`, converting any error with `HandlerError::from`.
impl<B: Backend> Handler<B, Compact> for ServerDispatcher {
async fn handle(client: HandlerSession<'_, B>, command: Compact) -> HandlerResult<Compact> {
let database = client
database.compact().await.map_err(HandlerError::from)