1
use std::borrow::Cow;
2
use std::collections::{HashMap, HashSet};
3
use std::fmt::{Debug, Display};
4
use std::fs::{self, File};
5
use std::io::{Read, Write};
6
use std::marker::PhantomData;
7
use std::path::{Path, PathBuf};
8
use std::sync::{Arc, Weak};
9

            
10
use bonsaidb_core::admin::database::{self, ByName, Database as DatabaseRecord};
11
use bonsaidb_core::admin::user::User;
12
use bonsaidb_core::admin::{self, Admin, PermissionGroup, Role, ADMIN_DATABASE_NAME};
13
use bonsaidb_core::circulate;
14
pub use bonsaidb_core::circulate::Relay;
15
use bonsaidb_core::connection::{
16
    self, Connection, HasSession, Identity, IdentityReference, LowLevelConnection, Session,
17
    SessionAuthentication, SessionId, StorageConnection,
18
};
19
use bonsaidb_core::document::CollectionDocument;
20
#[cfg(any(feature = "encryption", feature = "compression"))]
21
use bonsaidb_core::document::KeyId;
22
use bonsaidb_core::permissions::bonsai::{
23
    bonsaidb_resource_name, database_resource_name, role_resource_name, user_resource_name,
24
    BonsaiAction, ServerAction,
25
};
26
use bonsaidb_core::permissions::Permissions;
27
use bonsaidb_core::schema::{
28
    Nameable, NamedCollection, Schema, SchemaName, SchemaSummary, Schematic,
29
};
30
use fs2::FileExt;
31
use itertools::Itertools;
32
use nebari::io::any::{AnyFile, AnyFileManager};
33
use nebari::io::FileManager;
34
use nebari::{ChunkCache, ThreadPool};
35
use parking_lot::{Mutex, RwLock};
36
use rand::{thread_rng, Rng};
37

            
38
#[cfg(feature = "compression")]
39
use crate::config::Compression;
40
use crate::config::{KeyValuePersistence, StorageConfiguration};
41
use crate::database::Context;
42
use crate::tasks::manager::Manager;
43
use crate::tasks::TaskManager;
44
#[cfg(feature = "encryption")]
45
use crate::vault::{self, LocalVaultKeyStorage, Vault};
46
use crate::{Database, Error};
47

            
48
#[cfg(feature = "password-hashing")]
49
mod argon;
50
#[cfg(feature = "token-authentication")]
51
mod token_authentication;
52

            
53
mod backup;
54
mod pubsub;
55
pub use backup::{AnyBackupLocation, BackupLocation};
56

            
57
/// A file-based, multi-database, multi-user database engine. This type blocks
58
/// the current thread when used. See [`AsyncStorage`](crate::AsyncStorage) for
59
/// this type's async counterpart.
60
///
61
/// ## Converting between Blocking and Async Types
62
///
63
/// [`AsyncStorage`](crate::AsyncStorage) and [`Storage`] can be converted to
64
/// and from each other using:
65
///
66
/// - [`AsyncStorage::into_blocking()`](crate::AsyncStorage::into_blocking)
67
/// - [`AsyncStorage::to_blocking()`](crate::AsyncStorage::to_blocking)
68
/// - [`AsyncStorage::as_blocking()`](crate::AsyncStorage::as_blocking)
69
/// - [`Storage::into_async()`]
70
/// - [`Storage::to_async()`]
71
/// - [`Storage::into_async_with_runtime()`]
72
/// - [`Storage::to_async_with_runtime()`]
73
///
74
/// ## Converting from `Database::open` to `Storage::open`
75
///
76
/// [`Database::open`](Database::open) is a simple method that uses `Storage` to
77
/// create a database named `default` with the schema provided. These two ways
78
/// of opening the database are the same:
79
///
80
/// ```rust
81
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
82
/// use bonsaidb_core::connection::StorageConnection;
83
/// use bonsaidb_core::schema::Schema;
84
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
85
/// use bonsaidb_local::{
86
///     config::{Builder, StorageConfiguration},
87
///     Database, Storage,
88
/// };
89
/// # fn open<MySchema: Schema>() -> anyhow::Result<()> {
90
/// // This creates a Storage instance, creates a database, and returns it.
91
/// let db = Database::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb"))?;
92
///
93
/// // This is the equivalent code being executed:
94
/// let storage =
95
///     Storage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)?;
96
/// storage.create_database::<MySchema>("default", true)?;
97
/// let db = storage.database::<MySchema>("default")?;
98
/// #     Ok(())
99
/// # }
100
/// ```
101
///
102
/// ## Using multiple databases
103
///
104
/// This example shows how to use `Storage` to create and use multiple databases
105
/// with multiple schemas:
106
///
107
/// ```rust
108
/// use bonsaidb_core::connection::StorageConnection;
109
/// use bonsaidb_core::schema::{Collection, Schema};
110
/// use bonsaidb_local::config::{Builder, StorageConfiguration};
111
/// use bonsaidb_local::Storage;
112
/// use serde::{Deserialize, Serialize};
113
///
114
/// #[derive(Debug, Schema)]
115
/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
116
/// # #[schema(core = bonsaidb_core)]
117
/// struct MySchema;
118
///
119
/// #[derive(Debug, Serialize, Deserialize, Collection)]
120
/// #[collection(name = "blog-posts")]
121
/// # #[collection(core = bonsaidb_core)]
122
/// struct BlogPost {
123
///     pub title: String,
124
///     pub contents: String,
125
///     pub author_id: u64,
126
/// }
127
///
128
/// #[derive(Debug, Serialize, Deserialize, Collection)]
129
/// #[collection(name = "blog-posts")]
130
/// # #[collection(core = bonsaidb_core)]
131
/// struct Author {
132
///     pub name: String,
133
/// }
134
///
135
/// # fn open() -> anyhow::Result<()> {
136
/// let storage = Storage::open(
137
///     StorageConfiguration::new("my-db.bonsaidb")
138
///         .with_schema::<BlogPost>()?
139
///         .with_schema::<MySchema>()?,
140
/// )?;
141
///
142
/// storage.create_database::<BlogPost>("ectons-blog", true)?;
143
/// let ectons_blog = storage.database::<BlogPost>("ectons-blog")?;
144
/// storage.create_database::<MySchema>("another-db", true)?;
145
/// let another_db = storage.database::<MySchema>("another-db")?;
146
/// # Ok(())
147
/// # }
148
/// ```
149
11440320
#[derive(Debug, Clone)]
150
#[must_use]
151
pub struct Storage {
152
    pub(crate) instance: StorageInstance,
153
    pub(crate) authentication: Option<Arc<AuthenticatedSession>>,
154
    effective_session: Option<Arc<Session>>,
155
}
156

            
157
#[derive(Debug)]
158
pub struct AuthenticatedSession {
159
    // TODO: client_data,
160
    storage: Weak<Data>,
161
    pub session: Mutex<Session>,
162
}
163

            
164
5125
#[derive(Debug, Default)]
165
pub struct SessionSubscribers {
166
    pub subscribers: HashMap<u64, SessionSubscriber>,
167
    pub subscribers_by_session: HashMap<SessionId, HashSet<u64>>,
168
    pub last_id: u64,
169
}
170

            
171
impl SessionSubscribers {
172
824
    pub fn unregister(&mut self, subscriber_id: u64) {
173
824
        if let Some(session_id) = self
174
824
            .subscribers
175
824
            .remove(&subscriber_id)
176
824
            .and_then(|sub| sub.session_id)
177
        {
178
            if let Some(session_subscribers) = self.subscribers_by_session.get_mut(&session_id) {
179
                session_subscribers.remove(&subscriber_id);
180
            }
181
824
        }
182
824
    }
183
}
184

            
185
#[derive(Debug)]
186
pub struct SessionSubscriber {
187
    pub session_id: Option<SessionId>,
188
    pub subscriber: circulate::Subscriber,
189
}
190

            
191
impl Drop for AuthenticatedSession {
192
700
    fn drop(&mut self) {
193
700
        let mut session = self.session.lock();
194
700
        if let Some(id) = session.id.take() {
195
700
            if let Some(storage) = self.storage.upgrade() {
196
                // Deregister the session id once dropped.
197
296
                let mut sessions = storage.sessions.write();
198
296
                sessions.sessions.remove(&id);
199
296

            
200
296
                // Remove all subscribers.
201
296
                let mut sessions = storage.subscribers.write();
202
296
                for id in sessions
203
296
                    .subscribers_by_session
204
296
                    .remove(&id)
205
296
                    .into_iter()
206
296
                    .flatten()
207
                {
208
                    sessions.subscribers.remove(&id);
209
                }
210
404
            }
211
        }
212
700
    }
213
}
214

            
215
5125
#[derive(Debug, Default)]
216
struct AuthenticatedSessions {
217
    sessions: HashMap<SessionId, Arc<AuthenticatedSession>>,
218
    last_session_id: u64,
219
}
220

            
221
14125360
#[derive(Debug, Clone)]
222
pub struct StorageInstance {
223
    data: Arc<Data>,
224
}
225

            
226
impl From<StorageInstance> for Storage {
227
90360
    fn from(instance: StorageInstance) -> Self {
228
90360
        Self {
229
90360
            instance,
230
90360
            authentication: None,
231
90360
            effective_session: None,
232
90360
        }
233
90360
    }
234
}
235

            
236
struct Data {
237
    lock: StorageLock,
238
    path: PathBuf,
239
    parallelization: usize,
240
    threadpool: ThreadPool<AnyFile>,
241
    file_manager: AnyFileManager,
242
    pub(crate) tasks: TaskManager,
243
    schemas: RwLock<HashMap<SchemaName, Arc<dyn DatabaseOpener>>>,
244
    available_databases: RwLock<HashMap<String, SchemaName>>,
245
    open_roots: Mutex<HashMap<String, Context>>,
246
    // cfg check matches `Connection::authenticate`
247
    authenticated_permissions: Permissions,
248
    sessions: RwLock<AuthenticatedSessions>,
249
    pub(crate) subscribers: Arc<RwLock<SessionSubscribers>>,
250
    #[cfg(feature = "password-hashing")]
251
    argon: argon::Hasher,
252
    #[cfg(feature = "encryption")]
253
    pub(crate) vault: Arc<Vault>,
254
    #[cfg(feature = "encryption")]
255
    default_encryption_key: Option<KeyId>,
256
    #[cfg(any(feature = "compression", feature = "encryption"))]
257
    tree_vault: Option<TreeVault>,
258
    pub(crate) key_value_persistence: KeyValuePersistence,
259
    chunk_cache: ChunkCache,
260
    pub(crate) check_view_integrity_on_database_open: bool,
261
    relay: Relay,
262
}
263

            
264
impl Storage {
265
    /// Creates or opens a multi-database [`Storage`] with its data stored in `directory`.
266
5197
    pub fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
267
5197
        let owned_path = configuration
268
5197
            .path
269
5197
            .clone()
270
5197
            .unwrap_or_else(|| PathBuf::from("db.bonsaidb"));
271
5197
        let file_manager = if configuration.memory_only {
272
71
            AnyFileManager::memory()
273
        } else {
274
5126
            AnyFileManager::std()
275
        };
276

            
277
5197
        let manager = Manager::default();
278
41576
        for _ in 0..configuration.workers.worker_count {
279
41576
            manager.spawn_worker();
280
41576
        }
281
5197
        let tasks = TaskManager::new(manager);
282
5197

            
283
5197
        fs::create_dir_all(&owned_path)?;
284

            
285
5197
        let storage_lock = Self::lookup_or_create_id(&configuration, &owned_path)?;
286

            
287
        #[cfg(feature = "encryption")]
288
5125
        let vault = {
289
5197
            let vault_key_storage = match configuration.vault_key_storage {
290
216
                Some(storage) => storage,
291
                None => Arc::new(
292
4981
                    LocalVaultKeyStorage::new(owned_path.join("vault-keys"))
293
4981
                        .map_err(|err| Error::Vault(vault::Error::Initializing(err.to_string())))?,
294
                ),
295
            };
296

            
297
5197
            Arc::new(Vault::initialize(
298
5197
                storage_lock.id(),
299
5197
                &owned_path,
300
5197
                vault_key_storage,
301
5197
            )?)
302
        };
303

            
304
5125
        let parallelization = configuration.workers.parallelization;
305
5125
        let check_view_integrity_on_database_open = configuration.views.check_integrity_on_open;
306
5125
        let key_value_persistence = configuration.key_value_persistence;
307
5125
        #[cfg(feature = "password-hashing")]
308
5125
        let argon = argon::Hasher::new(configuration.argon);
309
5125
        #[cfg(feature = "encryption")]
310
5125
        let default_encryption_key = configuration.default_encryption_key;
311
5125
        #[cfg(all(feature = "compression", feature = "encryption"))]
312
5125
        let tree_vault = TreeVault::new_if_needed(
313
5125
            default_encryption_key.clone(),
314
5125
            &vault,
315
5125
            configuration.default_compression,
316
5125
        );
317
5125
        #[cfg(all(not(feature = "compression"), feature = "encryption"))]
318
5125
        let tree_vault = TreeVault::new_if_needed(default_encryption_key.clone(), &vault);
319
5125
        #[cfg(all(feature = "compression", not(feature = "encryption")))]
320
5125
        let tree_vault = TreeVault::new_if_needed(configuration.default_compression);
321
5125

            
322
5125
        let authenticated_permissions = configuration.authenticated_permissions;
323
5125

            
324
5125
        let storage = Self {
325
5125
            instance: StorageInstance {
326
5125
                data: Arc::new(Data {
327
5125
                    lock: storage_lock,
328
5125
                    tasks,
329
5125
                    parallelization,
330
5125
                    subscribers: Arc::default(),
331
5125
                    authenticated_permissions,
332
5125
                    sessions: RwLock::default(),
333
5125
                    #[cfg(feature = "password-hashing")]
334
5125
                    argon,
335
5125
                    #[cfg(feature = "encryption")]
336
5125
                    vault,
337
5125
                    #[cfg(feature = "encryption")]
338
5125
                    default_encryption_key,
339
5125
                    #[cfg(any(feature = "compression", feature = "encryption"))]
340
5125
                    tree_vault,
341
5125
                    path: owned_path,
342
5125
                    file_manager,
343
5125
                    chunk_cache: ChunkCache::new(2000, 160_384),
344
5125
                    threadpool: ThreadPool::new(parallelization),
345
5125
                    schemas: RwLock::new(configuration.initial_schemas),
346
5125
                    available_databases: RwLock::default(),
347
5125
                    open_roots: Mutex::default(),
348
5125
                    key_value_persistence,
349
5125
                    check_view_integrity_on_database_open,
350
5125
                    relay: Relay::default(),
351
5125
                }),
352
5125
            },
353
5125
            authentication: None,
354
5125
            effective_session: None,
355
5125
        };
356
5125

            
357
5125
        storage.cache_available_databases()?;
358

            
359
5125
        storage.create_admin_database_if_needed()?;
360

            
361
5125
        Ok(storage)
362
5197
    }
363

            
364
    #[cfg(feature = "internal-apis")]
365
    #[doc(hidden)]
366
2548296
    pub fn database_without_schema(&self, name: &str) -> Result<Database, Error> {
367
2548296
        let name = name.to_owned();
368
2548296
        self.instance
369
2548296
            .database_without_schema(&name, Some(self), None)
370
2548296
    }
371

            
372
5197
    fn lookup_or_create_id(
373
5197
        configuration: &StorageConfiguration,
374
5197
        path: &Path,
375
5197
    ) -> Result<StorageLock, Error> {
376
5197
        let id_path = {
377
5197
            let storage_id = path.join("server-id");
378
5197
            if storage_id.exists() {
379
2
                storage_id
380
            } else {
381
5195
                path.join("storage-id")
382
            }
383
        };
384

            
385
5197
        let (id, file) = if let Some(id) = configuration.unique_id {
386
            // The configuraiton id override is not persisted to disk. This is
387
            // mostly to prevent someone from accidentally adding this
388
            // configuration, realizing it breaks things, and then wanting to
389
            // revert. This makes reverting to the old value easier.
390
            let file = if id_path.exists() {
391
                File::open(id_path)?
392
            } else {
393
                let mut file = File::create(id_path)?;
394
                let id = id.to_string();
395
                file.write_all(id.as_bytes())?;
396
                file
397
            };
398
            file.lock_exclusive()?;
399
            (id, file)
400
        } else {
401
            // Load/Store a randomly generated id into a file. While the value
402
            // is numerical, the file contents are the ascii decimal, making it
403
            // easier for a human to view, and if needed, edit.
404

            
405
5197
            if id_path.exists() {
406
                // This value is important enought to not allow launching the
407
                // server if the file can't be read or contains unexpected data.
408
692
                let mut file = File::open(id_path)?;
409
692
                file.lock_exclusive()?;
410
692
                let mut bytes = Vec::new();
411
692
                file.read_to_end(&mut bytes)?;
412
692
                let existing_id =
413
692
                    String::from_utf8(bytes).expect("server-id contains invalid data");
414
692

            
415
692
                (existing_id.parse().expect("server-id isn't numeric"), file)
416
            } else {
417
4505
                let id = { thread_rng().gen::<u64>() };
418
4505
                let mut file = File::create(id_path)?;
419
4505
                file.lock_exclusive()?;
420

            
421
4505
                file.write_all(id.to_string().as_bytes())?;
422

            
423
4505
                (id, file)
424
            }
425
        };
426
5197
        Ok(StorageLock::new(StorageId(id), file))
427
5197
    }
428

            
429
5125
    fn cache_available_databases(&self) -> Result<(), Error> {
430
5125
        let available_databases = self
431
5125
            .admin()
432
5125
            .view::<ByName>()
433
5125
            .query()?
434
5125
            .into_iter()
435
5125
            .map(|map| (map.key, map.value))
436
5125
            .collect();
437
5125
        let mut storage_databases = self.instance.data.available_databases.write();
438
5125
        *storage_databases = available_databases;
439
5125
        Ok(())
440
5125
    }
441

            
442
5125
    fn create_admin_database_if_needed(&self) -> Result<(), Error> {
443
5125
        self.register_schema::<Admin>()?;
444
5125
        match self.database::<Admin>(ADMIN_DATABASE_NAME) {
445
620
            Ok(_) => {}
446
            Err(bonsaidb_core::Error::DatabaseNotFound(_)) => {
447
4505
                drop(self.create_database::<Admin>(ADMIN_DATABASE_NAME, true)?);
448
            }
449
            Err(err) => return Err(Error::Core(err)),
450
        }
451
5125
        Ok(())
452
5125
    }
453

            
454
    /// Returns the unique id of the server.
455
    ///
456
    /// This value is set from the [`StorageConfiguration`] or randomly
457
    /// generated when creating a server. It shouldn't be changed after a server
458
    /// is in use, as doing can cause issues. For example, the vault that
459
    /// manages encrypted storage uses the server ID to store the vault key. If
460
    /// the server ID changes, the vault key storage will need to be updated
461
    /// with the new server ID.
462
    #[must_use]
463
    pub fn unique_id(&self) -> StorageId {
464
        self.instance.data.lock.id()
465
    }
466

            
467
    #[must_use]
468
460982
    pub(crate) fn parallelization(&self) -> usize {
469
460982
        self.instance.data.parallelization
470
460982
    }
471

            
472
    #[must_use]
473
    #[cfg(feature = "encryption")]
474
2129166
    pub(crate) fn vault(&self) -> &Arc<Vault> {
475
2129166
        &self.instance.data.vault
476
2129166
    }
477

            
478
    #[must_use]
479
    #[cfg(any(feature = "encryption", feature = "compression"))]
480
3908698
    pub(crate) fn tree_vault(&self) -> Option<&TreeVault> {
481
3908698
        self.instance.data.tree_vault.as_ref()
482
3908698
    }
483

            
484
    #[must_use]
485
    #[cfg(feature = "encryption")]
486
3817739
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
487
3817739
        self.instance.data.default_encryption_key.as_ref()
488
3817739
    }
489

            
490
    #[must_use]
491
    #[cfg(all(feature = "compression", not(feature = "encryption")))]
492
    #[allow(clippy::unused_self)]
493
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
494
        None
495
    }
496

            
497
    /// Registers a schema for use within the server.
498
5125
    pub fn register_schema<DB: Schema>(&self) -> Result<(), Error> {
499
5125
        let mut schemas = self.instance.data.schemas.write();
500
5125
        if schemas
501
5125
            .insert(
502
5125
                DB::schema_name(),
503
5125
                Arc::new(StorageSchemaOpener::<DB>::new()?),
504
            )
505
5125
            .is_none()
506
        {
507
5125
            Ok(())
508
        } else {
509
            Err(Error::Core(bonsaidb_core::Error::SchemaAlreadyRegistered(
510
                DB::schema_name(),
511
            )))
512
        }
513
5125
    }
514

            
515
35325
    fn validate_name(name: &str) -> Result<(), Error> {
516
285496
        if name.chars().enumerate().all(|(index, c)| {
517
285496
            c.is_ascii_alphanumeric()
518
14914
                || (index == 0 && c == '_')
519
7022
                || (index > 0 && (c == '.' || c == '-'))
520
285496
        }) {
521
35175
            Ok(())
522
        } else {
523
150
            Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(
524
150
                name.to_owned(),
525
150
            )))
526
        }
527
35325
    }
528

            
529
    /// Restricts an unauthenticated instance to having `effective_permissions`.
530
    /// Returns `None` if a session has already been established.
531
    #[must_use]
532
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
533
        if self.effective_session.is_some() {
534
            None
535
        } else {
536
            Some(Self {
537
                instance: self.instance.clone(),
538
                authentication: self.authentication.clone(),
539
                effective_session: Some(Arc::new(Session {
540
                    id: None,
541
                    authentication: SessionAuthentication::None,
542
                    permissions: effective_permissions,
543
                })),
544
            })
545
        }
546
    }
547

            
548
    /// Converts this instance into its blocking version, which is able to be
549
    /// used without async. The returned instance uses the current Tokio runtime
550
    /// handle to spawn blocking tasks.
551
    ///
552
    /// # Panics
553
    ///
554
    /// Panics if called outside the context of a Tokio runtime.
555
    #[cfg(feature = "async")]
556
4362
    pub fn into_async(self) -> crate::AsyncStorage {
557
4362
        self.into_async_with_runtime(tokio::runtime::Handle::current())
558
4362
    }
559

            
560
    /// Converts this instance into its blocking version, which is able to be
561
    /// used without async. The returned instance uses the provided runtime
562
    /// handle to spawn blocking tasks.
563
    #[cfg(feature = "async")]
564
4362
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
565
4362
        crate::AsyncStorage {
566
4362
            storage: self,
567
4362
            runtime: Arc::new(runtime),
568
4362
        }
569
4362
    }
570

            
571
    /// Converts this instance into its blocking version, which is able to be
572
    /// used without async. The returned instance uses the current Tokio runtime
573
    /// handle to spawn blocking tasks.
574
    ///
575
    /// # Panics
576
    ///
577
    /// Panics if called outside the context of a Tokio runtime.
578
    #[cfg(feature = "async")]
579
    pub fn to_async(&self) -> crate::AsyncStorage {
580
        self.clone().into_async()
581
    }
582

            
583
    /// Converts this instance into its blocking version, which is able to be
584
    /// used without async. The returned instance uses the provided runtime
585
    /// handle to spawn blocking tasks.
586
    #[cfg(feature = "async")]
587
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
588
        self.clone().into_async_with_runtime(runtime)
589
    }
590
}
591

            
592
impl Debug for Data {
593
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594
        let mut f = f.debug_struct("Data");
595
        f.field("lock", &self.lock)
596
            .field("path", &self.path)
597
            .field("parallelization", &self.parallelization)
598
            .field("threadpool", &self.threadpool)
599
            .field("file_manager", &self.file_manager)
600
            .field("tasks", &self.tasks)
601
            .field("available_databases", &self.available_databases)
602
            .field("open_roots", &self.open_roots)
603
            .field("authenticated_permissions", &self.authenticated_permissions)
604
            .field("sessions", &self.sessions)
605
            .field("subscribers", &self.subscribers)
606
            .field("key_value_persistence", &self.key_value_persistence)
607
            .field("chunk_cache", &self.chunk_cache)
608
            .field(
609
                "check_view_integrity_on_database_open",
610
                &self.check_view_integrity_on_database_open,
611
            )
612
            .field("relay", &self.relay);
613

            
614
        if let Some(schemas) = self.schemas.try_read() {
615
            let mut schemas = schemas.keys().collect::<Vec<_>>();
616
            schemas.sort();
617
            f.field("schemas", &schemas);
618
        } else {
619
            f.field("schemas", &"RwLock locked");
620
        }
621

            
622
        #[cfg(feature = "password-hashing")]
623
        f.field("argon", &self.argon);
624
        #[cfg(feature = "encryption")]
625
        {
626
            f.field("vault", &self.vault)
627
                .field("default_encryption_key", &self.default_encryption_key);
628
        }
629
        #[cfg(any(feature = "compression", feature = "encryption"))]
630
        f.field("tree_vault", &self.tree_vault);
631

            
632
        f.finish()
633
    }
634
}
635

            
636
impl StorageInstance {
637
    #[cfg_attr(
638
        not(any(feature = "encryption", feature = "compression")),
639
        allow(unused_mut)
640
    )]
641
2638878
    pub(crate) fn open_roots(&self, name: &str) -> Result<Context, Error> {
642
2638878
        let mut open_roots = self.data.open_roots.lock();
643
2638878
        if let Some(roots) = open_roots.get(name) {
644
2603988
            Ok(roots.clone())
645
        } else {
646
34890
            let task_name = name.to_string();
647
34890

            
648
34890
            let mut config = nebari::Config::new(self.data.path.join(task_name))
649
34890
                .file_manager(self.data.file_manager.clone())
650
34890
                .cache(self.data.chunk_cache.clone())
651
34890
                .shared_thread_pool(&self.data.threadpool);
652

            
653
            #[cfg(any(feature = "encryption", feature = "compression"))]
654
34890
            if let Some(vault) = self.data.tree_vault.clone() {
655
9006
                config = config.vault(vault);
656
26142
            }
657

            
658
34890
            let roots = config.open().map_err(Error::from)?;
659
34890
            let context = Context::new(
660
34890
                roots,
661
34890
                self.data.key_value_persistence.clone(),
662
34890
                Some(self.data.lock.clone()),
663
34890
            );
664
34890

            
665
34890
            open_roots.insert(name.to_owned(), context.clone());
666
34890

            
667
34890
            Ok(context)
668
        }
669
2638878
    }
670

            
671
4333734
    pub(crate) fn tasks(&self) -> &'_ TaskManager {
672
4333734
        &self.data.tasks
673
4333734
    }
674

            
675
2638878
    pub(crate) fn check_view_integrity_on_database_open(&self) -> bool {
676
2638878
        self.data.check_view_integrity_on_database_open
677
2638878
    }
678

            
679
3712
    pub(crate) fn relay(&self) -> &'_ Relay {
680
3712
        &self.data.relay
681
3712
    }
682

            
683
    /// Opens a database through a generic-free trait.
684
2581381
    pub(crate) fn database_without_schema(
685
2581381
        &self,
686
2581381
        name: &str,
687
2581381
        storage: Option<&Storage>,
688
2581381
        expected_schema: Option<SchemaName>,
689
2581381
    ) -> Result<Database, Error> {
690
        // TODO switch to upgradable read now that we are on parking_lot
691
2576876
        let stored_schema = {
692
2581381
            let available_databases = self.data.available_databases.read();
693
2581381
            available_databases
694
2581381
                .get(name)
695
2581381
                .ok_or_else(|| {
696
4505
                    Error::Core(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
697
2581381
                })?
698
2576876
                .clone()
699
        };
700

            
701
2576876
        if let Some(expected_schema) = expected_schema {
702
28358
            if stored_schema != expected_schema {
703
                return Err(Error::Core(bonsaidb_core::Error::SchemaMismatch {
704
                    database_name: name.to_owned(),
705
                    schema: expected_schema,
706
                    stored_schema,
707
                }));
708
28358
            }
709
2548518
        }
710

            
711
2576876
        let mut schemas = self.data.schemas.write();
712
2576876
        let storage =
713
2576876
            storage.map_or_else(|| Cow::Owned(Storage::from(self.clone())), Cow::Borrowed);
714
2576876
        if let Some(schema) = schemas.get_mut(&stored_schema) {
715
2576876
            let db = schema.open(name.to_string(), storage.as_ref())?;
716
2576876
            Ok(db)
717
        } else {
718
            // The schema was stored, the user is requesting the same schema,
719
            // but it isn't registerd with the storage currently.
720
            Err(Error::Core(bonsaidb_core::Error::SchemaNotRegistered(
721
                stored_schema,
722
            )))
723
        }
724
2581381
    }
725

            
726
68
    fn update_user_with_named_id<
727
68
        'user,
728
68
        'other,
729
68
        Col: NamedCollection<PrimaryKey = u64>,
730
68
        U: Nameable<'user, u64> + Send + Sync,
731
68
        O: Nameable<'other, u64> + Send + Sync,
732
68
        F: FnOnce(&mut CollectionDocument<User>, u64) -> Result<bool, bonsaidb_core::Error>,
733
68
    >(
734
68
        &self,
735
68
        user: U,
736
68
        other: O,
737
68
        callback: F,
738
68
    ) -> Result<(), bonsaidb_core::Error> {
739
68
        let admin = self.admin();
740
68
        let other = other.name()?;
741
68
        let user = User::load(user.name()?, &admin)?;
742
68
        let other = other.id::<Col, _>(&admin)?;
743
68
        match (user, other) {
744
68
            (Some(mut user), Some(other)) => {
745
68
                if callback(&mut user, other)? {
746
34
                    user.update(&admin)?;
747
34
                }
748
68
                Ok(())
749
            }
750
            // TODO make this a generic not found with a name parameter.
751
            _ => Err(bonsaidb_core::Error::UserNotFound),
752
        }
753
68
    }
754

            
755
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
756
    #[cfg_attr(
757
        any(
758
            not(feature = "token-authentication"),
759
            not(feature = "password-hashing")
760
        ),
761
        allow(unused_variables, clippy::needless_pass_by_value)
762
    )]
763
844
    fn authenticate_inner(
764
844
        &self,
765
844
        authentication: bonsaidb_core::connection::Authentication,
766
844
        loaded_user: Option<CollectionDocument<User>>,
767
844
        current_session_id: Option<SessionId>,
768
844
        admin: &Database,
769
844
    ) -> Result<Storage, bonsaidb_core::Error> {
770
844
        use bonsaidb_core::connection::Authentication;
771
844
        match authentication {
772
            #[cfg(feature = "token-authentication")]
773
            Authentication::Token {
774
296
                id,
775
296
                now,
776
296
                now_hash,
777
296
                algorithm,
778
296
            } => self.begin_token_authentication(id, now, &now_hash, algorithm, admin),
779
            #[cfg(feature = "token-authentication")]
780
296
            Authentication::TokenChallengeResponse(hash) => {
781
296
                let session_id =
782
296
                    current_session_id.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
783
296
                self.finish_token_authentication(session_id, &hash, admin)
784
            }
785
            #[cfg(feature = "password-hashing")]
786
252
            Authentication::Password { user, password } => {
787
252
                let user = match loaded_user {
788
252
                    Some(user) => user,
789
                    None => {
790
                        User::load(user, admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?
791
                    }
792
                };
793
252
                let saved_hash = user
794
252
                    .contents
795
252
                    .argon_hash
796
252
                    .clone()
797
252
                    .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
798

            
799
252
                self.data
800
252
                    .argon
801
252
                    .verify(user.header.id, password, saved_hash)?;
802
252
                self.assume_user(user, admin)
803
            }
804
        }
805
844
    }
806

            
807
400
    fn assume_user(
808
400
        &self,
809
400
        user: CollectionDocument<User>,
810
400
        admin: &Database,
811
400
    ) -> Result<Storage, bonsaidb_core::Error> {
812
400
        let permissions = user.contents.effective_permissions(
813
400
            admin,
814
400
            &admin.storage().instance.data.authenticated_permissions,
815
400
        )?;
816

            
817
400
        let mut sessions = self.data.sessions.write();
818
400
        sessions.last_session_id += 1;
819
400
        let session_id = SessionId(sessions.last_session_id);
820
400
        let session = Session {
821
400
            id: Some(session_id),
822
400
            authentication: SessionAuthentication::Identity(Arc::new(Identity::User {
823
400
                id: user.header.id,
824
400
                username: user.contents.username,
825
400
            })),
826
400
            permissions,
827
400
        };
828
400
        let authentication = Arc::new(AuthenticatedSession {
829
400
            storage: Arc::downgrade(&self.data),
830
400
            session: Mutex::new(session.clone()),
831
400
        });
832
400
        sessions.sessions.insert(session_id, authentication.clone());
833
400

            
834
400
        Ok(Storage {
835
400
            instance: self.clone(),
836
400
            authentication: Some(authentication),
837
400
            effective_session: Some(Arc::new(session)),
838
400
        })
839
400
    }
840

            
841
220
    fn assume_role(
842
220
        &self,
843
220
        role: CollectionDocument<Role>,
844
220
        admin: &Database,
845
220
    ) -> Result<Storage, bonsaidb_core::Error> {
846
220
        let permissions = role.contents.effective_permissions(
847
220
            admin,
848
220
            &admin.storage().instance.data.authenticated_permissions,
849
220
        )?;
850

            
851
220
        let mut sessions = self.data.sessions.write();
852
220
        sessions.last_session_id += 1;
853
220
        let session_id = SessionId(sessions.last_session_id);
854
220
        let session = Session {
855
220
            id: Some(session_id),
856
220
            authentication: SessionAuthentication::Identity(Arc::new(Identity::Role {
857
220
                id: role.header.id,
858
220
                name: role.contents.name,
859
220
            })),
860
220
            permissions,
861
220
        };
862
220
        let authentication = Arc::new(AuthenticatedSession {
863
220
            storage: Arc::downgrade(&self.data),
864
220
            session: Mutex::new(session.clone()),
865
220
        });
866
220
        sessions.sessions.insert(session_id, authentication.clone());
867
220

            
868
220
        Ok(Storage {
869
220
            instance: self.clone(),
870
220
            authentication: Some(authentication),
871
220
            effective_session: Some(Arc::new(session)),
872
220
        })
873
220
    }
874

            
875
440
    fn add_permission_group_to_user_inner(
876
440
        user: &mut CollectionDocument<User>,
877
440
        permission_group_id: u64,
878
440
    ) -> bool {
879
440
        if user.contents.groups.contains(&permission_group_id) {
880
220
            false
881
        } else {
882
220
            user.contents.groups.push(permission_group_id);
883
220
            true
884
        }
885
440
    }
886

            
887
296
    fn remove_permission_group_from_user_inner(
888
296
        user: &mut CollectionDocument<User>,
889
296
        permission_group_id: u64,
890
296
    ) -> bool {
891
296
        let old_len = user.contents.groups.len();
892
296
        user.contents.groups.retain(|id| id != &permission_group_id);
893
296
        old_len != user.contents.groups.len()
894
296
    }
895

            
896
296
    fn add_role_to_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
897
296
        if user.contents.roles.contains(&role_id) {
898
148
            false
899
        } else {
900
148
            user.contents.roles.push(role_id);
901
148
            true
902
        }
903
296
    }
904

            
905
296
    fn remove_role_from_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
906
296
        let old_len = user.contents.roles.len();
907
296
        user.contents.roles.retain(|id| id != &role_id);
908
296
        old_len != user.contents.roles.len()
909
296
    }
910
}
911

            
912
pub trait DatabaseOpener: Send + Sync {
913
    fn schematic(&self) -> &'_ Schematic;
914
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error>;
915
}
916

            
917
pub struct StorageSchemaOpener<DB: Schema> {
918
    schematic: Schematic,
919
    _phantom: PhantomData<DB>,
920
}
921

            
922
impl<DB> StorageSchemaOpener<DB>
923
where
924
    DB: Schema,
925
{
926
5516
    pub fn new() -> Result<Self, Error> {
927
5516
        let schematic = DB::schematic()?;
928
5516
        Ok(Self {
929
5516
            schematic,
930
5516
            _phantom: PhantomData,
931
5516
        })
932
5516
    }
933
}
934

            
935
impl<DB> DatabaseOpener for StorageSchemaOpener<DB>
936
where
937
    DB: Schema,
938
{
939
160
    fn schematic(&self) -> &'_ Schematic {
940
160
        &self.schematic
941
160
    }
942

            
943
78156
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error> {
944
78156
        let roots = storage.instance.open_roots(&name)?;
945
78156
        let db = Database::new::<DB, _>(name, roots, storage)?;
946
78156
        Ok(db)
947
78156
    }
948
}
949

            
950
impl HasSession for StorageInstance {
951
    fn session(&self) -> Option<&Session> {
952
        None
953
    }
954
}
955

            
956
impl StorageConnection for StorageInstance {
957
    type Authenticated = Storage;
958
    type Database = Database;
959

            
960
62002
    fn admin(&self) -> Self::Database {
961
62002
        Database::new::<Admin, _>(
962
62002
            ADMIN_DATABASE_NAME,
963
62002
            self.open_roots(ADMIN_DATABASE_NAME).unwrap(),
964
62002
            &Storage::from(self.clone()),
965
62002
        )
966
62002
        .unwrap()
967
62002
    }
968

            
969
    #[cfg_attr(feature = "tracing", tracing::instrument(
970
        level = "trace",
971
        skip(self, schema),
972
        fields(
973
            schema.authority = schema.authority.as_ref(),
974
            schema.name = schema.name.as_ref(),
975
        )
976
    ))]
977
35321
    fn create_database_with_schema(
978
35321
        &self,
979
35321
        name: &str,
980
35321
        schema: SchemaName,
981
35321
        only_if_needed: bool,
982
35321
    ) -> Result<(), bonsaidb_core::Error> {
983
35321
        Storage::validate_name(name)?;
984

            
985
        {
986
35173
            let schemas = self.data.schemas.read();
987
35173
            if !schemas.contains_key(&schema) {
988
148
                return Err(bonsaidb_core::Error::SchemaNotRegistered(schema));
989
35025
            }
990
35025
        }
991
35025

            
992
35025
        let mut available_databases = self.data.available_databases.write();
993
35025
        let admin = self.admin();
994
35025
        if !available_databases.contains_key(name) {
995
33929
            admin
996
33929
                .collection::<DatabaseRecord>()
997
33929
                .push(&admin::Database {
998
33929
                    name: name.to_string(),
999
33929
                    schema: schema.clone(),
33929
                })?;
33929
            available_databases.insert(name.to_string(), schema);
1096
        } else if !only_if_needed {
148
            return Err(bonsaidb_core::Error::DatabaseNameAlreadyTaken(
148
                name.to_string(),
148
            ));
948
        }

            
34877
        Ok(())
35321
    }

            
10533
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
10533
        self.database_without_schema(name, None, Some(DB::schema_name()))
10533
            .map_err(bonsaidb_core::Error::from)
10533
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
18296
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
18296
        let admin = self.admin();
18296
        let mut available_databases = self.data.available_databases.write();
18296
        available_databases.remove(name);
18296

            
18296
        let mut open_roots = self.data.open_roots.lock();
18296
        open_roots.remove(name);
18296

            
18296
        let database_folder = self.data.path.join(name);
18296
        if database_folder.exists() {
18042
            let file_manager = self.data.file_manager.clone();
18042
            file_manager
18042
                .delete_directory(&database_folder)
18042
                .map_err(Error::Nebari)?;
254
        }

            
18296
        if let Some(entry) = admin
18296
            .view::<database::ByName>()
18296
            .with_key(name)
18296
            .query()?
18296
            .first()
        {
18148
            admin.delete::<DatabaseRecord, _>(&entry.source)?;

            
18148
            Ok(())
        } else {
148
            Err(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
        }
18296
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
148
    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
148
        let available_databases = self.data.available_databases.read();
148
        Ok(available_databases
148
            .iter()
5444
            .map(|(name, schema)| connection::Database {
5444
                name: name.to_string(),
5444
                schema: schema.clone(),
5444
            })
148
            .collect())
148
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
148
    fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
148
        let available_databases = self.data.available_databases.read();
148
        let schemas = self.data.schemas.read();
148

            
148
        Ok(available_databases
148
            .values()
148
            .unique()
440
            .filter_map(|name| {
440
                schemas
440
                    .get(name)
440
                    .map(|opener| SchemaSummary::from(opener.schematic()))
440
            })
148
            .collect())
148
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
584
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
584
        let result = self
584
            .admin()
584
            .collection::<User>()
584
            .push(&User::default_with_username(username))?;
548
        Ok(result.id)
584
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
8
    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
8
        &self,
8
        user: U,
8
    ) -> Result<(), bonsaidb_core::Error> {
8
        let admin = self.admin();
8
        let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
8
        user.delete(&admin)?;

            
8
        Ok(())
8
    }

            
    #[cfg(feature = "password-hashing")]
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
180
    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
180
        &self,
180
        user: U,
180
        password: bonsaidb_core::connection::SensitiveString,
180
    ) -> Result<(), bonsaidb_core::Error> {
180
        let admin = self.admin();
180
        let mut user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
180
        user.contents.argon_hash = Some(self.data.argon.hash(user.header.id, password)?);
180
        user.update(&admin)
180
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
    fn authenticate(
        &self,
        authentication: bonsaidb_core::connection::Authentication,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        let admin = self.admin();
        self.authenticate_inner(authentication, None, None, &admin)
            .map(Storage::from)
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn assume_identity(
        &self,
        identity: IdentityReference<'_>,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        let admin = self.admin();
        match identity {
            IdentityReference::User(user) => {
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.assume_user(user, &admin).map(Storage::from)
            }
            IdentityReference::Role(role) => {
                let role =
                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.assume_role(role, &admin).map(Storage::from)
            }
            _ => Err(bonsaidb_core::Error::InvalidCredentials),
        }
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
            user,
            permission_group,
            |user, permission_group_id| {
                Ok(Self::add_permission_group_to_user_inner(
                    user,
                    permission_group_id,
                ))
            },
        )
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
            user,
            permission_group,
            |user, permission_group_id| {
                Ok(Self::remove_permission_group_from_user_inner(
                    user,
                    permission_group_id,
                ))
            },
        )
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn add_role_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
            Ok(Self::add_role_to_user_inner(user, role_id))
        })
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn remove_role_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
            Ok(Self::remove_role_from_user_inner(user, role_id))
        })
    }
}

            
impl HasSession for Storage {
5215983
    fn session(&self) -> Option<&Session> {
5215983
        self.effective_session.as_deref()
5215983
    }
}

            
impl StorageConnection for Storage {
    type Authenticated = Self;
    type Database = Database;

            
6441
    fn admin(&self) -> Self::Database {
6441
        self.instance.admin()
6441
    }

            
35321
    fn create_database_with_schema(
35321
        &self,
35321
        name: &str,
35321
        schema: SchemaName,
35321
        only_if_needed: bool,
35321
    ) -> Result<(), bonsaidb_core::Error> {
35321
        self.check_permission(
35321
            database_resource_name(name),
35321
            &BonsaiAction::Server(ServerAction::CreateDatabase),
35321
        )?;
35321
        self.instance
35321
            .create_database_with_schema(name, schema, only_if_needed)
35321
    }

            
10533
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
10533
        self.instance.database::<DB>(name)
10533
    }

            
18296
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
18296
        self.check_permission(
18296
            database_resource_name(name),
18296
            &BonsaiAction::Server(ServerAction::DeleteDatabase),
18296
        )?;
18296
        self.instance.delete_database(name)
18296
    }

            
148
    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
148
        self.check_permission(
148
            bonsaidb_resource_name(),
148
            &BonsaiAction::Server(ServerAction::ListDatabases),
148
        )?;
148
        self.instance.list_databases()
148
    }

            
148
    fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
148
        self.check_permission(
148
            bonsaidb_resource_name(),
148
            &BonsaiAction::Server(ServerAction::ListAvailableSchemas),
148
        )?;
148
        self.instance.list_available_schemas()
148
    }

            
620
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
620
        self.check_permission(
620
            bonsaidb_resource_name(),
620
            &BonsaiAction::Server(ServerAction::CreateUser),
620
        )?;
584
        self.instance.create_user(username)
620
    }

            
8
    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
8
        &self,
8
        user: U,
8
    ) -> Result<(), bonsaidb_core::Error> {
8
        let admin = self.admin();
8
        let user = user.name()?;
8
        let user_id = user
8
            .id::<User, _>(&admin)?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
8
        self.check_permission(
8
            user_resource_name(user_id),
8
            &BonsaiAction::Server(ServerAction::DeleteUser),
8
        )?;
8
        self.instance.delete_user(user)
8
    }

            
    #[cfg(feature = "password-hashing")]
5
    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
5
        &self,
5
        user: U,
5
        password: bonsaidb_core::connection::SensitiveString,
5
    ) -> Result<(), bonsaidb_core::Error> {
5
        let admin = self.admin();
5
        let user = user.name()?;
5
        let user_id = user
5
            .id::<User, _>(&admin)?
5
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
5
        self.check_permission(
5
            user_resource_name(user_id),
5
            &BonsaiAction::Server(ServerAction::SetPassword),
5
        )?;
5
        self.instance.set_user_password(user, password)
5
    }

            
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
    #[cfg_attr(not(feature = "token-authentication"), allow(unused_assignments))]
    #[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
844
    fn authenticate(
844
        &self,
844
        authentication: bonsaidb_core::connection::Authentication,
844
    ) -> Result<Self, bonsaidb_core::Error> {
844
        let admin = self.admin();
844
        let mut loaded_user = None;
844
        match &authentication {
            #[cfg(feature = "token-authentication")]
296
            bonsaidb_core::connection::Authentication::Token { id, .. } => {
296
                self.check_permission(
296
                    bonsaidb_core::permissions::bonsai::authentication_token_resource_name(*id),
296
                    &BonsaiAction::Server(ServerAction::Authenticate(
296
                        bonsaidb_core::connection::AuthenticationMethod::Token,
296
                    )),
296
                )?;
            }
            #[cfg(feature = "password-hashing")]
252
            bonsaidb_core::connection::Authentication::Password { user, .. } => {
252
                let user =
252
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
252
                self.check_permission(
252
                    user_resource_name(user.header.id),
252
                    &BonsaiAction::Server(ServerAction::Authenticate(
252
                        bonsaidb_core::connection::AuthenticationMethod::PasswordHash,
252
                    )),
252
                )?;
252
                loaded_user = Some(user);
            }
            #[cfg(feature = "token-authentication")]
296
            bonsaidb_core::connection::Authentication::TokenChallengeResponse(_) => {}
        }
844
        self.instance.authenticate_inner(
844
            authentication,
844
            loaded_user,
844
            self.authentication
844
                .as_ref()
844
                .and_then(|auth| auth.session.lock().id),
844
            &admin,
844
        )
844
    }

            
72
    fn assume_identity(
72
        &self,
72
        identity: IdentityReference<'_>,
72
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
72
        match identity {
            IdentityReference::User(user) => {
                let admin = self.admin();
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
                )?;
                self.instance.assume_user(user, &admin)
            }
72
            IdentityReference::Role(role) => {
72
                let admin = self.admin();
72
                let role =
72
                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
72
                self.check_permission(
72
                    role_resource_name(role.header.id),
72
                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
72
                )?;
72
                self.instance.assume_role(role, &admin)
            }

            
            _ => Err(bonsaidb_core::Error::InvalidCredentials),
        }
72
    }

            
20
    fn add_permission_group_to_user<
20
        'user,
20
        'group,
20
        U: Nameable<'user, u64> + Send + Sync,
20
        G: Nameable<'group, u64> + Send + Sync,
20
    >(
20
        &self,
20
        user: U,
20
        permission_group: G,
20
    ) -> Result<(), bonsaidb_core::Error> {
20
        self.instance
20
            .update_user_with_named_id::<PermissionGroup, _, _, _>(
20
                user,
20
                permission_group,
20
                |user, permission_group_id| {
20
                    self.check_permission(
20
                        user_resource_name(user.header.id),
20
                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
20
                    )?;
20
                    Ok(StorageInstance::add_permission_group_to_user_inner(
20
                        user,
20
                        permission_group_id,
20
                    ))
20
                },
20
            )
20
    }

            
16
    fn remove_permission_group_from_user<
16
        'user,
16
        'group,
16
        U: Nameable<'user, u64> + Send + Sync,
16
        G: Nameable<'group, u64> + Send + Sync,
16
    >(
16
        &self,
16
        user: U,
16
        permission_group: G,
16
    ) -> Result<(), bonsaidb_core::Error> {
16
        self.instance
16
            .update_user_with_named_id::<PermissionGroup, _, _, _>(
16
                user,
16
                permission_group,
16
                |user, permission_group_id| {
16
                    self.check_permission(
16
                        user_resource_name(user.header.id),
16
                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
16
                    )?;
16
                    Ok(StorageInstance::remove_permission_group_from_user_inner(
16
                        user,
16
                        permission_group_id,
16
                    ))
16
                },
16
            )
16
    }

            
16
    fn add_role_to_user<
16
        'user,
16
        'group,
16
        U: Nameable<'user, u64> + Send + Sync,
16
        G: Nameable<'group, u64> + Send + Sync,
16
    >(
16
        &self,
16
        user: U,
16
        role: G,
16
    ) -> Result<(), bonsaidb_core::Error> {
16
        self.instance
16
            .update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
16
                self.check_permission(
16
                    user_resource_name(user.header.id),
16
                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
16
                )?;
16
                Ok(StorageInstance::add_role_to_user_inner(user, role_id))
16
            })
16
    }

            
16
    fn remove_role_from_user<
16
        'user,
16
        'group,
16
        U: Nameable<'user, u64> + Send + Sync,
16
        G: Nameable<'group, u64> + Send + Sync,
16
    >(
16
        &self,
16
        user: U,
16
        role: G,
16
    ) -> Result<(), bonsaidb_core::Error> {
16
        self.instance
16
            .update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
16
                self.check_permission(
16
                    user_resource_name(user.header.id),
16
                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
16
                )?;
16
                Ok(StorageInstance::remove_role_from_user_inner(user, role_id))
16
            })
16
    }
}

            
1
#[test]
1
fn name_validation_tests() {
1
    assert!(matches!(Storage::validate_name("azAZ09.-"), Ok(())));
1
    assert!(matches!(
1
        Storage::validate_name("_internal-names-work"),
        Ok(())
    ));
1
    assert!(matches!(
1
        Storage::validate_name("-alphaunmericfirstrequired"),
        Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
    ));
1
    assert!(matches!(
1
        Storage::validate_name("\u{2661}"),
        Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
    ));
1
}

            
/// The unique id of a [`Storage`] instance.
34890
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct StorageId(u64);

            
impl StorageId {
    /// Returns the id as a u64.
    #[must_use]
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}

            
impl Debug for StorageId {
9703
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
9703
        // let formatted_length = format!();
9703
        write!(f, "{:016x}", self.0)
9703
    }
}

            
impl Display for StorageId {
9703
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
9703
        Debug::fmt(self, f)
9703
    }
}

            
2610413
#[derive(Debug, Clone)]
#[cfg(any(feature = "compression", feature = "encryption"))]
pub(crate) struct TreeVault {
    #[cfg(feature = "compression")]
    compression: Option<Compression>,
    #[cfg(feature = "encryption")]
    pub key: Option<KeyId>,
    #[cfg(feature = "encryption")]
    pub vault: Arc<Vault>,
}

            
#[cfg(all(feature = "compression", feature = "encryption"))]
impl TreeVault {
2134291
    pub(crate) fn new_if_needed(
2134291
        key: Option<KeyId>,
2134291
        vault: &Arc<Vault>,
2134291
        compression: Option<Compression>,
2134291
    ) -> Option<Self> {
2134291
        if key.is_none() && compression.is_none() {
2084642
            None
        } else {
49649
            Some(Self {
49649
                key,
49649
                compression,
49649
                vault: vault.clone(),
49649
            })
        }
2134291
    }

            
8174505
    fn header(&self, compressed: bool) -> u8 {
8174505
        let mut bits = if self.key.is_some() { 0b1000_0000 } else { 0 };

            
8174505
        if compressed {
3765981
            if let Some(compression) = self.compression {
3765981
                bits |= compression as u8;
3765981
            }
4408524
        }

            
8174505
        bits
8174505
    }
}

            
#[cfg(all(feature = "compression", feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;

            
8174505
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
8174505
        // TODO this allocates too much. The vault should be able to do an
8174505
        // in-place encryption operation so that we can use a single buffer.
8174505
        let mut includes_compression = false;
8174505
        let compressed = match (payload.len(), self.compression) {
3770446
            (128..=usize::MAX, Some(Compression::Lz4)) => {
3765981
                includes_compression = true;
3765981
                Cow::Owned(lz4_flex::block::compress_prepend_size(payload))
            }
4408524
            _ => Cow::Borrowed(payload),
        };

            
8174505
        let mut complete = if let Some(key) = &self.key {
72315
            self.vault.encrypt_payload(key, &compressed, None)?
        } else {
8102190
            compressed.into_owned()
        };

            
8174505
        let header = self.header(includes_compression);
8174505
        if header != 0 {
3832054
            let header = [b't', b'r', b'v', header];
3832054
            complete.splice(0..0, header);
4342451
        }

            
8174505
        Ok(complete)
8174505
    }

            
994656
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
994656
        if payload.len() >= 4 && &payload[0..3] == b"trv" {
756460
            let header = payload[3];
756460
            let payload = &payload[4..];
756460
            let encrypted = (header & 0b1000_0000) != 0;
756460
            let compression = header & 0b0111_1111;
756460
            let decrypted = if encrypted {
15658
                Cow::Owned(self.vault.decrypt_payload(payload, None)?)
            } else {
740802
                Cow::Borrowed(payload)
            };
            #[allow(clippy::single_match)] // Make it an error when we add a new algorithm
756459
            return Ok(match Compression::from_u8(compression) {
                Some(Compression::Lz4) => {
745412
                    lz4_flex::block::decompress_size_prepended(&decrypted).map_err(Error::from)?
                }
11047
                None => decrypted.into_owned(),
            });
238196
        }
238196
        self.vault.decrypt_payload(payload, None)
994656
    }
}

            
/// Functionality that is available on both [`Storage`] and
/// [`AsyncStorage`](crate::AsyncStorage).
pub trait StorageNonBlocking: Sized {
    /// Returns the path of the database storage.
    #[must_use]
    fn path(&self) -> &Path;

            
    /// Returns a new instance of [`Storage`] with `session` as the effective
    /// authentication session. This call will only succeed if there is no
    /// current session.
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error>;
}

            
impl StorageNonBlocking for Storage {
6048
    fn path(&self) -> &Path {
6048
        &self.instance.data.path
6048
    }

            
2593908
    fn assume_session(&self, session: Session) -> Result<Storage, bonsaidb_core::Error> {
2593908
        if self.authentication.is_some() {
            // TODO better error
            return Err(bonsaidb_core::Error::InvalidCredentials);
2593908
        }

            
2593908
        let Some(session_id) = session.id else {
2592972
            return Ok(Self {
2592972
                instance: self.instance.clone(),
2592972
                authentication: None,
2592972
                effective_session: Some(Arc::new(session)),
2592972
            });
        };

            
936
        let session_data = self.instance.data.sessions.read();
        // TODO better error
936
        let authentication = session_data
936
            .sessions
936
            .get(&session_id)
936
            .ok_or(bonsaidb_core::Error::InvalidCredentials)?;

            
792
        let authentication_session = authentication.session.lock();
792
        let effective_permissions =
792
            Permissions::merged([&session.permissions, &authentication_session.permissions]);
792
        let effective_session = Session {
792
            id: authentication_session.id,
792
            authentication: authentication_session.authentication.clone(),
792
            permissions: effective_permissions,
792
        };
792

            
792
        Ok(Self {
792
            instance: self.instance.clone(),
792
            authentication: Some(authentication.clone()),
792
            effective_session: Some(Arc::new(effective_session)),
792
        })
2593908
    }
}

            
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl TreeVault {
    pub(crate) fn new_if_needed(compression: Option<Compression>) -> Option<Self> {
        compression.map(|compression| Self {
            compression: Some(compression),
        })
    }
}

            
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl nebari::Vault for TreeVault {
    type Error = Error;

            
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        Ok(match (payload.len(), self.compression) {
            (128..=usize::MAX, Some(Compression::Lz4)) => {
                let mut destination =
                    vec![0; lz4_flex::block::get_maximum_output_size(payload.len()) + 8];
                let compressed_length =
                    lz4_flex::block::compress_into(payload, &mut destination[8..])
                        .expect("lz4-flex documents this shouldn't fail");
                destination.truncate(compressed_length + 8);
                destination[0..4].copy_from_slice(&[b't', b'r', b'v', Compression::Lz4 as u8]);
                // to_le_bytes() makes it compatible with lz4-flex decompress_size_prepended.
                let uncompressed_length =
                    u32::try_from(payload.len()).expect("nebari doesn't support >32 bit blocks");
                destination[4..8].copy_from_slice(&uncompressed_length.to_le_bytes());
                destination
            }
            // TODO this shouldn't copy
            _ => payload.to_vec(),
        })
    }

            
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if payload.len() >= 4 && &payload[0..3] == b"trv" {
            let header = payload[3];
            let payload = &payload[4..];
            let encrypted = (header & 0b1000_0000) != 0;
            let compression = header & 0b0111_1111;
            if encrypted {
                return Err(Error::EncryptionDisabled);
            }

            
            #[allow(clippy::single_match)] // Make it an error when we add a new algorithm
            return Ok(match Compression::from_u8(compression) {
                Some(Compression::Lz4) => {
                    lz4_flex::block::decompress_size_prepended(payload).map_err(Error::from)?
                }
                None => payload.to_vec(),
            });
        }
        Ok(payload.to_vec())
    }
}

            
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl TreeVault {
    pub(crate) fn new_if_needed(key: Option<KeyId>, vault: &Arc<Vault>) -> Option<Self> {
        key.map(|key| Self {
            key: Some(key),
            vault: vault.clone(),
        })
    }

            
    #[allow(dead_code)] // This implementation is sort of documentation for what it would be. But our Vault payload already can detect if a parsing error occurs, so we don't need a header if only encryption is enabled.
    fn header(&self) -> u8 {
        if self.key.is_some() {
            0b1000_0000
        } else {
            0
        }
    }
}

            
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;

            
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if let Some(key) = &self.key {
            self.vault.encrypt_payload(key, payload, None)
        } else {
            // TODO does this need to copy?
            Ok(payload.to_vec())
        }
    }

            
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        self.vault.decrypt_payload(payload, None)
    }
}

            
34890
#[derive(Clone, Debug)]
pub struct StorageLock(StorageId, Arc<LockData>);

            
impl StorageLock {
5197
    pub const fn id(&self) -> StorageId {
5197
        self.0
5197
    }
}

            
#[derive(Debug)]
struct LockData(File);

            
impl StorageLock {
5197
    fn new(id: StorageId, file: File) -> Self {
5197
        Self(id, Arc::new(LockData(file)))
5197
    }
}

            
impl Drop for LockData {
5053
    fn drop(&mut self) {
5053
        drop(self.0.unlock());
5053
    }
}