use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::fs::{self, File};
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use bonsaidb_core::admin::database::{self, ByName, Database as DatabaseRecord};
use bonsaidb_core::admin::user::User;
use bonsaidb_core::admin::{self, Admin, PermissionGroup, Role, ADMIN_DATABASE_NAME};
use bonsaidb_core::circulate;
pub use bonsaidb_core::circulate::Relay;
use bonsaidb_core::connection::{
    self, Connection, HasSession, Identity, IdentityReference, LowLevelConnection, Session,
    SessionAuthentication, SessionId, StorageConnection,
};
use bonsaidb_core::document::CollectionDocument;
#[cfg(any(feature = "encryption", feature = "compression"))]
use bonsaidb_core::document::KeyId;
use bonsaidb_core::permissions::bonsai::{
    bonsaidb_resource_name, database_resource_name, role_resource_name, user_resource_name,
    BonsaiAction, ServerAction,
};
use bonsaidb_core::permissions::Permissions;
use bonsaidb_core::schema::{
    Nameable, NamedCollection, Schema, SchemaName, SchemaSummary, Schematic,
};
use fs2::FileExt;
use itertools::Itertools;
use nebari::io::any::{AnyFile, AnyFileManager};
use nebari::io::FileManager;
use nebari::{ChunkCache, ThreadPool};
use parking_lot::{Mutex, RwLock};
use rand::{thread_rng, Rng};
#[cfg(feature = "compression")]
use crate::config::Compression;
use crate::config::{KeyValuePersistence, StorageConfiguration};
use crate::database::Context;
use crate::tasks::manager::Manager;
use crate::tasks::TaskManager;
#[cfg(feature = "encryption")]
use crate::vault::{self, LocalVaultKeyStorage, Vault};
use crate::{Database, Error};
#[cfg(feature = "password-hashing")]
mod argon;
#[cfg(feature = "token-authentication")]
mod token_authentication;
mod backup;
mod pubsub;
pub use backup::{AnyBackupLocation, BackupLocation};
#[derive(Debug, Clone)]
#[must_use]
pub struct Storage {
    pub(crate) instance: StorageInstance,
    pub(crate) authentication: Option<Arc<AuthenticatedSession>>,
    effective_session: Option<Arc<Session>>,
}
#[derive(Debug)]
pub struct AuthenticatedSession {
    storage: Weak<Data>,
    pub session: Mutex<Session>,
}
#[derive(Debug, Default)]
pub struct SessionSubscribers {
    pub subscribers: HashMap<u64, SessionSubscriber>,
    pub subscribers_by_session: HashMap<SessionId, HashSet<u64>>,
    pub last_id: u64,
}
impl SessionSubscribers {
    pub fn unregister(&mut self, subscriber_id: u64) {
        if let Some(session_id) = self
            .subscribers
            .remove(&subscriber_id)
            .and_then(|sub| sub.session_id)
        {
            if let Some(session_subscribers) = self.subscribers_by_session.get_mut(&session_id) {
                session_subscribers.remove(&subscriber_id);
            }
        }
    }
}
#[derive(Debug)]
pub struct SessionSubscriber {
    pub session_id: Option<SessionId>,
    pub subscriber: circulate::Subscriber,
}
impl Drop for AuthenticatedSession {
    fn drop(&mut self) {
        let mut session = self.session.lock();
        if let Some(id) = session.id.take() {
            if let Some(storage) = self.storage.upgrade() {
                let mut sessions = storage.sessions.write();
                sessions.sessions.remove(&id);
                let mut sessions = storage.subscribers.write();
                for id in sessions
                    .subscribers_by_session
                    .remove(&id)
                    .into_iter()
                    .flatten()
                {
                    sessions.subscribers.remove(&id);
                }
            }
        }
    }
}
#[derive(Debug, Default)]
struct AuthenticatedSessions {
    sessions: HashMap<SessionId, Arc<AuthenticatedSession>>,
    last_session_id: u64,
}
#[derive(Debug, Clone)]
pub struct StorageInstance {
    data: Arc<Data>,
}
impl From<StorageInstance> for Storage {
    fn from(instance: StorageInstance) -> Self {
        Self {
            instance,
            authentication: None,
            effective_session: None,
        }
    }
}
struct Data {
    lock: StorageLock,
    path: PathBuf,
    parallelization: usize,
    threadpool: ThreadPool<AnyFile>,
    file_manager: AnyFileManager,
    pub(crate) tasks: TaskManager,
    schemas: RwLock<HashMap<SchemaName, Arc<dyn DatabaseOpener>>>,
    available_databases: RwLock<HashMap<String, SchemaName>>,
    open_roots: Mutex<HashMap<String, Context>>,
    authenticated_permissions: Permissions,
    sessions: RwLock<AuthenticatedSessions>,
    pub(crate) subscribers: Arc<RwLock<SessionSubscribers>>,
    #[cfg(feature = "password-hashing")]
    argon: argon::Hasher,
    #[cfg(feature = "encryption")]
    pub(crate) vault: Arc<Vault>,
    #[cfg(feature = "encryption")]
    default_encryption_key: Option<KeyId>,
    #[cfg(any(feature = "compression", feature = "encryption"))]
    tree_vault: Option<TreeVault>,
    pub(crate) key_value_persistence: KeyValuePersistence,
    chunk_cache: ChunkCache,
    pub(crate) check_view_integrity_on_database_open: bool,
    relay: Relay,
}
impl Storage {
    pub fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
        let owned_path = configuration
            .path
            .clone()
            .unwrap_or_else(|| PathBuf::from("db.bonsaidb"));
        let file_manager = if configuration.memory_only {
            AnyFileManager::memory()
        } else {
            AnyFileManager::std()
        };
        let manager = Manager::default();
        for _ in 0..configuration.workers.worker_count {
            manager.spawn_worker();
        }
        let tasks = TaskManager::new(manager);
        fs::create_dir_all(&owned_path)?;
        let storage_lock = Self::lookup_or_create_id(&configuration, &owned_path)?;
        #[cfg(feature = "encryption")]
        let vault = {
            let vault_key_storage = match configuration.vault_key_storage {
                Some(storage) => storage,
                None => Arc::new(
                    LocalVaultKeyStorage::new(owned_path.join("vault-keys"))
                        .map_err(|err| Error::Vault(vault::Error::Initializing(err.to_string())))?,
                ),
            };
            Arc::new(Vault::initialize(
                storage_lock.id(),
                &owned_path,
                vault_key_storage,
            )?)
        };
        let parallelization = configuration.workers.parallelization;
        let check_view_integrity_on_database_open = configuration.views.check_integrity_on_open;
        let key_value_persistence = configuration.key_value_persistence;
        #[cfg(feature = "password-hashing")]
        let argon = argon::Hasher::new(configuration.argon);
        #[cfg(feature = "encryption")]
        let default_encryption_key = configuration.default_encryption_key;
        #[cfg(all(feature = "compression", feature = "encryption"))]
        let tree_vault = TreeVault::new_if_needed(
            default_encryption_key.clone(),
            &vault,
            configuration.default_compression,
        );
        #[cfg(all(not(feature = "compression"), feature = "encryption"))]
        let tree_vault = TreeVault::new_if_needed(default_encryption_key.clone(), &vault);
        #[cfg(all(feature = "compression", not(feature = "encryption")))]
        let tree_vault = TreeVault::new_if_needed(configuration.default_compression);
        let authenticated_permissions = configuration.authenticated_permissions;
        let storage = Self {
            instance: StorageInstance {
                data: Arc::new(Data {
                    lock: storage_lock,
                    tasks,
                    parallelization,
                    subscribers: Arc::default(),
                    authenticated_permissions,
                    sessions: RwLock::default(),
                    #[cfg(feature = "password-hashing")]
                    argon,
                    #[cfg(feature = "encryption")]
                    vault,
                    #[cfg(feature = "encryption")]
                    default_encryption_key,
                    #[cfg(any(feature = "compression", feature = "encryption"))]
                    tree_vault,
                    path: owned_path,
                    file_manager,
                    chunk_cache: ChunkCache::new(2000, 160_384),
                    threadpool: ThreadPool::new(parallelization),
                    schemas: RwLock::new(configuration.initial_schemas),
                    available_databases: RwLock::default(),
                    open_roots: Mutex::default(),
                    key_value_persistence,
                    check_view_integrity_on_database_open,
                    relay: Relay::default(),
                }),
            },
            authentication: None,
            effective_session: None,
        };
        storage.cache_available_databases()?;
        storage.create_admin_database_if_needed()?;
        Ok(storage)
    }
    #[cfg(feature = "internal-apis")]
    #[doc(hidden)]
    pub fn database_without_schema(&self, name: &str) -> Result<Database, Error> {
        let name = name.to_owned();
        self.instance
            .database_without_schema(&name, Some(self), None)
    }
    fn lookup_or_create_id(
        configuration: &StorageConfiguration,
        path: &Path,
    ) -> Result<StorageLock, Error> {
        let id_path = {
            let storage_id = path.join("server-id");
            if storage_id.exists() {
                storage_id
            } else {
                path.join("storage-id")
            }
        };
        let (id, file) = if let Some(id) = configuration.unique_id {
            let file = if id_path.exists() {
                File::open(id_path)?
            } else {
                let mut file = File::create(id_path)?;
                let id = id.to_string();
                file.write_all(id.as_bytes())?;
                file
            };
            file.lock_exclusive()?;
            (id, file)
        } else {
            if id_path.exists() {
                let mut file = File::open(id_path)?;
                file.lock_exclusive()?;
                let mut bytes = Vec::new();
                file.read_to_end(&mut bytes)?;
                let existing_id =
                    String::from_utf8(bytes).expect("server-id contains invalid data");
                (existing_id.parse().expect("server-id isn't numeric"), file)
            } else {
                let id = { thread_rng().gen::<u64>() };
                let mut file = File::create(id_path)?;
                file.lock_exclusive()?;
                file.write_all(id.to_string().as_bytes())?;
                (id, file)
            }
        };
        Ok(StorageLock::new(StorageId(id), file))
    }
    fn cache_available_databases(&self) -> Result<(), Error> {
        let available_databases = self
            .admin()
            .view::<ByName>()
            .query()?
            .into_iter()
            .map(|map| (map.key, map.value))
            .collect();
        let mut storage_databases = self.instance.data.available_databases.write();
        *storage_databases = available_databases;
        Ok(())
    }
    fn create_admin_database_if_needed(&self) -> Result<(), Error> {
        self.register_schema::<Admin>()?;
        match self.database::<Admin>(ADMIN_DATABASE_NAME) {
            Ok(_) => {}
            Err(bonsaidb_core::Error::DatabaseNotFound(_)) => {
                drop(self.create_database::<Admin>(ADMIN_DATABASE_NAME, true)?);
            }
            Err(err) => return Err(Error::Core(err)),
        }
        Ok(())
    }
    #[must_use]
    pub fn unique_id(&self) -> StorageId {
        self.instance.data.lock.id()
    }
    #[must_use]
    pub(crate) fn parallelization(&self) -> usize {
        self.instance.data.parallelization
    }
    #[must_use]
    #[cfg(feature = "encryption")]
    pub(crate) fn vault(&self) -> &Arc<Vault> {
        &self.instance.data.vault
    }
    #[must_use]
    #[cfg(any(feature = "encryption", feature = "compression"))]
    pub(crate) fn tree_vault(&self) -> Option<&TreeVault> {
        self.instance.data.tree_vault.as_ref()
    }
    #[must_use]
    #[cfg(feature = "encryption")]
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
        self.instance.data.default_encryption_key.as_ref()
    }
    #[must_use]
    #[cfg(all(feature = "compression", not(feature = "encryption")))]
    #[allow(clippy::unused_self)]
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
        None
    }
    pub fn register_schema<DB: Schema>(&self) -> Result<(), Error> {
        let mut schemas = self.instance.data.schemas.write();
        if schemas
            .insert(
                DB::schema_name(),
                Arc::new(StorageSchemaOpener::<DB>::new()?),
            )
            .is_none()
        {
            Ok(())
        } else {
            Err(Error::Core(bonsaidb_core::Error::SchemaAlreadyRegistered(
                DB::schema_name(),
            )))
        }
    }
    fn validate_name(name: &str) -> Result<(), Error> {
        if name.chars().enumerate().all(|(index, c)| {
            c.is_ascii_alphanumeric()
                || (index == 0 && c == '_')
                || (index > 0 && (c == '.' || c == '-'))
        }) {
            Ok(())
        } else {
            Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(
                name.to_owned(),
            )))
        }
    }
    #[must_use]
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
        if self.effective_session.is_some() {
            None
        } else {
            Some(Self {
                instance: self.instance.clone(),
                authentication: self.authentication.clone(),
                effective_session: Some(Arc::new(Session {
                    id: None,
                    authentication: SessionAuthentication::None,
                    permissions: effective_permissions,
                })),
            })
        }
    }
    #[cfg(feature = "async")]
    pub fn into_async(self) -> crate::AsyncStorage {
        self.into_async_with_runtime(tokio::runtime::Handle::current())
    }
    #[cfg(feature = "async")]
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
        crate::AsyncStorage {
            storage: self,
            runtime: Arc::new(runtime),
        }
    }
    #[cfg(feature = "async")]
    pub fn to_async(&self) -> crate::AsyncStorage {
        self.clone().into_async()
    }
    #[cfg(feature = "async")]
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
        self.clone().into_async_with_runtime(runtime)
    }
}
impl Debug for Data {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut f = f.debug_struct("Data");
        f.field("lock", &self.lock)
            .field("path", &self.path)
            .field("parallelization", &self.parallelization)
            .field("threadpool", &self.threadpool)
            .field("file_manager", &self.file_manager)
            .field("tasks", &self.tasks)
            .field("available_databases", &self.available_databases)
            .field("open_roots", &self.open_roots)
            .field("authenticated_permissions", &self.authenticated_permissions)
            .field("sessions", &self.sessions)
            .field("subscribers", &self.subscribers)
            .field("key_value_persistence", &self.key_value_persistence)
            .field("chunk_cache", &self.chunk_cache)
            .field(
                "check_view_integrity_on_database_open",
                &self.check_view_integrity_on_database_open,
            )
            .field("relay", &self.relay);
        if let Some(schemas) = self.schemas.try_read() {
            let mut schemas = schemas.keys().collect::<Vec<_>>();
            schemas.sort();
            f.field("schemas", &schemas);
        } else {
            f.field("schemas", &"RwLock locked");
        }
        #[cfg(feature = "password-hashing")]
        f.field("argon", &self.argon);
        #[cfg(feature = "encryption")]
        {
            f.field("vault", &self.vault)
                .field("default_encryption_key", &self.default_encryption_key);
        }
        #[cfg(any(feature = "compression", feature = "encryption"))]
        f.field("tree_vault", &self.tree_vault);
        f.finish()
    }
}
impl StorageInstance {
    #[cfg_attr(
        not(any(feature = "encryption", feature = "compression")),
        allow(unused_mut)
    )]
    pub(crate) fn open_roots(&self, name: &str) -> Result<Context, Error> {
        let mut open_roots = self.data.open_roots.lock();
        if let Some(roots) = open_roots.get(name) {
            Ok(roots.clone())
        } else {
            let task_name = name.to_string();
            let mut config = nebari::Config::new(self.data.path.join(task_name))
                .file_manager(self.data.file_manager.clone())
                .cache(self.data.chunk_cache.clone())
                .shared_thread_pool(&self.data.threadpool);
            #[cfg(any(feature = "encryption", feature = "compression"))]
            if let Some(vault) = self.data.tree_vault.clone() {
                config = config.vault(vault);
            }
            let roots = config.open().map_err(Error::from)?;
            let context = Context::new(
                roots,
                self.data.key_value_persistence.clone(),
                Some(self.data.lock.clone()),
            );
            open_roots.insert(name.to_owned(), context.clone());
            Ok(context)
        }
    }
    pub(crate) fn tasks(&self) -> &'_ TaskManager {
        &self.data.tasks
    }
    pub(crate) fn check_view_integrity_on_database_open(&self) -> bool {
        self.data.check_view_integrity_on_database_open
    }
    pub(crate) fn relay(&self) -> &'_ Relay {
        &self.data.relay
    }
    pub(crate) fn database_without_schema(
        &self,
        name: &str,
        storage: Option<&Storage>,
        expected_schema: Option<SchemaName>,
    ) -> Result<Database, Error> {
        let stored_schema = {
            let available_databases = self.data.available_databases.read();
            available_databases
                .get(name)
                .ok_or_else(|| {
                    Error::Core(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
                })?
                .clone()
        };
        if let Some(expected_schema) = expected_schema {
            if stored_schema != expected_schema {
                return Err(Error::Core(bonsaidb_core::Error::SchemaMismatch {
                    database_name: name.to_owned(),
                    schema: expected_schema,
                    stored_schema,
                }));
            }
        }
        let mut schemas = self.data.schemas.write();
        let storage =
            storage.map_or_else(|| Cow::Owned(Storage::from(self.clone())), Cow::Borrowed);
        if let Some(schema) = schemas.get_mut(&stored_schema) {
            let db = schema.open(name.to_string(), storage.as_ref())?;
            Ok(db)
        } else {
            Err(Error::Core(bonsaidb_core::Error::SchemaNotRegistered(
                stored_schema,
            )))
        }
    }
    fn update_user_with_named_id<
        'user,
        'other,
        Col: NamedCollection<PrimaryKey = u64>,
        U: Nameable<'user, u64> + Send + Sync,
        O: Nameable<'other, u64> + Send + Sync,
        F: FnOnce(&mut CollectionDocument<User>, u64) -> Result<bool, bonsaidb_core::Error>,
    >(
        &self,
        user: U,
        other: O,
        callback: F,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let other = other.name()?;
        let user = User::load(user.name()?, &admin)?;
        let other = other.id::<Col, _>(&admin)?;
        match (user, other) {
            (Some(mut user), Some(other)) => {
                if callback(&mut user, other)? {
                    user.update(&admin)?;
                }
                Ok(())
            }
            _ => Err(bonsaidb_core::Error::UserNotFound),
        }
    }
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
    #[cfg_attr(
        any(
            not(feature = "token-authentication"),
            not(feature = "password-hashing")
        ),
        allow(unused_variables, clippy::needless_pass_by_value)
    )]
    fn authenticate_inner(
        &self,
        authentication: bonsaidb_core::connection::Authentication,
        loaded_user: Option<CollectionDocument<User>>,
        current_session_id: Option<SessionId>,
        admin: &Database,
    ) -> Result<Storage, bonsaidb_core::Error> {
        use bonsaidb_core::connection::Authentication;
        match authentication {
            #[cfg(feature = "token-authentication")]
            Authentication::Token {
                id,
                now,
                now_hash,
                algorithm,
            } => self.begin_token_authentication(id, now, &now_hash, algorithm, admin),
            #[cfg(feature = "token-authentication")]
            Authentication::TokenChallengeResponse(hash) => {
                let session_id =
                    current_session_id.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.finish_token_authentication(session_id, &hash, admin)
            }
            #[cfg(feature = "password-hashing")]
            Authentication::Password { user, password } => {
                let user = match loaded_user {
                    Some(user) => user,
                    None => {
                        User::load(user, admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?
                    }
                };
                let saved_hash = user
                    .contents
                    .argon_hash
                    .clone()
                    .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.data
                    .argon
                    .verify(user.header.id, password, saved_hash)?;
                self.assume_user(user, admin)
            }
        }
    }
    fn assume_user(
        &self,
        user: CollectionDocument<User>,
        admin: &Database,
    ) -> Result<Storage, bonsaidb_core::Error> {
        let permissions = user.contents.effective_permissions(
            admin,
            &admin.storage().instance.data.authenticated_permissions,
        )?;
        let mut sessions = self.data.sessions.write();
        sessions.last_session_id += 1;
        let session_id = SessionId(sessions.last_session_id);
        let session = Session {
            id: Some(session_id),
            authentication: SessionAuthentication::Identity(Arc::new(Identity::User {
                id: user.header.id,
                username: user.contents.username,
            })),
            permissions,
        };
        let authentication = Arc::new(AuthenticatedSession {
            storage: Arc::downgrade(&self.data),
            session: Mutex::new(session.clone()),
        });
        sessions.sessions.insert(session_id, authentication.clone());
        Ok(Storage {
            instance: self.clone(),
            authentication: Some(authentication),
            effective_session: Some(Arc::new(session)),
        })
    }
    fn assume_role(
        &self,
        role: CollectionDocument<Role>,
        admin: &Database,
    ) -> Result<Storage, bonsaidb_core::Error> {
        let permissions = role.contents.effective_permissions(
            admin,
            &admin.storage().instance.data.authenticated_permissions,
        )?;
        let mut sessions = self.data.sessions.write();
        sessions.last_session_id += 1;
        let session_id = SessionId(sessions.last_session_id);
        let session = Session {
            id: Some(session_id),
            authentication: SessionAuthentication::Identity(Arc::new(Identity::Role {
                id: role.header.id,
                name: role.contents.name,
            })),
            permissions,
        };
        let authentication = Arc::new(AuthenticatedSession {
            storage: Arc::downgrade(&self.data),
            session: Mutex::new(session.clone()),
        });
        sessions.sessions.insert(session_id, authentication.clone());
        Ok(Storage {
            instance: self.clone(),
            authentication: Some(authentication),
            effective_session: Some(Arc::new(session)),
        })
    }
    fn add_permission_group_to_user_inner(
        user: &mut CollectionDocument<User>,
        permission_group_id: u64,
    ) -> bool {
        if user.contents.groups.contains(&permission_group_id) {
            false
        } else {
            user.contents.groups.push(permission_group_id);
            true
        }
    }
    fn remove_permission_group_from_user_inner(
        user: &mut CollectionDocument<User>,
        permission_group_id: u64,
    ) -> bool {
        let old_len = user.contents.groups.len();
        user.contents.groups.retain(|id| id != &permission_group_id);
        old_len != user.contents.groups.len()
    }
    fn add_role_to_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
        if user.contents.roles.contains(&role_id) {
            false
        } else {
            user.contents.roles.push(role_id);
            true
        }
    }
    fn remove_role_from_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
        let old_len = user.contents.roles.len();
        user.contents.roles.retain(|id| id != &role_id);
        old_len != user.contents.roles.len()
    }
}
pub trait DatabaseOpener: Send + Sync {
    fn schematic(&self) -> &'_ Schematic;
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error>;
}
pub struct StorageSchemaOpener<DB: Schema> {
    schematic: Schematic,
    _phantom: PhantomData<DB>,
}
impl<DB> StorageSchemaOpener<DB>
where
    DB: Schema,
{
    pub fn new() -> Result<Self, Error> {
        let schematic = DB::schematic()?;
        Ok(Self {
            schematic,
            _phantom: PhantomData,
        })
    }
}
impl<DB> DatabaseOpener for StorageSchemaOpener<DB>
where
    DB: Schema,
{
    fn schematic(&self) -> &'_ Schematic {
        &self.schematic
    }
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error> {
        let roots = storage.instance.open_roots(&name)?;
        let db = Database::new::<DB, _>(name, roots, storage)?;
        Ok(db)
    }
}
impl HasSession for StorageInstance {
    fn session(&self) -> Option<&Session> {
        None
    }
}
impl StorageConnection for StorageInstance {
    type Authenticated = Storage;
    type Database = Database;
    fn admin(&self) -> Self::Database {
        Database::new::<Admin, _>(
            ADMIN_DATABASE_NAME,
            self.open_roots(ADMIN_DATABASE_NAME).unwrap(),
            &Storage::from(self.clone()),
        )
        .unwrap()
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(
        level = "trace",
        skip(self, schema),
        fields(
            schema.authority = schema.authority.as_ref(),
            schema.name = schema.name.as_ref(),
        )
    ))]
    fn create_database_with_schema(
        &self,
        name: &str,
        schema: SchemaName,
        only_if_needed: bool,
    ) -> Result<(), bonsaidb_core::Error> {
        Storage::validate_name(name)?;
        {
            let schemas = self.data.schemas.read();
            if !schemas.contains_key(&schema) {
                return Err(bonsaidb_core::Error::SchemaNotRegistered(schema));
            }
        }
        let mut available_databases = self.data.available_databases.write();
        let admin = self.admin();
        if !available_databases.contains_key(name) {
            admin
                .collection::<DatabaseRecord>()
                .push(&admin::Database {
                    name: name.to_string(),
                    schema: schema.clone(),
                })?;
            available_databases.insert(name.to_string(), schema);
        } else if !only_if_needed {
            return Err(bonsaidb_core::Error::DatabaseNameAlreadyTaken(
                name.to_string(),
            ));
        }
        Ok(())
    }
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
        self.database_without_schema(name, None, Some(DB::schema_name()))
            .map_err(bonsaidb_core::Error::from)
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let mut available_databases = self.data.available_databases.write();
        available_databases.remove(name);
        let mut open_roots = self.data.open_roots.lock();
        open_roots.remove(name);
        let database_folder = self.data.path.join(name);
        if database_folder.exists() {
            let file_manager = self.data.file_manager.clone();
            file_manager
                .delete_directory(&database_folder)
                .map_err(Error::Nebari)?;
        }
        if let Some(entry) = admin
            .view::<database::ByName>()
            .with_key(name)
            .query()?
            .first()
        {
            admin.delete::<DatabaseRecord, _>(&entry.source)?;
            Ok(())
        } else {
            Err(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
        }
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
        let available_databases = self.data.available_databases.read();
        Ok(available_databases
            .iter()
            .map(|(name, schema)| connection::Database {
                name: name.to_string(),
                schema: schema.clone(),
            })
            .collect())
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
        let available_databases = self.data.available_databases.read();
        let schemas = self.data.schemas.read();
        Ok(available_databases
            .values()
            .unique()
            .filter_map(|name| {
                schemas
                    .get(name)
                    .map(|opener| SchemaSummary::from(opener.schematic()))
            })
            .collect())
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
        let result = self
            .admin()
            .collection::<User>()
            .push(&User::default_with_username(username))?;
        Ok(result.id)
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
        user.delete(&admin)?;
        Ok(())
    }
    #[cfg(feature = "password-hashing")]
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let mut user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
        user.contents.argon_hash = Some(self.data.argon.hash(user.header.id, password)?);
        user.update(&admin)
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
    fn authenticate(
        &self,
        authentication: bonsaidb_core::connection::Authentication,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        let admin = self.admin();
        self.authenticate_inner(authentication, None, None, &admin)
            .map(Storage::from)
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn assume_identity(
        &self,
        identity: IdentityReference<'_>,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        let admin = self.admin();
        match identity {
            IdentityReference::User(user) => {
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.assume_user(user, &admin).map(Storage::from)
            }
            IdentityReference::Role(role) => {
                let role =
                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.assume_role(role, &admin).map(Storage::from)
            }
            _ => Err(bonsaidb_core::Error::InvalidCredentials),
        }
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
            user,
            permission_group,
            |user, permission_group_id| {
                Ok(Self::add_permission_group_to_user_inner(
                    user,
                    permission_group_id,
                ))
            },
        )
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
            user,
            permission_group,
            |user, permission_group_id| {
                Ok(Self::remove_permission_group_from_user_inner(
                    user,
                    permission_group_id,
                ))
            },
        )
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn add_role_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
            Ok(Self::add_role_to_user_inner(user, role_id))
        })
    }
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn remove_role_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
            Ok(Self::remove_role_from_user_inner(user, role_id))
        })
    }
}
impl HasSession for Storage {
    fn session(&self) -> Option<&Session> {
        self.effective_session.as_deref()
    }
}
impl StorageConnection for Storage {
    type Authenticated = Self;
    type Database = Database;
    fn admin(&self) -> Self::Database {
        self.instance.admin()
    }
    fn create_database_with_schema(
        &self,
        name: &str,
        schema: SchemaName,
        only_if_needed: bool,
    ) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(name),
            &BonsaiAction::Server(ServerAction::CreateDatabase),
        )?;
        self.instance
            .create_database_with_schema(name, schema, only_if_needed)
    }
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
        self.instance.database::<DB>(name)
    }
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(name),
            &BonsaiAction::Server(ServerAction::DeleteDatabase),
        )?;
        self.instance.delete_database(name)
    }
    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
        self.check_permission(
            bonsaidb_resource_name(),
            &BonsaiAction::Server(ServerAction::ListDatabases),
        )?;
        self.instance.list_databases()
    }
    fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
        self.check_permission(
            bonsaidb_resource_name(),
            &BonsaiAction::Server(ServerAction::ListAvailableSchemas),
        )?;
        self.instance.list_available_schemas()
    }
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
        self.check_permission(
            bonsaidb_resource_name(),
            &BonsaiAction::Server(ServerAction::CreateUser),
        )?;
        self.instance.create_user(username)
    }
    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let user = user.name()?;
        let user_id = user
            .id::<User, _>(&admin)?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
        self.check_permission(
            user_resource_name(user_id),
            &BonsaiAction::Server(ServerAction::DeleteUser),
        )?;
        self.instance.delete_user(user)
    }
    #[cfg(feature = "password-hashing")]
    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let user = user.name()?;
        let user_id = user
            .id::<User, _>(&admin)?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
        self.check_permission(
            user_resource_name(user_id),
            &BonsaiAction::Server(ServerAction::SetPassword),
        )?;
        self.instance.set_user_password(user, password)
    }
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
    #[cfg_attr(not(feature = "token-authentication"), allow(unused_assignments))]
    #[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
    fn authenticate(
        &self,
        authentication: bonsaidb_core::connection::Authentication,
    ) -> Result<Self, bonsaidb_core::Error> {
        let admin = self.admin();
        let mut loaded_user = None;
        match &authentication {
            #[cfg(feature = "token-authentication")]
            bonsaidb_core::connection::Authentication::Token { id, .. } => {
                self.check_permission(
                    bonsaidb_core::permissions::bonsai::authentication_token_resource_name(*id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        bonsaidb_core::connection::AuthenticationMethod::Token,
                    )),
                )?;
            }
            #[cfg(feature = "password-hashing")]
            bonsaidb_core::connection::Authentication::Password { user, .. } => {
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        bonsaidb_core::connection::AuthenticationMethod::PasswordHash,
                    )),
                )?;
                loaded_user = Some(user);
            }
            #[cfg(feature = "token-authentication")]
            bonsaidb_core::connection::Authentication::TokenChallengeResponse(_) => {}
        }
        self.instance.authenticate_inner(
            authentication,
            loaded_user,
            self.authentication
                .as_ref()
                .and_then(|auth| auth.session.lock().id),
            &admin,
        )
    }
    fn assume_identity(
        &self,
        identity: IdentityReference<'_>,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        match identity {
            IdentityReference::User(user) => {
                let admin = self.admin();
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
                )?;
                self.instance.assume_user(user, &admin)
            }
            IdentityReference::Role(role) => {
                let admin = self.admin();
                let role =
                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.check_permission(
                    role_resource_name(role.header.id),
                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
                )?;
                self.instance.assume_role(role, &admin)
            }
            _ => Err(bonsaidb_core::Error::InvalidCredentials),
        }
    }
    fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<PermissionGroup, _, _, _>(
                user,
                permission_group,
                |user, permission_group_id| {
                    self.check_permission(
                        user_resource_name(user.header.id),
                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
                    )?;
                    Ok(StorageInstance::add_permission_group_to_user_inner(
                        user,
                        permission_group_id,
                    ))
                },
            )
    }
    fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<PermissionGroup, _, _, _>(
                user,
                permission_group,
                |user, permission_group_id| {
                    self.check_permission(
                        user_resource_name(user.header.id),
                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
                    )?;
                    Ok(StorageInstance::remove_permission_group_from_user_inner(
                        user,
                        permission_group_id,
                    ))
                },
            )
    }
    fn add_role_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
                )?;
                Ok(StorageInstance::add_role_to_user_inner(user, role_id))
            })
    }
    fn remove_role_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
                )?;
                Ok(StorageInstance::remove_role_from_user_inner(user, role_id))
            })
    }
}
#[test]
fn name_validation_tests() {
    assert!(matches!(Storage::validate_name("azAZ09.-"), Ok(())));
    assert!(matches!(
        Storage::validate_name("_internal-names-work"),
        Ok(())
    ));
    assert!(matches!(
        Storage::validate_name("-alphaunmericfirstrequired"),
        Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
    ));
    assert!(matches!(
        Storage::validate_name("\u{2661}"),
        Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
    ));
}
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct StorageId(u64);
impl StorageId {
    #[must_use]
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}
impl Debug for StorageId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:016x}", self.0)
    }
}
impl Display for StorageId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}
#[derive(Debug, Clone)]
#[cfg(any(feature = "compression", feature = "encryption"))]
pub(crate) struct TreeVault {
    #[cfg(feature = "compression")]
    compression: Option<Compression>,
    #[cfg(feature = "encryption")]
    pub key: Option<KeyId>,
    #[cfg(feature = "encryption")]
    pub vault: Arc<Vault>,
}
#[cfg(all(feature = "compression", feature = "encryption"))]
impl TreeVault {
    pub(crate) fn new_if_needed(
        key: Option<KeyId>,
        vault: &Arc<Vault>,
        compression: Option<Compression>,
    ) -> Option<Self> {
        if key.is_none() && compression.is_none() {
            None
        } else {
            Some(Self {
                key,
                compression,
                vault: vault.clone(),
            })
        }
    }
    fn header(&self, compressed: bool) -> u8 {
        let mut bits = if self.key.is_some() { 0b1000_0000 } else { 0 };
        if compressed {
            if let Some(compression) = self.compression {
                bits |= compression as u8;
            }
        }
        bits
    }
}
#[cfg(all(feature = "compression", feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        let mut includes_compression = false;
        let compressed = match (payload.len(), self.compression) {
            (128..=usize::MAX, Some(Compression::Lz4)) => {
                includes_compression = true;
                Cow::Owned(lz4_flex::block::compress_prepend_size(payload))
            }
            _ => Cow::Borrowed(payload),
        };
        let mut complete = if let Some(key) = &self.key {
            self.vault.encrypt_payload(key, &compressed, None)?
        } else {
            compressed.into_owned()
        };
        let header = self.header(includes_compression);
        if header != 0 {
            let header = [b't', b'r', b'v', header];
            complete.splice(0..0, header);
        }
        Ok(complete)
    }
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if payload.len() >= 4 && &payload[0..3] == b"trv" {
            let header = payload[3];
            let payload = &payload[4..];
            let encrypted = (header & 0b1000_0000) != 0;
            let compression = header & 0b0111_1111;
            let decrypted = if encrypted {
                Cow::Owned(self.vault.decrypt_payload(payload, None)?)
            } else {
                Cow::Borrowed(payload)
            };
            #[allow(clippy::single_match)] return Ok(match Compression::from_u8(compression) {
                Some(Compression::Lz4) => {
                    lz4_flex::block::decompress_size_prepended(&decrypted).map_err(Error::from)?
                }
                None => decrypted.into_owned(),
            });
        }
        self.vault.decrypt_payload(payload, None)
    }
}
pub trait StorageNonBlocking: Sized {
    #[must_use]
    fn path(&self) -> &Path;
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error>;
}
impl StorageNonBlocking for Storage {
    fn path(&self) -> &Path {
        &self.instance.data.path
    }
    fn assume_session(&self, session: Session) -> Result<Storage, bonsaidb_core::Error> {
        if self.authentication.is_some() {
            return Err(bonsaidb_core::Error::InvalidCredentials);
        }
        let Some(session_id) = session.id else {
            return Ok(Self {
                instance: self.instance.clone(),
                authentication: None,
                effective_session: Some(Arc::new(session)),
            });
        };
        let session_data = self.instance.data.sessions.read();
        let authentication = session_data
            .sessions
            .get(&session_id)
            .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
        let authentication_session = authentication.session.lock();
        let effective_permissions =
            Permissions::merged([&session.permissions, &authentication_session.permissions]);
        let effective_session = Session {
            id: authentication_session.id,
            authentication: authentication_session.authentication.clone(),
            permissions: effective_permissions,
        };
        Ok(Self {
            instance: self.instance.clone(),
            authentication: Some(authentication.clone()),
            effective_session: Some(Arc::new(effective_session)),
        })
    }
}
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl TreeVault {
    pub(crate) fn new_if_needed(compression: Option<Compression>) -> Option<Self> {
        compression.map(|compression| Self {
            compression: Some(compression),
        })
    }
}
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl nebari::Vault for TreeVault {
    type Error = Error;
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        Ok(match (payload.len(), self.compression) {
            (128..=usize::MAX, Some(Compression::Lz4)) => {
                let mut destination =
                    vec![0; lz4_flex::block::get_maximum_output_size(payload.len()) + 8];
                let compressed_length =
                    lz4_flex::block::compress_into(payload, &mut destination[8..])
                        .expect("lz4-flex documents this shouldn't fail");
                destination.truncate(compressed_length + 8);
                destination[0..4].copy_from_slice(&[b't', b'r', b'v', Compression::Lz4 as u8]);
                let uncompressed_length =
                    u32::try_from(payload.len()).expect("nebari doesn't support >32 bit blocks");
                destination[4..8].copy_from_slice(&uncompressed_length.to_le_bytes());
                destination
            }
            _ => payload.to_vec(),
        })
    }
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if payload.len() >= 4 && &payload[0..3] == b"trv" {
            let header = payload[3];
            let payload = &payload[4..];
            let encrypted = (header & 0b1000_0000) != 0;
            let compression = header & 0b0111_1111;
            if encrypted {
                return Err(Error::EncryptionDisabled);
            }
            #[allow(clippy::single_match)] return Ok(match Compression::from_u8(compression) {
                Some(Compression::Lz4) => {
                    lz4_flex::block::decompress_size_prepended(payload).map_err(Error::from)?
                }
                None => payload.to_vec(),
            });
        }
        Ok(payload.to_vec())
    }
}
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl TreeVault {
    pub(crate) fn new_if_needed(key: Option<KeyId>, vault: &Arc<Vault>) -> Option<Self> {
        key.map(|key| Self {
            key: Some(key),
            vault: vault.clone(),
        })
    }
    #[allow(dead_code)] fn header(&self) -> u8 {
        if self.key.is_some() {
            0b1000_0000
        } else {
            0
        }
    }
}
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if let Some(key) = &self.key {
            self.vault.encrypt_payload(key, payload, None)
        } else {
            Ok(payload.to_vec())
        }
    }
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        self.vault.decrypt_payload(payload, None)
    }
}
#[derive(Clone, Debug)]
pub struct StorageLock(StorageId, Arc<LockData>);
impl StorageLock {
    pub const fn id(&self) -> StorageId {
        self.0
    }
}
#[derive(Debug)]
struct LockData(File);
impl StorageLock {
    fn new(id: StorageId, file: File) -> Self {
        Self(id, Arc::new(LockData(file)))
    }
}
impl Drop for LockData {
    fn drop(&mut self) {
        drop(self.0.unlock());
    }
}