use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::fs::{self, File};
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use bonsaidb_core::admin::database::{self, ByName, Database as DatabaseRecord};
use bonsaidb_core::admin::user::User;
use bonsaidb_core::admin::{self, Admin, PermissionGroup, Role, ADMIN_DATABASE_NAME};
use bonsaidb_core::circulate;
pub use bonsaidb_core::circulate::Relay;
use bonsaidb_core::connection::{
self, Connection, HasSession, Identity, IdentityReference, LowLevelConnection, Session,
SessionAuthentication, SessionId, StorageConnection,
};
use bonsaidb_core::document::CollectionDocument;
#[cfg(any(feature = "encryption", feature = "compression"))]
use bonsaidb_core::document::KeyId;
use bonsaidb_core::permissions::bonsai::{
bonsaidb_resource_name, database_resource_name, role_resource_name, user_resource_name,
BonsaiAction, ServerAction,
};
use bonsaidb_core::permissions::Permissions;
use bonsaidb_core::schema::{
Nameable, NamedCollection, Schema, SchemaName, SchemaSummary, Schematic,
};
use fs2::FileExt;
use itertools::Itertools;
use nebari::io::any::{AnyFile, AnyFileManager};
use nebari::io::FileManager;
use nebari::{ChunkCache, ThreadPool};
use parking_lot::{Mutex, RwLock};
use rand::{thread_rng, Rng};
#[cfg(feature = "compression")]
use crate::config::Compression;
use crate::config::{KeyValuePersistence, StorageConfiguration};
use crate::database::Context;
use crate::tasks::manager::Manager;
use crate::tasks::TaskManager;
#[cfg(feature = "encryption")]
use crate::vault::{self, LocalVaultKeyStorage, Vault};
use crate::{Database, Error};
#[cfg(feature = "password-hashing")]
mod argon;
#[cfg(feature = "token-authentication")]
mod token_authentication;
mod backup;
mod pubsub;
pub use backup::{AnyBackupLocation, BackupLocation};
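/// A local, file-based storage that can hold multiple databases and serve
/// multiple authenticated sessions. Cloning is cheap: all clones share the
/// same underlying [`StorageInstance`].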
#[derive(Debug, Clone)]
#[must_use]
pub struct Storage {
pub(crate) instance: StorageInstance,
pub(crate) authentication: Option<Arc<AuthenticatedSession>>,
effective_session: Option<Arc<Session>>,
}
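/// An authenticated session registered with a [`Storage`]. When the last
/// reference is dropped, the session and its PubSub subscribers are removed
/// from the storage.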
#[derive(Debug)]
pub struct AuthenticatedSession {
storage: Weak<Data>,
pub session: Mutex<Session>,
}
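/// The registry of active PubSub subscribers, indexed by subscriber id and
/// grouped by owning session.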
#[derive(Debug, Default)]
pub struct SessionSubscribers {
pub subscribers: HashMap<u64, SessionSubscriber>,
pub subscribers_by_session: HashMap<SessionId, HashSet<u64>>,
pub last_id: u64,
}
impl SessionSubscribers {
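    /// Removes the subscriber with the given id, including its entry in its
    /// owning session's subscriber set.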
pub fn unregister(&mut self, subscriber_id: u64) {
if let Some(session_id) = self
.subscribers
.remove(&subscriber_id)
.and_then(|sub| sub.session_id)
{
if let Some(session_subscribers) = self.subscribers_by_session.get_mut(&session_id) {
session_subscribers.remove(&subscriber_id);
}
}
}
}
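/// A PubSub subscriber and the session it belongs to, if any.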
#[derive(Debug)]
pub struct SessionSubscriber {
pub session_id: Option<SessionId>,
pub subscriber: circulate::Subscriber,
}
impl Drop for AuthenticatedSession {
    fn drop(&mut self) {
        let mut session = self.session.lock();
        if let Some(session_id) = session.id.take() {
            if let Some(storage) = self.storage.upgrade() {
                // Remove the session itself.
                let mut sessions = storage.sessions.write();
                sessions.sessions.remove(&session_id);
                // Remove any subscribers registered to the session.
                let mut subscribers = storage.subscribers.write();
                for subscriber_id in subscribers
                    .subscribers_by_session
                    .remove(&session_id)
                    .into_iter()
                    .flatten()
                {
                    subscribers.subscribers.remove(&subscriber_id);
                }
            }
        }
    }
}
#[derive(Debug, Default)]
struct AuthenticatedSessions {
sessions: HashMap<SessionId, Arc<AuthenticatedSession>>,
last_session_id: u64,
}
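/// The unauthenticated core of a [`Storage`]: all clones and authenticated
/// handles share one instance.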
#[derive(Debug, Clone)]
pub struct StorageInstance {
data: Arc<Data>,
}
impl From<StorageInstance> for Storage {
fn from(instance: StorageInstance) -> Self {
Self {
instance,
authentication: None,
effective_session: None,
}
}
}
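/// State shared by every clone of a [`StorageInstance`].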
struct Data {
lock: StorageLock,
path: PathBuf,
parallelization: usize,
threadpool: ThreadPool<AnyFile>,
file_manager: AnyFileManager,
pub(crate) tasks: TaskManager,
schemas: RwLock<HashMap<SchemaName, Arc<dyn DatabaseOpener>>>,
available_databases: RwLock<HashMap<String, SchemaName>>,
open_roots: Mutex<HashMap<String, Context>>,
authenticated_permissions: Permissions,
sessions: RwLock<AuthenticatedSessions>,
pub(crate) subscribers: Arc<RwLock<SessionSubscribers>>,
#[cfg(feature = "password-hashing")]
argon: argon::Hasher,
#[cfg(feature = "encryption")]
pub(crate) vault: Arc<Vault>,
#[cfg(feature = "encryption")]
default_encryption_key: Option<KeyId>,
#[cfg(any(feature = "compression", feature = "encryption"))]
tree_vault: Option<TreeVault>,
pub(crate) key_value_persistence: KeyValuePersistence,
chunk_cache: ChunkCache,
pub(crate) check_view_integrity_on_database_open: bool,
relay: Relay,
}
impl Storage {
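    /// Opens the storage at the configured path (`db.bonsaidb` by default),
    /// creating the directory if needed, acquiring the storage's exclusive id
    /// lock, and creating the admin database if it does not exist yet.
    ///
    /// A minimal usage sketch, assuming the `bonsaidb_local` crate root
    /// re-exports used below and a default, schema-less configuration:
    ///
    /// ```no_run
    /// use bonsaidb_local::config::StorageConfiguration;
    /// use bonsaidb_local::Storage;
    ///
    /// let storage = Storage::open(StorageConfiguration::new("example.bonsaidb"))?;
    /// # Ok::<(), bonsaidb_local::Error>(())
    /// ```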
pub fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
let owned_path = configuration
.path
.clone()
.unwrap_or_else(|| PathBuf::from("db.bonsaidb"));
let file_manager = if configuration.memory_only {
AnyFileManager::memory()
} else {
AnyFileManager::std()
};
let manager = Manager::default();
for _ in 0..configuration.workers.worker_count {
manager.spawn_worker();
}
let tasks = TaskManager::new(manager);
fs::create_dir_all(&owned_path)?;
let storage_lock = Self::lookup_or_create_id(&configuration, &owned_path)?;
#[cfg(feature = "encryption")]
let vault = {
let vault_key_storage = match configuration.vault_key_storage {
Some(storage) => storage,
None => Arc::new(
LocalVaultKeyStorage::new(owned_path.join("vault-keys"))
.map_err(|err| Error::Vault(vault::Error::Initializing(err.to_string())))?,
),
};
Arc::new(Vault::initialize(
storage_lock.id(),
&owned_path,
vault_key_storage,
)?)
};
let parallelization = configuration.workers.parallelization;
let check_view_integrity_on_database_open = configuration.views.check_integrity_on_open;
let key_value_persistence = configuration.key_value_persistence;
#[cfg(feature = "password-hashing")]
let argon = argon::Hasher::new(configuration.argon);
#[cfg(feature = "encryption")]
let default_encryption_key = configuration.default_encryption_key;
#[cfg(all(feature = "compression", feature = "encryption"))]
let tree_vault = TreeVault::new_if_needed(
default_encryption_key.clone(),
&vault,
configuration.default_compression,
);
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
let tree_vault = TreeVault::new_if_needed(default_encryption_key.clone(), &vault);
#[cfg(all(feature = "compression", not(feature = "encryption")))]
let tree_vault = TreeVault::new_if_needed(configuration.default_compression);
let authenticated_permissions = configuration.authenticated_permissions;
let storage = Self {
instance: StorageInstance {
data: Arc::new(Data {
lock: storage_lock,
tasks,
parallelization,
subscribers: Arc::default(),
authenticated_permissions,
sessions: RwLock::default(),
#[cfg(feature = "password-hashing")]
argon,
#[cfg(feature = "encryption")]
vault,
#[cfg(feature = "encryption")]
default_encryption_key,
#[cfg(any(feature = "compression", feature = "encryption"))]
tree_vault,
path: owned_path,
file_manager,
chunk_cache: ChunkCache::new(2000, 160_384),
threadpool: ThreadPool::new(parallelization),
schemas: RwLock::new(configuration.initial_schemas),
available_databases: RwLock::default(),
open_roots: Mutex::default(),
key_value_persistence,
check_view_integrity_on_database_open,
relay: Relay::default(),
}),
},
authentication: None,
effective_session: None,
};
storage.cache_available_databases()?;
storage.create_admin_database_if_needed()?;
Ok(storage)
}
#[cfg(feature = "internal-apis")]
#[doc(hidden)]
pub fn database_without_schema(&self, name: &str) -> Result<Database, Error> {
        self.instance
            .database_without_schema(name, Some(self), None)
}
fn lookup_or_create_id(
configuration: &StorageConfiguration,
path: &Path,
) -> Result<StorageLock, Error> {
let id_path = {
let storage_id = path.join("server-id");
if storage_id.exists() {
storage_id
} else {
path.join("storage-id")
}
};
let (id, file) = if let Some(id) = configuration.unique_id {
let file = if id_path.exists() {
File::open(id_path)?
} else {
let mut file = File::create(id_path)?;
let id = id.to_string();
file.write_all(id.as_bytes())?;
file
};
file.lock_exclusive()?;
(id, file)
        } else if id_path.exists() {
            let mut file = File::open(id_path)?;
            file.lock_exclusive()?;
            let mut bytes = Vec::new();
            file.read_to_end(&mut bytes)?;
            let existing_id = String::from_utf8(bytes).expect("server-id contains invalid data");
            (existing_id.parse().expect("server-id isn't numeric"), file)
        } else {
            let id = thread_rng().gen::<u64>();
            let mut file = File::create(id_path)?;
            file.lock_exclusive()?;
            file.write_all(id.to_string().as_bytes())?;
            (id, file)
        };
Ok(StorageLock::new(StorageId(id), file))
}
fn cache_available_databases(&self) -> Result<(), Error> {
let available_databases = self
.admin()
.view::<ByName>()
.query()?
.into_iter()
.map(|map| (map.key, map.value))
.collect();
let mut storage_databases = self.instance.data.available_databases.write();
*storage_databases = available_databases;
Ok(())
}
fn create_admin_database_if_needed(&self) -> Result<(), Error> {
self.register_schema::<Admin>()?;
match self.database::<Admin>(ADMIN_DATABASE_NAME) {
Ok(_) => {}
Err(bonsaidb_core::Error::DatabaseNotFound(_)) => {
drop(self.create_database::<Admin>(ADMIN_DATABASE_NAME, true)?);
}
Err(err) => return Err(Error::Core(err)),
}
Ok(())
}
#[must_use]
pub fn unique_id(&self) -> StorageId {
self.instance.data.lock.id()
}
#[must_use]
pub(crate) fn parallelization(&self) -> usize {
self.instance.data.parallelization
}
#[must_use]
#[cfg(feature = "encryption")]
pub(crate) fn vault(&self) -> &Arc<Vault> {
&self.instance.data.vault
}
#[must_use]
#[cfg(any(feature = "encryption", feature = "compression"))]
pub(crate) fn tree_vault(&self) -> Option<&TreeVault> {
self.instance.data.tree_vault.as_ref()
}
#[must_use]
#[cfg(feature = "encryption")]
pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
self.instance.data.default_encryption_key.as_ref()
}
#[must_use]
#[cfg(all(feature = "compression", not(feature = "encryption")))]
#[allow(clippy::unused_self)]
pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
None
}
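    /// Registers the schema `DB`, allowing databases of this schema to be
    /// opened or created. Returns a `SchemaAlreadyRegistered` error if `DB`
    /// was previously registered.
    ///
    /// A usage sketch with a hypothetical schema type `MySchema`:
    ///
    /// ```ignore
    /// storage.register_schema::<MySchema>()?;
    /// storage.create_database::<MySchema>("my-database", true)?;
    /// ```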
pub fn register_schema<DB: Schema>(&self) -> Result<(), Error> {
let mut schemas = self.instance.data.schemas.write();
if schemas
.insert(
DB::schema_name(),
Arc::new(StorageSchemaOpener::<DB>::new()?),
)
.is_none()
{
Ok(())
} else {
Err(Error::Core(bonsaidb_core::Error::SchemaAlreadyRegistered(
DB::schema_name(),
)))
}
}
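    /// Validates a database name: the first character must be ASCII
    /// alphanumeric or `_`; the remaining characters must be ASCII
    /// alphanumeric, `.`, or `-`.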
fn validate_name(name: &str) -> Result<(), Error> {
if name.chars().enumerate().all(|(index, c)| {
c.is_ascii_alphanumeric()
|| (index == 0 && c == '_')
|| (index > 0 && (c == '.' || c == '-'))
}) {
Ok(())
} else {
Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(
name.to_owned(),
)))
}
}
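    /// Returns a clone of `self` whose permissions are restricted to
    /// `effective_permissions`, or `None` if this instance already has an
    /// effective session.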
#[must_use]
pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
if self.effective_session.is_some() {
None
} else {
Some(Self {
instance: self.instance.clone(),
authentication: self.authentication.clone(),
effective_session: Some(Arc::new(Session {
id: None,
authentication: SessionAuthentication::None,
permissions: effective_permissions,
})),
})
}
}
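    /// Converts this instance into its async-compatible wrapper using the
    /// current Tokio runtime. Panics if called outside of a Tokio runtime
    /// (see `tokio::runtime::Handle::current`).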
#[cfg(feature = "async")]
pub fn into_async(self) -> crate::AsyncStorage {
self.into_async_with_runtime(tokio::runtime::Handle::current())
}
#[cfg(feature = "async")]
pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
crate::AsyncStorage {
storage: self,
runtime: Arc::new(runtime),
}
}
#[cfg(feature = "async")]
pub fn to_async(&self) -> crate::AsyncStorage {
self.clone().into_async()
}
#[cfg(feature = "async")]
pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
self.clone().into_async_with_runtime(runtime)
}
}
impl Debug for Data {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut f = f.debug_struct("Data");
f.field("lock", &self.lock)
.field("path", &self.path)
.field("parallelization", &self.parallelization)
.field("threadpool", &self.threadpool)
.field("file_manager", &self.file_manager)
.field("tasks", &self.tasks)
.field("available_databases", &self.available_databases)
.field("open_roots", &self.open_roots)
.field("authenticated_permissions", &self.authenticated_permissions)
.field("sessions", &self.sessions)
.field("subscribers", &self.subscribers)
.field("key_value_persistence", &self.key_value_persistence)
.field("chunk_cache", &self.chunk_cache)
.field(
"check_view_integrity_on_database_open",
&self.check_view_integrity_on_database_open,
)
.field("relay", &self.relay);
if let Some(schemas) = self.schemas.try_read() {
let mut schemas = schemas.keys().collect::<Vec<_>>();
schemas.sort();
f.field("schemas", &schemas);
} else {
f.field("schemas", &"RwLock locked");
}
#[cfg(feature = "password-hashing")]
f.field("argon", &self.argon);
#[cfg(feature = "encryption")]
{
f.field("vault", &self.vault)
.field("default_encryption_key", &self.default_encryption_key);
}
#[cfg(any(feature = "compression", feature = "encryption"))]
f.field("tree_vault", &self.tree_vault);
f.finish()
}
}
impl StorageInstance {
#[cfg_attr(
not(any(feature = "encryption", feature = "compression")),
allow(unused_mut)
)]
pub(crate) fn open_roots(&self, name: &str) -> Result<Context, Error> {
let mut open_roots = self.data.open_roots.lock();
if let Some(roots) = open_roots.get(name) {
Ok(roots.clone())
} else {
            let mut config = nebari::Config::new(self.data.path.join(name))
.file_manager(self.data.file_manager.clone())
.cache(self.data.chunk_cache.clone())
.shared_thread_pool(&self.data.threadpool);
#[cfg(any(feature = "encryption", feature = "compression"))]
if let Some(vault) = self.data.tree_vault.clone() {
config = config.vault(vault);
}
let roots = config.open().map_err(Error::from)?;
let context = Context::new(
roots,
self.data.key_value_persistence.clone(),
Some(self.data.lock.clone()),
);
open_roots.insert(name.to_owned(), context.clone());
Ok(context)
}
}
pub(crate) fn tasks(&self) -> &'_ TaskManager {
&self.data.tasks
}
pub(crate) fn check_view_integrity_on_database_open(&self) -> bool {
self.data.check_view_integrity_on_database_open
}
pub(crate) fn relay(&self) -> &'_ Relay {
&self.data.relay
}
pub(crate) fn database_without_schema(
&self,
name: &str,
storage: Option<&Storage>,
expected_schema: Option<SchemaName>,
) -> Result<Database, Error> {
let stored_schema = {
let available_databases = self.data.available_databases.read();
available_databases
.get(name)
.ok_or_else(|| {
Error::Core(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
})?
.clone()
};
if let Some(expected_schema) = expected_schema {
if stored_schema != expected_schema {
return Err(Error::Core(bonsaidb_core::Error::SchemaMismatch {
database_name: name.to_owned(),
schema: expected_schema,
stored_schema,
}));
}
}
let mut schemas = self.data.schemas.write();
let storage =
storage.map_or_else(|| Cow::Owned(Storage::from(self.clone())), Cow::Borrowed);
if let Some(schema) = schemas.get_mut(&stored_schema) {
let db = schema.open(name.to_string(), storage.as_ref())?;
Ok(db)
} else {
Err(Error::Core(bonsaidb_core::Error::SchemaNotRegistered(
stored_schema,
)))
}
}
fn update_user_with_named_id<
'user,
'other,
Col: NamedCollection<PrimaryKey = u64>,
U: Nameable<'user, u64> + Send + Sync,
O: Nameable<'other, u64> + Send + Sync,
F: FnOnce(&mut CollectionDocument<User>, u64) -> Result<bool, bonsaidb_core::Error>,
>(
&self,
user: U,
other: O,
callback: F,
) -> Result<(), bonsaidb_core::Error> {
let admin = self.admin();
let other = other.name()?;
let user = User::load(user.name()?, &admin)?;
let other = other.id::<Col, _>(&admin)?;
match (user, other) {
(Some(mut user), Some(other)) => {
if callback(&mut user, other)? {
user.update(&admin)?;
}
Ok(())
}
_ => Err(bonsaidb_core::Error::UserNotFound),
}
}
#[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
#[cfg_attr(
any(
not(feature = "token-authentication"),
not(feature = "password-hashing")
),
allow(unused_variables, clippy::needless_pass_by_value)
)]
fn authenticate_inner(
&self,
authentication: bonsaidb_core::connection::Authentication,
loaded_user: Option<CollectionDocument<User>>,
current_session_id: Option<SessionId>,
admin: &Database,
) -> Result<Storage, bonsaidb_core::Error> {
use bonsaidb_core::connection::Authentication;
match authentication {
#[cfg(feature = "token-authentication")]
Authentication::Token {
id,
now,
now_hash,
algorithm,
} => self.begin_token_authentication(id, now, &now_hash, algorithm, admin),
#[cfg(feature = "token-authentication")]
Authentication::TokenChallengeResponse(hash) => {
let session_id =
current_session_id.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.finish_token_authentication(session_id, &hash, admin)
}
#[cfg(feature = "password-hashing")]
Authentication::Password { user, password } => {
let user = match loaded_user {
Some(user) => user,
None => {
User::load(user, admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?
}
};
let saved_hash = user
.contents
.argon_hash
.clone()
.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.data
.argon
.verify(user.header.id, password, saved_hash)?;
self.assume_user(user, admin)
}
}
}
fn assume_user(
&self,
user: CollectionDocument<User>,
admin: &Database,
) -> Result<Storage, bonsaidb_core::Error> {
let permissions = user.contents.effective_permissions(
admin,
&admin.storage().instance.data.authenticated_permissions,
)?;
let mut sessions = self.data.sessions.write();
sessions.last_session_id += 1;
let session_id = SessionId(sessions.last_session_id);
let session = Session {
id: Some(session_id),
authentication: SessionAuthentication::Identity(Arc::new(Identity::User {
id: user.header.id,
username: user.contents.username,
})),
permissions,
};
let authentication = Arc::new(AuthenticatedSession {
storage: Arc::downgrade(&self.data),
session: Mutex::new(session.clone()),
});
sessions.sessions.insert(session_id, authentication.clone());
Ok(Storage {
instance: self.clone(),
authentication: Some(authentication),
effective_session: Some(Arc::new(session)),
})
}
fn assume_role(
&self,
role: CollectionDocument<Role>,
admin: &Database,
) -> Result<Storage, bonsaidb_core::Error> {
let permissions = role.contents.effective_permissions(
admin,
&admin.storage().instance.data.authenticated_permissions,
)?;
let mut sessions = self.data.sessions.write();
sessions.last_session_id += 1;
let session_id = SessionId(sessions.last_session_id);
let session = Session {
id: Some(session_id),
authentication: SessionAuthentication::Identity(Arc::new(Identity::Role {
id: role.header.id,
name: role.contents.name,
})),
permissions,
};
let authentication = Arc::new(AuthenticatedSession {
storage: Arc::downgrade(&self.data),
session: Mutex::new(session.clone()),
});
sessions.sessions.insert(session_id, authentication.clone());
Ok(Storage {
instance: self.clone(),
authentication: Some(authentication),
effective_session: Some(Arc::new(session)),
})
}
fn add_permission_group_to_user_inner(
user: &mut CollectionDocument<User>,
permission_group_id: u64,
) -> bool {
if user.contents.groups.contains(&permission_group_id) {
false
} else {
user.contents.groups.push(permission_group_id);
true
}
}
fn remove_permission_group_from_user_inner(
user: &mut CollectionDocument<User>,
permission_group_id: u64,
) -> bool {
let old_len = user.contents.groups.len();
user.contents.groups.retain(|id| id != &permission_group_id);
old_len != user.contents.groups.len()
}
fn add_role_to_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
if user.contents.roles.contains(&role_id) {
false
} else {
user.contents.roles.push(role_id);
true
}
}
fn remove_role_from_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
let old_len = user.contents.roles.len();
user.contents.roles.retain(|id| id != &role_id);
old_len != user.contents.roles.len()
}
}
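/// Opens databases of a particular schema against a [`Storage`].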
pub trait DatabaseOpener: Send + Sync {
fn schematic(&self) -> &'_ Schematic;
fn open(&self, name: String, storage: &Storage) -> Result<Database, Error>;
}
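/// A [`DatabaseOpener`] for the statically known schema `DB`.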
pub struct StorageSchemaOpener<DB: Schema> {
schematic: Schematic,
_phantom: PhantomData<DB>,
}
impl<DB> StorageSchemaOpener<DB>
where
DB: Schema,
{
pub fn new() -> Result<Self, Error> {
let schematic = DB::schematic()?;
Ok(Self {
schematic,
_phantom: PhantomData,
})
}
}
impl<DB> DatabaseOpener for StorageSchemaOpener<DB>
where
DB: Schema,
{
fn schematic(&self) -> &'_ Schematic {
&self.schematic
}
fn open(&self, name: String, storage: &Storage) -> Result<Database, Error> {
let roots = storage.instance.open_roots(&name)?;
let db = Database::new::<DB, _>(name, roots, storage)?;
Ok(db)
}
}
impl HasSession for StorageInstance {
fn session(&self) -> Option<&Session> {
None
}
}
impl StorageConnection for StorageInstance {
type Authenticated = Storage;
type Database = Database;
fn admin(&self) -> Self::Database {
Database::new::<Admin, _>(
ADMIN_DATABASE_NAME,
self.open_roots(ADMIN_DATABASE_NAME).unwrap(),
&Storage::from(self.clone()),
)
.unwrap()
}
#[cfg_attr(feature = "tracing", tracing::instrument(
level = "trace",
skip(self, schema),
fields(
schema.authority = schema.authority.as_ref(),
schema.name = schema.name.as_ref(),
)
))]
fn create_database_with_schema(
&self,
name: &str,
schema: SchemaName,
only_if_needed: bool,
) -> Result<(), bonsaidb_core::Error> {
Storage::validate_name(name)?;
{
let schemas = self.data.schemas.read();
if !schemas.contains_key(&schema) {
return Err(bonsaidb_core::Error::SchemaNotRegistered(schema));
}
}
let mut available_databases = self.data.available_databases.write();
let admin = self.admin();
if !available_databases.contains_key(name) {
admin
.collection::<DatabaseRecord>()
.push(&admin::Database {
name: name.to_string(),
schema: schema.clone(),
})?;
available_databases.insert(name.to_string(), schema);
} else if !only_if_needed {
return Err(bonsaidb_core::Error::DatabaseNameAlreadyTaken(
name.to_string(),
));
}
Ok(())
}
fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
self.database_without_schema(name, None, Some(DB::schema_name()))
.map_err(bonsaidb_core::Error::from)
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
let admin = self.admin();
let mut available_databases = self.data.available_databases.write();
available_databases.remove(name);
let mut open_roots = self.data.open_roots.lock();
open_roots.remove(name);
let database_folder = self.data.path.join(name);
if database_folder.exists() {
let file_manager = self.data.file_manager.clone();
file_manager
.delete_directory(&database_folder)
.map_err(Error::Nebari)?;
}
if let Some(entry) = admin
.view::<database::ByName>()
.with_key(name)
.query()?
.first()
{
admin.delete::<DatabaseRecord, _>(&entry.source)?;
Ok(())
} else {
Err(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
let available_databases = self.data.available_databases.read();
Ok(available_databases
.iter()
.map(|(name, schema)| connection::Database {
name: name.to_string(),
schema: schema.clone(),
})
.collect())
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
let available_databases = self.data.available_databases.read();
let schemas = self.data.schemas.read();
Ok(available_databases
.values()
.unique()
.filter_map(|name| {
schemas
.get(name)
.map(|opener| SchemaSummary::from(opener.schematic()))
})
.collect())
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
let result = self
.admin()
.collection::<User>()
.push(&User::default_with_username(username))?;
Ok(result.id)
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
&self,
user: U,
) -> Result<(), bonsaidb_core::Error> {
let admin = self.admin();
let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
user.delete(&admin)?;
Ok(())
}
#[cfg(feature = "password-hashing")]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
&self,
user: U,
password: bonsaidb_core::connection::SensitiveString,
) -> Result<(), bonsaidb_core::Error> {
let admin = self.admin();
let mut user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
user.contents.argon_hash = Some(self.data.argon.hash(user.header.id, password)?);
user.update(&admin)
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
#[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
fn authenticate(
&self,
authentication: bonsaidb_core::connection::Authentication,
) -> Result<Self::Authenticated, bonsaidb_core::Error> {
let admin = self.admin();
self.authenticate_inner(authentication, None, None, &admin)
.map(Storage::from)
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn assume_identity(
&self,
identity: IdentityReference<'_>,
) -> Result<Self::Authenticated, bonsaidb_core::Error> {
let admin = self.admin();
match identity {
IdentityReference::User(user) => {
let user =
User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.assume_user(user, &admin).map(Storage::from)
}
IdentityReference::Role(role) => {
let role =
Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.assume_role(role, &admin).map(Storage::from)
}
_ => Err(bonsaidb_core::Error::InvalidCredentials),
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn add_permission_group_to_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
permission_group: G,
) -> Result<(), bonsaidb_core::Error> {
self.update_user_with_named_id::<PermissionGroup, _, _, _>(
user,
permission_group,
|user, permission_group_id| {
Ok(Self::add_permission_group_to_user_inner(
user,
permission_group_id,
))
},
)
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn remove_permission_group_from_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
permission_group: G,
) -> Result<(), bonsaidb_core::Error> {
self.update_user_with_named_id::<PermissionGroup, _, _, _>(
user,
permission_group,
|user, permission_group_id| {
Ok(Self::remove_permission_group_from_user_inner(
user,
permission_group_id,
))
},
)
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn add_role_to_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
role: G,
) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
Ok(Self::add_role_to_user_inner(user, role_id))
})
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn remove_role_from_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
role: G,
) -> Result<(), bonsaidb_core::Error> {
self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
Ok(Self::remove_role_from_user_inner(user, role_id))
})
}
}
impl HasSession for Storage {
fn session(&self) -> Option<&Session> {
self.effective_session.as_deref()
}
}
impl StorageConnection for Storage {
type Authenticated = Self;
type Database = Database;
fn admin(&self) -> Self::Database {
self.instance.admin()
}
fn create_database_with_schema(
&self,
name: &str,
schema: SchemaName,
only_if_needed: bool,
) -> Result<(), bonsaidb_core::Error> {
self.check_permission(
database_resource_name(name),
&BonsaiAction::Server(ServerAction::CreateDatabase),
)?;
self.instance
.create_database_with_schema(name, schema, only_if_needed)
}
fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
self.instance.database::<DB>(name)
}
fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
self.check_permission(
database_resource_name(name),
&BonsaiAction::Server(ServerAction::DeleteDatabase),
)?;
self.instance.delete_database(name)
}
fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
self.check_permission(
bonsaidb_resource_name(),
&BonsaiAction::Server(ServerAction::ListDatabases),
)?;
self.instance.list_databases()
}
fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
self.check_permission(
bonsaidb_resource_name(),
&BonsaiAction::Server(ServerAction::ListAvailableSchemas),
)?;
self.instance.list_available_schemas()
}
fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
self.check_permission(
bonsaidb_resource_name(),
&BonsaiAction::Server(ServerAction::CreateUser),
)?;
self.instance.create_user(username)
}
fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
&self,
user: U,
) -> Result<(), bonsaidb_core::Error> {
let admin = self.admin();
let user = user.name()?;
let user_id = user
.id::<User, _>(&admin)?
.ok_or(bonsaidb_core::Error::UserNotFound)?;
self.check_permission(
user_resource_name(user_id),
&BonsaiAction::Server(ServerAction::DeleteUser),
)?;
self.instance.delete_user(user)
}
#[cfg(feature = "password-hashing")]
fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
&self,
user: U,
password: bonsaidb_core::connection::SensitiveString,
) -> Result<(), bonsaidb_core::Error> {
let admin = self.admin();
let user = user.name()?;
let user_id = user
.id::<User, _>(&admin)?
.ok_or(bonsaidb_core::Error::UserNotFound)?;
self.check_permission(
user_resource_name(user_id),
&BonsaiAction::Server(ServerAction::SetPassword),
)?;
self.instance.set_user_password(user, password)
}
#[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
#[cfg_attr(not(feature = "token-authentication"), allow(unused_assignments))]
#[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
fn authenticate(
&self,
authentication: bonsaidb_core::connection::Authentication,
) -> Result<Self, bonsaidb_core::Error> {
let admin = self.admin();
let mut loaded_user = None;
match &authentication {
#[cfg(feature = "token-authentication")]
bonsaidb_core::connection::Authentication::Token { id, .. } => {
self.check_permission(
bonsaidb_core::permissions::bonsai::authentication_token_resource_name(*id),
&BonsaiAction::Server(ServerAction::Authenticate(
bonsaidb_core::connection::AuthenticationMethod::Token,
)),
)?;
}
#[cfg(feature = "password-hashing")]
bonsaidb_core::connection::Authentication::Password { user, .. } => {
let user =
User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.check_permission(
user_resource_name(user.header.id),
&BonsaiAction::Server(ServerAction::Authenticate(
bonsaidb_core::connection::AuthenticationMethod::PasswordHash,
)),
)?;
loaded_user = Some(user);
}
#[cfg(feature = "token-authentication")]
bonsaidb_core::connection::Authentication::TokenChallengeResponse(_) => {}
}
self.instance.authenticate_inner(
authentication,
loaded_user,
self.authentication
.as_ref()
.and_then(|auth| auth.session.lock().id),
&admin,
)
}
fn assume_identity(
&self,
identity: IdentityReference<'_>,
) -> Result<Self::Authenticated, bonsaidb_core::Error> {
match identity {
IdentityReference::User(user) => {
let admin = self.admin();
let user =
User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.check_permission(
user_resource_name(user.header.id),
&BonsaiAction::Server(ServerAction::AssumeIdentity),
)?;
self.instance.assume_user(user, &admin)
}
IdentityReference::Role(role) => {
let admin = self.admin();
let role =
Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
self.check_permission(
role_resource_name(role.header.id),
&BonsaiAction::Server(ServerAction::AssumeIdentity),
)?;
self.instance.assume_role(role, &admin)
}
_ => Err(bonsaidb_core::Error::InvalidCredentials),
}
}
fn add_permission_group_to_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
permission_group: G,
) -> Result<(), bonsaidb_core::Error> {
self.instance
.update_user_with_named_id::<PermissionGroup, _, _, _>(
user,
permission_group,
|user, permission_group_id| {
self.check_permission(
user_resource_name(user.header.id),
&BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
)?;
Ok(StorageInstance::add_permission_group_to_user_inner(
user,
permission_group_id,
))
},
)
}
fn remove_permission_group_from_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
permission_group: G,
) -> Result<(), bonsaidb_core::Error> {
self.instance
.update_user_with_named_id::<PermissionGroup, _, _, _>(
user,
permission_group,
|user, permission_group_id| {
self.check_permission(
user_resource_name(user.header.id),
&BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
)?;
Ok(StorageInstance::remove_permission_group_from_user_inner(
user,
permission_group_id,
))
},
)
}
fn add_role_to_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
role: G,
) -> Result<(), bonsaidb_core::Error> {
self.instance
            .update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
self.check_permission(
user_resource_name(user.header.id),
&BonsaiAction::Server(ServerAction::ModifyUserRoles),
)?;
Ok(StorageInstance::add_role_to_user_inner(user, role_id))
})
}
fn remove_role_from_user<
'user,
'group,
U: Nameable<'user, u64> + Send + Sync,
G: Nameable<'group, u64> + Send + Sync,
>(
&self,
user: U,
role: G,
) -> Result<(), bonsaidb_core::Error> {
self.instance
.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
self.check_permission(
user_resource_name(user.header.id),
&BonsaiAction::Server(ServerAction::ModifyUserRoles),
)?;
Ok(StorageInstance::remove_role_from_user_inner(user, role_id))
})
}
}
#[test]
fn name_validation_tests() {
assert!(matches!(Storage::validate_name("azAZ09.-"), Ok(())));
assert!(matches!(
Storage::validate_name("_internal-names-work"),
Ok(())
));
assert!(matches!(
Storage::validate_name("-alphaunmericfirstrequired"),
Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
));
assert!(matches!(
Storage::validate_name("\u{2661}"),
Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
));
}
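/// The unique id of a [`Storage`] instance, persisted in the storage
/// directory's `server-id` file. It is generated randomly on first open and
/// formats as 16 lowercase hex digits.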
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct StorageId(u64);
impl StorageId {
#[must_use]
pub const fn as_u64(self) -> u64 {
self.0
}
}
impl Debug for StorageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:016x}", self.0)
}
}
impl Display for StorageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}
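/// Applies the storage's compression and/or encryption settings to nebari
/// tree data. Processed payloads are prefixed with `b"trv"` followed by a
/// header byte: the high bit marks an encrypted payload, and the low seven
/// bits carry the `Compression` id used, if any.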
#[derive(Debug, Clone)]
#[cfg(any(feature = "compression", feature = "encryption"))]
pub(crate) struct TreeVault {
#[cfg(feature = "compression")]
compression: Option<Compression>,
#[cfg(feature = "encryption")]
pub key: Option<KeyId>,
#[cfg(feature = "encryption")]
pub vault: Arc<Vault>,
}
#[cfg(all(feature = "compression", feature = "encryption"))]
impl TreeVault {
pub(crate) fn new_if_needed(
key: Option<KeyId>,
vault: &Arc<Vault>,
compression: Option<Compression>,
) -> Option<Self> {
if key.is_none() && compression.is_none() {
None
} else {
Some(Self {
key,
compression,
vault: vault.clone(),
})
}
}
fn header(&self, compressed: bool) -> u8 {
let mut bits = if self.key.is_some() { 0b1000_0000 } else { 0 };
if compressed {
if let Some(compression) = self.compression {
bits |= compression as u8;
}
}
bits
}
}
#[cfg(all(feature = "compression", feature = "encryption"))]
impl nebari::Vault for TreeVault {
type Error = Error;
fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
let mut includes_compression = false;
let compressed = match (payload.len(), self.compression) {
(128..=usize::MAX, Some(Compression::Lz4)) => {
includes_compression = true;
Cow::Owned(lz4_flex::block::compress_prepend_size(payload))
}
_ => Cow::Borrowed(payload),
};
let mut complete = if let Some(key) = &self.key {
self.vault.encrypt_payload(key, &compressed, None)?
} else {
compressed.into_owned()
};
let header = self.header(includes_compression);
if header != 0 {
let header = [b't', b'r', b'v', header];
complete.splice(0..0, header);
}
Ok(complete)
}
fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
if payload.len() >= 4 && &payload[0..3] == b"trv" {
let header = payload[3];
let payload = &payload[4..];
let encrypted = (header & 0b1000_0000) != 0;
let compression = header & 0b0111_1111;
let decrypted = if encrypted {
Cow::Owned(self.vault.decrypt_payload(payload, None)?)
} else {
Cow::Borrowed(payload)
};
            #[allow(clippy::single_match)]
            return Ok(match Compression::from_u8(compression) {
Some(Compression::Lz4) => {
lz4_flex::block::decompress_size_prepended(&decrypted).map_err(Error::from)?
}
None => decrypted.into_owned(),
});
}
self.vault.decrypt_payload(payload, None)
}
}
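/// Storage operations that are fulfilled without blocking on I/O or
/// background tasks.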
pub trait StorageNonBlocking: Sized {
#[must_use]
fn path(&self) -> &Path;
fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error>;
}
impl StorageNonBlocking for Storage {
fn path(&self) -> &Path {
&self.instance.data.path
}
fn assume_session(&self, session: Session) -> Result<Storage, bonsaidb_core::Error> {
if self.authentication.is_some() {
return Err(bonsaidb_core::Error::InvalidCredentials);
}
let Some(session_id) = session.id else {
return Ok(Self {
instance: self.instance.clone(),
authentication: None,
effective_session: Some(Arc::new(session)),
});
};
let session_data = self.instance.data.sessions.read();
let authentication = session_data
.sessions
.get(&session_id)
.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
let authentication_session = authentication.session.lock();
let effective_permissions =
Permissions::merged([&session.permissions, &authentication_session.permissions]);
let effective_session = Session {
id: authentication_session.id,
authentication: authentication_session.authentication.clone(),
permissions: effective_permissions,
};
Ok(Self {
instance: self.instance.clone(),
authentication: Some(authentication.clone()),
effective_session: Some(Arc::new(effective_session)),
})
}
}
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl TreeVault {
pub(crate) fn new_if_needed(compression: Option<Compression>) -> Option<Self> {
compression.map(|compression| Self {
compression: Some(compression),
})
}
}
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl nebari::Vault for TreeVault {
type Error = Error;
fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
Ok(match (payload.len(), self.compression) {
(128..=usize::MAX, Some(Compression::Lz4)) => {
let mut destination =
vec![0; lz4_flex::block::get_maximum_output_size(payload.len()) + 8];
let compressed_length =
lz4_flex::block::compress_into(payload, &mut destination[8..])
.expect("lz4-flex documents this shouldn't fail");
destination.truncate(compressed_length + 8);
destination[0..4].copy_from_slice(&[b't', b'r', b'v', Compression::Lz4 as u8]);
let uncompressed_length =
u32::try_from(payload.len()).expect("nebari doesn't support >32 bit blocks");
destination[4..8].copy_from_slice(&uncompressed_length.to_le_bytes());
destination
}
_ => payload.to_vec(),
})
}
fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
if payload.len() >= 4 && &payload[0..3] == b"trv" {
let header = payload[3];
let payload = &payload[4..];
let encrypted = (header & 0b1000_0000) != 0;
let compression = header & 0b0111_1111;
if encrypted {
return Err(Error::EncryptionDisabled);
}
            #[allow(clippy::single_match)]
            return Ok(match Compression::from_u8(compression) {
Some(Compression::Lz4) => {
lz4_flex::block::decompress_size_prepended(payload).map_err(Error::from)?
}
None => payload.to_vec(),
});
}
Ok(payload.to_vec())
}
}
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl TreeVault {
pub(crate) fn new_if_needed(key: Option<KeyId>, vault: &Arc<Vault>) -> Option<Self> {
key.map(|key| Self {
key: Some(key),
vault: vault.clone(),
})
}
    #[allow(dead_code)]
    fn header(&self) -> u8 {
if self.key.is_some() {
0b1000_0000
} else {
0
}
}
}
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl nebari::Vault for TreeVault {
type Error = Error;
fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
if let Some(key) = &self.key {
self.vault.encrypt_payload(key, payload, None)
} else {
Ok(payload.to_vec())
}
}
fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
self.vault.decrypt_payload(payload, None)
}
}
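/// The filesystem lock held for the lifetime of a [`Storage`] instance.
/// Clones share the underlying file lock, which is released when the last
/// clone is dropped.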
#[derive(Clone, Debug)]
pub struct StorageLock(StorageId, Arc<LockData>);
impl StorageLock {
pub const fn id(&self) -> StorageId {
self.0
}
}
#[derive(Debug)]
struct LockData(File);
impl StorageLock {
fn new(id: StorageId, file: File) -> Self {
Self(id, Arc::new(LockData(file)))
}
}
impl Drop for LockData {
fn drop(&mut self) {
drop(self.0.unlock());
}
}