1
use std::{
2
    borrow::Cow,
3
    collections::{HashMap, HashSet},
4
    fmt::{Debug, Display},
5
    fs::{self, File},
6
    io::{Read, Write},
7
    marker::PhantomData,
8
    path::{Path, PathBuf},
9
    sync::{Arc, Weak},
10
};
11

            
12
pub use bonsaidb_core::circulate::Relay;
13
#[cfg(any(feature = "encryption", feature = "compression"))]
14
use bonsaidb_core::document::KeyId;
15
use bonsaidb_core::{
16
    admin::{
17
        self,
18
        database::{self, ByName, Database as DatabaseRecord},
19
        user::User,
20
        Admin, PermissionGroup, Role, ADMIN_DATABASE_NAME,
21
    },
22
    circulate,
23
    connection::{
24
        self, Connection, HasSession, Identity, IdentityReference, LowLevelConnection, Session,
25
        SessionId, StorageConnection,
26
    },
27
    document::CollectionDocument,
28
    permissions::{
29
        bonsai::{
30
            bonsaidb_resource_name, database_resource_name, role_resource_name, user_resource_name,
31
            BonsaiAction, ServerAction,
32
        },
33
        Permissions,
34
    },
35
    schema::{Nameable, NamedCollection, Schema, SchemaName, Schematic},
36
};
37
#[cfg(feature = "password-hashing")]
38
use bonsaidb_core::{connection::Authentication, permissions::bonsai::AuthenticationMethod};
39
use itertools::Itertools;
40
use nebari::{
41
    io::{
42
        any::{AnyFile, AnyFileManager},
43
        FileManager,
44
    },
45
    ChunkCache, ThreadPool,
46
};
47
use parking_lot::{Mutex, RwLock};
48
use rand::{thread_rng, Rng};
49

            
50
#[cfg(feature = "compression")]
51
use crate::config::Compression;
52
#[cfg(feature = "encryption")]
53
use crate::vault::{self, LocalVaultKeyStorage, Vault};
54
use crate::{
55
    config::{KeyValuePersistence, StorageConfiguration},
56
    database::Context,
57
    tasks::{manager::Manager, TaskManager},
58
    Database, Error,
59
};
60

            
61
#[cfg(feature = "password-hashing")]
62
mod argon;
63

            
64
mod backup;
65
mod pubsub;
66
pub use backup::{AnyBackupLocation, BackupLocation};
67

            
68
/// A file-based, multi-database, multi-user database engine. This type blocks
69
/// the current thread when used. See [`AsyncStorage`](crate::AsyncStorage) for
70
/// this type's async counterpart.
71
///
72
/// ## Converting between Blocking and Async Types
73
///
74
/// [`AsyncStorage`](crate::AsyncStorage) and [`Storage`] can be converted to
75
/// and from each other using:
76
///
77
/// - [`AsyncStorage::into_blocking()`](crate::AsyncStorage::into_blocking)
78
/// - [`AsyncStorage::to_blocking()`](crate::AsyncStorage::to_blocking)
79
/// - [`AsyncStorage::as_blocking()`](crate::AsyncStorage::as_blocking)
80
/// - [`Storage::into_async()`]
81
/// - [`Storage::to_async()`]
82
/// - [`Storage::into_async_with_runtime()`]
83
/// - [`Storage::to_async_with_runtime()`]
84
///
85
/// ## Converting from `Database::open` to `Storage::open`
86
///
87
/// [`Database::open`](Database::open) is a simple method that uses `Storage` to
88
/// create a database named `default` with the schema provided. These two ways
89
/// of opening the database are the same:
90
///
91
/// ```rust
92
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
93
/// use bonsaidb_core::{connection::StorageConnection, schema::Schema};
94
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
95
/// use bonsaidb_local::{
96
///     config::{Builder, StorageConfiguration},
97
///     Database, Storage,
98
/// };
99
/// # fn open<MySchema: Schema>() -> anyhow::Result<()> {
100
/// // This creates a Storage instance, creates a database, and returns it.
101
/// let db = Database::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb"))?;
102
///
103
/// // This is the equivalent code being executed:
104
/// let storage =
105
///     Storage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)?;
106
/// storage.create_database::<MySchema>("default", true)?;
107
/// let db = storage.database::<MySchema>("default")?;
108
/// #     Ok(())
109
/// # }
110
/// ```
111
///
112
/// ## Using multiple databases
113
///
114
/// This example shows how to use `Storage` to create and use multiple databases
115
/// with multiple schemas:
116
///
117
/// ```rust
118
/// use bonsaidb_core::{
119
///     connection::StorageConnection,
120
///     schema::{Collection, Schema},
121
/// };
122
/// use bonsaidb_local::{
123
///     config::{Builder, StorageConfiguration},
124
///     Storage,
125
/// };
126
/// use serde::{Deserialize, Serialize};
127
///
128
/// #[derive(Debug, Schema)]
129
/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
130
/// # #[schema(core = bonsaidb_core)]
131
/// struct MySchema;
132
///
133
/// #[derive(Debug, Serialize, Deserialize, Collection)]
134
/// #[collection(name = "blog-posts")]
135
/// # #[collection(core = bonsaidb_core)]
136
/// struct BlogPost {
137
///     pub title: String,
138
///     pub contents: String,
139
///     pub author_id: u64,
140
/// }
141
///
142
/// #[derive(Debug, Serialize, Deserialize, Collection)]
143
/// #[collection(name = "blog-posts")]
144
/// # #[collection(core = bonsaidb_core)]
145
/// struct Author {
146
///     pub name: String,
147
/// }
148
///
149
/// # fn open() -> anyhow::Result<()> {
150
/// let storage = Storage::open(
151
///     StorageConfiguration::new("my-db.bonsaidb")
152
///         .with_schema::<BlogPost>()?
153
///         .with_schema::<MySchema>()?,
154
/// )?;
155
///
156
/// storage.create_database::<BlogPost>("ectons-blog", true)?;
157
/// let ectons_blog = storage.database::<BlogPost>("ectons-blog")?;
158
/// storage.create_database::<MySchema>("another-db", true)?;
159
/// let another_db = storage.database::<MySchema>("another-db")?;
160
/// # Ok(())
161
/// # }
162
/// ```
163
8743393
#[derive(Debug, Clone)]
164
#[must_use]
165
pub struct Storage {
166
    pub(crate) instance: StorageInstance,
167
    pub(crate) authentication: Option<Arc<AuthenticatedSession>>,
168
    effective_session: Option<Arc<Session>>,
169
}
170

            
171
#[derive(Debug, Clone)]
172
pub struct AuthenticatedSession {
173
    // TODO: client_data,
174
    storage: Weak<Data>,
175
    pub session: Session,
176
}
177

            
178
3407
#[derive(Debug, Default)]
179
pub struct SessionSubscribers {
180
    pub subscribers: HashMap<u64, SessionSubscriber>,
181
    pub subscribers_by_session: HashMap<SessionId, HashSet<u64>>,
182
    pub last_id: u64,
183
}
184

            
185
impl SessionSubscribers {
186
    pub fn unregister(&mut self, subscriber_id: u64) {
187
692
        if let Some(session_id) = self
188
692
            .subscribers
189
692
            .remove(&subscriber_id)
190
692
            .and_then(|sub| sub.session_id)
191
        {
192
            if let Some(session_subscribers) = self.subscribers_by_session.get_mut(&session_id) {
193
                session_subscribers.remove(&subscriber_id);
194
            }
195
692
        }
196
692
    }
197
}
198

            
199
#[derive(Debug)]
200
pub struct SessionSubscriber {
201
    pub session_id: Option<SessionId>,
202
    pub subscriber: circulate::Subscriber,
203
}
204

            
205
impl Drop for AuthenticatedSession {
206
    fn drop(&mut self) {
207
210
        if let Some(id) = self.session.id.take() {
208
210
            if let Some(storage) = self.storage.upgrade() {
209
                // Deregister the session id once dropped.
210
                let mut sessions = storage.sessions.write();
211
                sessions.sessions.remove(&id);
212

            
213
                // Remove all subscribers.
214
                let mut sessions = storage.subscribers.write();
215
                for id in sessions
216
                    .subscribers_by_session
217
                    .remove(&id)
218
                    .into_iter()
219
                    .flatten()
220
                {
221
                    sessions.subscribers.remove(&id);
222
                }
223
210
            }
224
        }
225
210
    }
226
}
227

            
228
3377
#[derive(Debug, Default)]
229
struct AuthenticatedSessions {
230
    sessions: HashMap<SessionId, Arc<AuthenticatedSession>>,
231
    last_session_id: u64,
232
}
233

            
234
10930212
#[derive(Debug, Clone)]
235
pub struct StorageInstance {
236
    data: Arc<Data>,
237
}
238

            
239
impl From<StorageInstance> for Storage {
240
67857
    fn from(instance: StorageInstance) -> Self {
241
67857
        Self {
242
67857
            instance,
243
67857
            authentication: None,
244
67857
            effective_session: None,
245
67857
        }
246
67857
    }
247
}
248

            
249
#[derive(Debug)]
250
struct Data {
251
    id: StorageId,
252
    path: PathBuf,
253
    parallelization: usize,
254
    threadpool: ThreadPool<AnyFile>,
255
    file_manager: AnyFileManager,
256
    pub(crate) tasks: TaskManager,
257
    schemas: RwLock<HashMap<SchemaName, Arc<dyn DatabaseOpener>>>,
258
    available_databases: RwLock<HashMap<String, SchemaName>>,
259
    open_roots: Mutex<HashMap<String, Context>>,
260
    // cfg check matches `Connection::authenticate`
261
    authenticated_permissions: Permissions,
262
    sessions: RwLock<AuthenticatedSessions>,
263
    pub(crate) subscribers: Arc<RwLock<SessionSubscribers>>,
264
    #[cfg(feature = "password-hashing")]
265
    argon: argon::Hasher,
266
    #[cfg(feature = "encryption")]
267
    pub(crate) vault: Arc<Vault>,
268
    #[cfg(feature = "encryption")]
269
    default_encryption_key: Option<KeyId>,
270
    #[cfg(any(feature = "compression", feature = "encryption"))]
271
    tree_vault: Option<TreeVault>,
272
    pub(crate) key_value_persistence: KeyValuePersistence,
273
    chunk_cache: ChunkCache,
274
    pub(crate) check_view_integrity_on_database_open: bool,
275
    relay: Relay,
276
}
277

            
278
impl Storage {
279
    /// Creates or opens a multi-database [`Storage`] with its data stored in `directory`.
280
3437
    pub fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
281
3437
        let owned_path = configuration
282
3437
            .path
283
3437
            .clone()
284
3437
            .unwrap_or_else(|| PathBuf::from("db.bonsaidb"));
285
3437
        let file_manager = if configuration.memory_only {
286
62
            AnyFileManager::memory()
287
        } else {
288
3375
            AnyFileManager::std()
289
        };
290

            
291
3437
        let manager = Manager::default();
292
13868
        for _ in 0..configuration.workers.worker_count {
293
13868
            manager.spawn_worker();
294
13868
        }
295
3467
        let tasks = TaskManager::new(manager);
296
3467

            
297
3467
        fs::create_dir_all(&owned_path)?;
298

            
299
3467
        let id = Self::lookup_or_create_id(&configuration, &owned_path)?;
300

            
301
        #[cfg(feature = "encryption")]
302
3407
        let vault = {
303
3467
            let vault_key_storage = match configuration.vault_key_storage {
304
180
                Some(storage) => storage,
305
                None => Arc::new(
306
3287
                    LocalVaultKeyStorage::new(owned_path.join("vault-keys"))
307
3287
                        .map_err(|err| Error::Vault(vault::Error::Initializing(err.to_string())))?,
308
                ),
309
            };
310

            
311
3467
            Arc::new(Vault::initialize(id, &owned_path, vault_key_storage)?)
312
        };
313

            
314
3407
        let parallelization = configuration.workers.parallelization;
315
3407
        let check_view_integrity_on_database_open = configuration.views.check_integrity_on_open;
316
3407
        let key_value_persistence = configuration.key_value_persistence;
317
3407
        #[cfg(feature = "password-hashing")]
318
3407
        let argon = argon::Hasher::new(configuration.argon);
319
3407
        #[cfg(feature = "encryption")]
320
3407
        let default_encryption_key = configuration.default_encryption_key;
321
3407
        #[cfg(all(feature = "compression", feature = "encryption"))]
322
3407
        let tree_vault = TreeVault::new_if_needed(
323
3407
            default_encryption_key.clone(),
324
3407
            &vault,
325
3407
            configuration.default_compression,
326
3407
        );
327
3407
        #[cfg(all(not(feature = "compression"), feature = "encryption"))]
328
3407
        let tree_vault = TreeVault::new_if_needed(default_encryption_key.clone(), &vault);
329
3407
        #[cfg(all(feature = "compression", not(feature = "encryption")))]
330
3407
        let tree_vault = TreeVault::new_if_needed(configuration.default_compression);
331
3407

            
332
3407
        let authenticated_permissions = configuration.authenticated_permissions;
333
3407

            
334
3407
        let storage = Self {
335
3407
            instance: StorageInstance {
336
3407
                data: Arc::new(Data {
337
3407
                    id,
338
3407
                    tasks,
339
3407
                    parallelization,
340
3407
                    subscribers: Arc::default(),
341
3407
                    authenticated_permissions,
342
3407
                    sessions: RwLock::default(),
343
3407
                    #[cfg(feature = "password-hashing")]
344
3407
                    argon,
345
3407
                    #[cfg(feature = "encryption")]
346
3407
                    vault,
347
3407
                    #[cfg(feature = "encryption")]
348
3407
                    default_encryption_key,
349
3407
                    #[cfg(any(feature = "compression", feature = "encryption"))]
350
3407
                    tree_vault,
351
3407
                    path: owned_path,
352
3407
                    file_manager,
353
3407
                    chunk_cache: ChunkCache::new(2000, 160_384),
354
3407
                    threadpool: ThreadPool::new(parallelization),
355
3407
                    schemas: RwLock::new(configuration.initial_schemas),
356
3407
                    available_databases: RwLock::default(),
357
3407
                    open_roots: Mutex::default(),
358
3407
                    key_value_persistence,
359
3407
                    check_view_integrity_on_database_open,
360
3407
                    relay: Relay::default(),
361
3407
                }),
362
3407
            },
363
3407
            authentication: None,
364
3407
            effective_session: None,
365
3407
        };
366
3407

            
367
3407
        storage.cache_available_databases()?;
368

            
369
3407
        storage.create_admin_database_if_needed()?;
370

            
371
3407
        Ok(storage)
372
3467
    }
373

            
374
    #[cfg(feature = "internal-apis")]
375
    #[doc(hidden)]
376
2082180
    pub fn database_without_schema(&self, name: &str) -> Result<Database, Error> {
377
2082180
        let name = name.to_owned();
378
2082180
        self.instance
379
2082180
            .database_without_schema(&name, Some(self), None)
380
2082180
    }
381

            
382
3467
    fn lookup_or_create_id(
383
3467
        configuration: &StorageConfiguration,
384
3467
        path: &Path,
385
3467
    ) -> Result<StorageId, Error> {
386
3467
        Ok(StorageId(if let Some(id) = configuration.unique_id {
387
            // The configuraiton id override is not persisted to disk. This is
388
            // mostly to prevent someone from accidentally adding this
389
            // configuration, realizing it breaks things, and then wanting to
390
            // revert. This makes reverting to the old value easier.
391
            id
392
        } else {
393
            // Load/Store a randomly generated id into a file. While the value
394
            // is numerical, the file contents are the ascii decimal, making it
395
            // easier for a human to view, and if needed, edit.
396
3467
            let id_path = path.join("server-id");
397
3467

            
398
3467
            if id_path.exists() {
399
                // This value is important enought to not allow launching the
400
                // server if the file can't be read or contains unexpected data.
401
487
                let existing_id = String::from_utf8(
402
487
                    File::open(id_path)
403
487
                        .and_then(|mut f| {
404
487
                            let mut bytes = Vec::new();
405
487
                            f.read_to_end(&mut bytes).map(|_| bytes)
406
487
                        })
407
487
                        .expect("error reading server-id file"),
408
487
                )
409
487
                .expect("server-id contains invalid data");
410
487

            
411
487
                existing_id.parse().expect("server-id isn't numeric")
412
            } else {
413
2980
                let id = { thread_rng().gen::<u64>() };
414
2980
                File::create(id_path)
415
2980
                    .and_then(|mut file| {
416
2980
                        let id = id.to_string();
417
2980
                        file.write_all(id.as_bytes())
418
2980
                    })
419
2980
                    .map_err(|err| {
420
                        Error::Core(bonsaidb_core::Error::Configuration(format!(
421
                            "Error writing server-id file: {}",
422
                            err
423
                        )))
424
2980
                    })?;
425
2980
                id
426
            }
427
        }))
428
3467
    }
429

            
430
3407
    fn cache_available_databases(&self) -> Result<(), Error> {
431
3407
        let available_databases = self
432
3407
            .admin()
433
3407
            .view::<ByName>()
434
3407
            .query()?
435
3407
            .into_iter()
436
3407
            .map(|map| (map.key, map.value))
437
3407
            .collect();
438
3407
        let mut storage_databases = self.instance.data.available_databases.write();
439
3407
        *storage_databases = available_databases;
440
3407
        Ok(())
441
3407
    }
442

            
443
    fn create_admin_database_if_needed(&self) -> Result<(), Error> {
444
3377
        self.register_schema::<Admin>()?;
445
3377
        match self.database::<Admin>(ADMIN_DATABASE_NAME) {
446
427
            Ok(_) => {}
447
            Err(bonsaidb_core::Error::DatabaseNotFound(_)) => {
448
2980
                drop(self.create_database::<Admin>(ADMIN_DATABASE_NAME, true)?);
449
            }
450
            Err(err) => return Err(Error::Core(err)),
451
        }
452
3407
        Ok(())
453
3407
    }
454

            
455
    /// Returns the unique id of the server.
456
    ///
457
    /// This value is set from the [`StorageConfiguration`] or randomly
458
    /// generated when creating a server. It shouldn't be changed after a server
459
    /// is in use, as doing can cause issues. For example, the vault that
460
    /// manages encrypted storage uses the server ID to store the vault key. If
461
    /// the server ID changes, the vault key storage will need to be updated
462
    /// with the new server ID.
463
    #[must_use]
464
    pub fn unique_id(&self) -> StorageId {
465
        self.instance.data.id
466
    }
467

            
468
    #[must_use]
469
114956
    pub(crate) fn parallelization(&self) -> usize {
470
114956
        self.instance.data.parallelization
471
114956
    }
472

            
473
    #[must_use]
474
    #[cfg(feature = "encryption")]
475
1667063
    pub(crate) fn vault(&self) -> &Arc<Vault> {
476
1667063
        &self.instance.data.vault
477
1667063
    }
478

            
479
    #[must_use]
480
    #[cfg(any(feature = "encryption", feature = "compression"))]
481
2954305
    pub(crate) fn tree_vault(&self) -> Option<&TreeVault> {
482
2954305
        self.instance.data.tree_vault.as_ref()
483
2954305
    }
484

            
485
    #[must_use]
486
    #[cfg(feature = "encryption")]
487
2887648
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
488
2887648
        self.instance.data.default_encryption_key.as_ref()
489
2887648
    }
490

            
491
    #[must_use]
492
    #[cfg(all(feature = "compression", not(feature = "encryption")))]
493
    #[allow(clippy::unused_self)]
494
    pub(crate) fn default_encryption_key(&self) -> Option<&KeyId> {
495
        None
496
    }
497

            
498
    /// Registers a schema for use within the server.
499
3377
    pub fn register_schema<DB: Schema>(&self) -> Result<(), Error> {
500
3377
        let mut schemas = self.instance.data.schemas.write();
501
3377
        if schemas
502
3377
            .insert(
503
3377
                DB::schema_name(),
504
3377
                Arc::new(StorageSchemaOpener::<DB>::new()?),
505
            )
506
3377
            .is_none()
507
        {
508
3407
            Ok(())
509
        } else {
510
            Err(Error::Core(bonsaidb_core::Error::SchemaAlreadyRegistered(
511
                DB::schema_name(),
512
            )))
513
        }
514
3407
    }
515

            
516
27234
    fn validate_name(name: &str) -> Result<(), Error> {
517
27234
        if name.chars().enumerate().all(|(index, c)| {
518
215954
            c.is_ascii_alphanumeric()
519
10821
                || (index == 0 && c == '_')
520
5318
                || (index > 0 && (c == '.' || c == '-'))
521
215954
        }) {
522
27108
            Ok(())
523
        } else {
524
126
            Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(
525
126
                name.to_owned(),
526
126
            )))
527
        }
528
27234
    }
529

            
530
    /// Restricts an unauthenticated instance to having `effective_permissions`.
531
    /// Returns `None` if a session has already been established.
532
    #[must_use]
533
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
534
        if self.effective_session.is_some() {
535
            None
536
        } else {
537
            Some(Self {
538
                instance: self.instance.clone(),
539
                authentication: self.authentication.clone(),
540
                effective_session: Some(Arc::new(Session {
541
                    id: None,
542
                    identity: None,
543
                    permissions: effective_permissions,
544
                })),
545
            })
546
        }
547
    }
548

            
549
    /// Converts this instance into its blocking version, which is able to be
550
    /// used without async. The returned instance uses the current Tokio runtime
551
    /// handle to spawn blocking tasks.
552
    ///
553
    /// # Panics
554
    ///
555
    /// Panics if called outside the context of a Tokio runtime.
556
    #[cfg(feature = "async")]
557
2792
    pub fn into_async(self) -> crate::AsyncStorage {
558
2792
        self.into_async_with_runtime(tokio::runtime::Handle::current())
559
2792
    }
560

            
561
    /// Converts this instance into its blocking version, which is able to be
562
    /// used without async. The returned instance uses the provided runtime
563
    /// handle to spawn blocking tasks.
564
    #[cfg(feature = "async")]
565
2792
    pub fn into_async_with_runtime(self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
566
2792
        crate::AsyncStorage {
567
2792
            storage: self,
568
2792
            runtime: Arc::new(runtime),
569
2792
        }
570
2792
    }
571

            
572
    /// Converts this instance into its blocking version, which is able to be
573
    /// used without async. The returned instance uses the current Tokio runtime
574
    /// handle to spawn blocking tasks.
575
    ///
576
    /// # Panics
577
    ///
578
    /// Panics if called outside the context of a Tokio runtime.
579
    #[cfg(feature = "async")]
580
    pub fn to_async(&self) -> crate::AsyncStorage {
581
        self.clone().into_async()
582
    }
583

            
584
    /// Converts this instance into its blocking version, which is able to be
585
    /// used without async. The returned instance uses the provided runtime
586
    /// handle to spawn blocking tasks.
587
    #[cfg(feature = "async")]
588
    pub fn to_async_with_runtime(&self, runtime: tokio::runtime::Handle) -> crate::AsyncStorage {
589
        self.clone().into_async_with_runtime(runtime)
590
    }
591
}
592

            
593
impl StorageInstance {
594
    #[cfg_attr(
595
        not(any(feature = "encryption", feature = "compression")),
596
        allow(unused_mut)
597
    )]
598
2150223
    pub(crate) fn open_roots(&self, name: &str) -> Result<Context, Error> {
599
2150223
        let mut open_roots = self.data.open_roots.lock();
600
2150223
        if let Some(roots) = open_roots.get(name) {
601
2123353
            Ok(roots.clone())
602
        } else {
603
26870
            let task_name = name.to_string();
604
26870

            
605
26870
            let mut config = nebari::Config::new(self.data.path.join(task_name))
606
26870
                .file_manager(self.data.file_manager.clone())
607
26870
                .cache(self.data.chunk_cache.clone())
608
26870
                .shared_thread_pool(&self.data.threadpool);
609

            
610
            #[cfg(any(feature = "encryption", feature = "compression"))]
611
26870
            if let Some(vault) = self.data.tree_vault.clone() {
612
6858
                config = config.vault(vault);
613
20238
            }
614

            
615
26870
            let roots = config.open().map_err(Error::from)?;
616
26870
            let context = Context::new(roots, self.data.key_value_persistence.clone());
617
26870

            
618
26870
            open_roots.insert(name.to_owned(), context.clone());
619
26870

            
620
26870
            Ok(context)
621
        }
622
2150223
    }
623

            
624
2701854
    pub(crate) fn tasks(&self) -> &'_ TaskManager {
625
2701854
        &self.data.tasks
626
2701854
    }
627

            
628
2150223
    pub(crate) fn check_view_integrity_on_database_open(&self) -> bool {
629
2150223
        self.data.check_view_integrity_on_database_open
630
2150223
    }
631

            
632
3136
    pub(crate) fn relay(&self) -> &'_ Relay {
633
3136
        &self.data.relay
634
3136
    }
635

            
636
    /// Opens a database through a generic-free trait.
637
2105482
    pub(crate) fn database_without_schema(
638
2105482
        &self,
639
2105482
        name: &str,
640
2105482
        storage: Option<&Storage>,
641
2105482
        expected_schema: Option<SchemaName>,
642
2105482
    ) -> Result<Database, Error> {
643
        // TODO switch to upgradable read now that we are on parking_lot
644
2102502
        let stored_schema = {
645
2105482
            let available_databases = self.data.available_databases.read();
646
2105482
            available_databases
647
2105482
                .get(name)
648
2105482
                .ok_or_else(|| {
649
2980
                    Error::Core(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
650
2105482
                })?
651
2102502
                .clone()
652
        };
653

            
654
2102502
        if let Some(expected_schema) = expected_schema {
655
20136
            if stored_schema != expected_schema {
656
                return Err(Error::Core(bonsaidb_core::Error::SchemaMismatch {
657
                    database_name: name.to_owned(),
658
                    schema: expected_schema,
659
                    stored_schema,
660
                }));
661
20136
            }
662
2082366
        }
663

            
664
2102502
        let mut schemas = self.data.schemas.write();
665
2102502
        let storage =
666
2102502
            storage.map_or_else(|| Cow::Owned(Storage::from(self.clone())), Cow::Borrowed);
667
2102502
        if let Some(schema) = schemas.get_mut(&stored_schema) {
668
2102502
            let db = schema.open(name.to_string(), storage.as_ref())?;
669
2102502
            Ok(db)
670
        } else {
671
            // The schema was stored, the user is requesting the same schema,
672
            // but it isn't registerd with the storage currently.
673
            Err(Error::Core(bonsaidb_core::Error::SchemaNotRegistered(
674
                stored_schema,
675
            )))
676
        }
677
2105482
    }
678

            
679
68
    fn update_user_with_named_id<
680
68
        'user,
681
68
        'other,
682
68
        Col: NamedCollection<PrimaryKey = u64>,
683
68
        U: Nameable<'user, u64> + Send + Sync,
684
68
        O: Nameable<'other, u64> + Send + Sync,
685
68
        F: FnOnce(&mut CollectionDocument<User>, u64) -> Result<bool, bonsaidb_core::Error>,
686
68
    >(
687
68
        &self,
688
68
        user: U,
689
68
        other: O,
690
68
        callback: F,
691
68
    ) -> Result<(), bonsaidb_core::Error> {
692
68
        let admin = self.admin();
693
68
        let other = other.name()?;
694
68
        let user = User::load(user.name()?, &admin)?;
695
68
        let other = other.id::<Col, _>(&admin)?;
696
68
        match (user, other) {
697
68
            (Some(mut user), Some(other)) => {
698
68
                if callback(&mut user, other)? {
699
34
                    user.update(&admin)?;
700
34
                }
701
68
                Ok(())
702
            }
703
            // TODO make this a generic not found with a name parameter.
704
            _ => Err(bonsaidb_core::Error::UserNotFound),
705
        }
706
68
    }
707

            
708
    #[cfg(feature = "password-hashing")]
709
150
    fn authenticate_inner(
710
150
        &self,
711
150
        user: CollectionDocument<User>,
712
150
        authentication: Authentication,
713
150
        admin: &Database,
714
150
    ) -> Result<Storage, bonsaidb_core::Error> {
715
150
        match authentication {
716
150
            Authentication::Password(password) => {
717
150
                let saved_hash = user
718
150
                    .contents
719
150
                    .argon_hash
720
150
                    .clone()
721
150
                    .ok_or(bonsaidb_core::Error::InvalidCredentials)?;
722

            
723
150
                self.data
724
150
                    .argon
725
150
                    .verify(user.header.id, password, saved_hash)?;
726
150
                self.assume_user(user, admin)
727
            }
728
        }
729
150
    }
730

            
731
150
    fn assume_user(
732
150
        &self,
733
150
        user: CollectionDocument<User>,
734
150
        admin: &Database,
735
150
    ) -> Result<Storage, bonsaidb_core::Error> {
736
150
        let permissions = user.contents.effective_permissions(
737
150
            admin,
738
150
            &admin.storage().instance.data.authenticated_permissions,
739
150
        )?;
740

            
741
150
        let mut sessions = self.data.sessions.write();
742
150
        sessions.last_session_id += 1;
743
150
        let session_id = SessionId(sessions.last_session_id);
744
150
        let session = Session {
745
150
            id: Some(session_id),
746
150
            identity: Some(Arc::new(Identity::User {
747
150
                id: user.header.id,
748
150
                username: user.contents.username,
749
150
            })),
750
150
            permissions,
751
150
        };
752
150
        let authentication = Arc::new(AuthenticatedSession {
753
150
            storage: Arc::downgrade(&self.data),
754
150
            session: session.clone(),
755
150
        });
756
150
        sessions.sessions.insert(session_id, authentication.clone());
757
150

            
758
150
        Ok(Storage {
759
150
            instance: self.clone(),
760
150
            authentication: Some(authentication),
761
150
            effective_session: Some(Arc::new(session)),
762
150
        })
763
150
    }
764

            
765
60
    fn assume_role(
766
60
        &self,
767
60
        role: CollectionDocument<Role>,
768
60
        admin: &Database,
769
60
    ) -> Result<Storage, bonsaidb_core::Error> {
770
60
        let permissions = role.contents.effective_permissions(
771
60
            admin,
772
60
            &admin.storage().instance.data.authenticated_permissions,
773
60
        )?;
774

            
775
60
        let mut sessions = self.data.sessions.write();
776
60
        sessions.last_session_id += 1;
777
60
        let session_id = SessionId(sessions.last_session_id);
778
60
        let session = Session {
779
60
            id: Some(session_id),
780
60
            identity: Some(Arc::new(Identity::Role {
781
60
                id: role.header.id,
782
60
                name: role.contents.name,
783
60
            })),
784
60
            permissions,
785
60
        };
786
60
        let authentication = Arc::new(AuthenticatedSession {
787
60
            storage: Arc::downgrade(&self.data),
788
60
            session: session.clone(),
789
60
        });
790
60
        sessions.sessions.insert(session_id, authentication.clone());
791
60

            
792
60
        Ok(Storage {
793
60
            instance: self.clone(),
794
60
            authentication: Some(authentication),
795
60
            effective_session: Some(Arc::new(session)),
796
60
        })
797
60
    }
798

            
799
368
    fn add_permission_group_to_user_inner(
800
368
        user: &mut CollectionDocument<User>,
801
368
        permission_group_id: u64,
802
368
    ) -> bool {
803
368
        if user.contents.groups.contains(&permission_group_id) {
804
184
            false
805
        } else {
806
184
            user.contents.groups.push(permission_group_id);
807
184
            true
808
        }
809
368
    }
810

            
811
248
    fn remove_permission_group_from_user_inner(
812
248
        user: &mut CollectionDocument<User>,
813
248
        permission_group_id: u64,
814
248
    ) -> bool {
815
248
        let old_len = user.contents.groups.len();
816
248
        user.contents.groups.retain(|id| id != &permission_group_id);
817
248
        old_len != user.contents.groups.len()
818
248
    }
819

            
820
248
    fn add_role_to_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
821
248
        if user.contents.roles.contains(&role_id) {
822
124
            false
823
        } else {
824
124
            user.contents.roles.push(role_id);
825
124
            true
826
        }
827
248
    }
828

            
829
248
    fn remove_role_from_user_inner(user: &mut CollectionDocument<User>, role_id: u64) -> bool {
830
248
        let old_len = user.contents.roles.len();
831
248
        user.contents.roles.retain(|id| id != &role_id);
832
248
        old_len != user.contents.roles.len()
833
248
    }
834
}
835

            
836
pub trait DatabaseOpener: Send + Sync + Debug {
837
    fn schematic(&self) -> &'_ Schematic;
838
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error>;
839
}
840

            
841
#[derive(Debug)]
842
pub struct StorageSchemaOpener<DB: Schema> {
843
    schematic: Schematic,
844
    _phantom: PhantomData<DB>,
845
}
846

            
847
impl<DB> StorageSchemaOpener<DB>
848
where
849
    DB: Schema,
850
{
851
3740
    pub fn new() -> Result<Self, Error> {
852
3740
        let schematic = DB::schematic()?;
853
3740
        Ok(Self {
854
3740
            schematic,
855
3740
            _phantom: PhantomData::default(),
856
3740
        })
857
3740
    }
858
}
859

            
860
impl<DB> DatabaseOpener for StorageSchemaOpener<DB>
861
where
862
    DB: Schema,
863
{
864
    fn schematic(&self) -> &'_ Schematic {
865
        &self.schematic
866
    }
867

            
868
74329
    fn open(&self, name: String, storage: &Storage) -> Result<Database, Error> {
869
74329
        let roots = storage.instance.open_roots(&name)?;
870
74329
        let db = Database::new::<DB, _>(name, roots, storage)?;
871
74329
        Ok(db)
872
74329
    }
873
}
874

            
875
impl HasSession for StorageInstance {
876
    fn session(&self) -> Option<&Session> {
877
        None
878
    }
879
}
880

            
881
impl StorageConnection for StorageInstance {
882
    type Database = Database;
883
    type Authenticated = Storage;
884

            
885
47721
    fn admin(&self) -> Self::Database {
886
47721
        Database::new::<Admin, _>(
887
47721
            ADMIN_DATABASE_NAME,
888
47721
            self.open_roots(ADMIN_DATABASE_NAME).unwrap(),
889
47721
            &Storage::from(self.clone()),
890
47721
        )
891
47721
        .unwrap()
892
47721
    }
893

            
894
    #[cfg_attr(
895
        feature = "tracing",
896
54460
        tracing::instrument(skip(name, schema, only_if_needed))
897
    )]
898
    fn create_database_with_schema(
899
        &self,
900
        name: &str,
901
        schema: SchemaName,
902
        only_if_needed: bool,
903
    ) -> Result<(), bonsaidb_core::Error> {
904
        Storage::validate_name(name)?;
905

            
906
        {
907
            let schemas = self.data.schemas.read();
908
            if !schemas.contains_key(&schema) {
909
                return Err(bonsaidb_core::Error::SchemaNotRegistered(schema));
910
            }
911
        }
912

            
913
        let mut available_databases = self.data.available_databases.write();
914
        let admin = self.admin();
915
        if !available_databases.contains_key(name) {
916
            admin
917
                .collection::<DatabaseRecord>()
918
                .push(&admin::Database {
919
                    name: name.to_string(),
920
                    schema: schema.clone(),
921
                })?;
922
            available_databases.insert(name.to_string(), schema);
923
        } else if !only_if_needed {
924
            return Err(bonsaidb_core::Error::DatabaseNameAlreadyTaken(
925
                name.to_string(),
926
            ));
927
        }
928

            
929
        Ok(())
930
    }
931

            
932
7137
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
933
7137
        self.database_without_schema(name, None, Some(DB::schema_name()))
934
7137
            .map_err(bonsaidb_core::Error::from)
935
7137
    }
936

            
937
30496
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(name)))]
938
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
939
        let admin = self.admin();
940
        let mut available_databases = self.data.available_databases.write();
941
        available_databases.remove(name);
942

            
943
        let mut open_roots = self.data.open_roots.lock();
944
        open_roots.remove(name);
945

            
946
        let database_folder = self.data.path.join(name);
947
        if database_folder.exists() {
948
            let file_manager = self.data.file_manager.clone();
949
            file_manager
950
                .delete_directory(&database_folder)
951
                .map_err(Error::Nebari)?;
952
        }
953

            
954
        if let Some(entry) = admin
955
            .view::<database::ByName>()
956
            .with_key(name.to_ascii_lowercase())
957
            .query()?
958
            .first()
959
        {
960
            admin.delete::<DatabaseRecord, _>(&entry.source)?;
961

            
962
            Ok(())
963
        } else {
964
            Err(bonsaidb_core::Error::DatabaseNotFound(name.to_string()))
965
        }
966
    }
967

            
968
248
    #[cfg_attr(feature = "tracing", tracing::instrument)]
969
124
    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
970
        let available_databases = self.data.available_databases.read();
971
        Ok(available_databases
972
            .iter()
973
4058
            .map(|(name, schema)| connection::Database {
974
4058
                name: name.to_string(),
975
4058
                schema: schema.clone(),
976
4058
            })
977
            .collect())
978
    }
979

            
980
248
    #[cfg_attr(feature = "tracing", tracing::instrument)]
981
124
    fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
982
        let available_databases = self.data.available_databases.read();
983
        Ok(available_databases.values().unique().cloned().collect())
984
    }
985

            
986
548
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(username)))]
987
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
988
        let result = self
989
            .admin()
990
            .collection::<User>()
991
            .push(&User::default_with_username(username))?;
992
        Ok(result.id)
993
    }
994

            
995
16
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user)))]
996
    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
997
        &self,
998
        user: U,
999
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
        user.delete(&admin)?;

            
        Ok(())
    }

            
    #[cfg(feature = "password-hashing")]
6
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, password)))]
    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let mut user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::UserNotFound)?;
        user.contents.argon_hash = Some(self.data.argon.hash(user.header.id, password)?);
        user.update(&admin)
    }

            
    #[cfg(feature = "password-hashing")]
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user)))]
    fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        authentication: Authentication,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        let admin = self.admin();
        let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
        self.authenticate_inner(user, authentication, &admin)
            .map(Storage::from)
    }

            
    fn assume_identity(
        &self,
        identity: IdentityReference<'_>,
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
        let admin = self.admin();
        match identity {
            IdentityReference::User(user) => {
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.assume_user(user, &admin).map(Storage::from)
            }
            IdentityReference::Role(role) => {
                let role =
                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.assume_role(role, &admin).map(Storage::from)
            }
            _ => Err(bonsaidb_core::Error::InvalidCredentials),
        }
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, permission_group)))]
    fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
            user,
            permission_group,
            |user, permission_group_id| {
                Ok(Self::add_permission_group_to_user_inner(
                    user,
                    permission_group_id,
                ))
            },
        )
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, permission_group)))]
    fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(
            user,
            permission_group,
            |user, permission_group_id| {
                Ok(Self::remove_permission_group_from_user_inner(
                    user,
                    permission_group_id,
                ))
            },
        )
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, role)))]
    fn add_role_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
            Ok(Self::add_role_to_user_inner(user, role_id))
        })
    }

            
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, role)))]
    fn remove_role_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
            Ok(Self::remove_role_from_user_inner(user, role_id))
        })
    }
}

            
impl HasSession for Storage {
4088609
    fn session(&self) -> Option<&Session> {
4088609
        self.effective_session.as_deref()
4088609
    }
}

            
impl StorageConnection for Storage {
    type Database = Database;
    type Authenticated = Self;

            
3891
    fn admin(&self) -> Self::Database {
3891
        self.instance.admin()
3891
    }

            
    #[cfg_attr(
        feature = "tracing",
54460
        tracing::instrument(skip(name, schema, only_if_needed))
    )]
    fn create_database_with_schema(
        &self,
        name: &str,
        schema: SchemaName,
        only_if_needed: bool,
    ) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(name),
            &BonsaiAction::Server(ServerAction::CreateDatabase),
        )?;
        self.instance
            .create_database_with_schema(name, schema, only_if_needed)
    }

            
7136
    fn database<DB: Schema>(&self, name: &str) -> Result<Self::Database, bonsaidb_core::Error> {
7136
        self.instance.database::<DB>(name)
7136
    }

            
30496
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(name)))]
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(name),
            &BonsaiAction::Server(ServerAction::DeleteDatabase),
        )?;
        self.instance.delete_database(name)
    }

            
248
    #[cfg_attr(feature = "tracing", tracing::instrument)]
124
    fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
        self.check_permission(
            bonsaidb_resource_name(),
            &BonsaiAction::Server(ServerAction::ListDatabases),
        )?;
        self.instance.list_databases()
    }

            
248
    #[cfg_attr(feature = "tracing", tracing::instrument)]
124
    fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
        self.check_permission(
            bonsaidb_resource_name(),
            &BonsaiAction::Server(ServerAction::ListAvailableSchemas),
        )?;
        self.instance.list_available_schemas()
    }

            
608
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(username)))]
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
        self.check_permission(
            bonsaidb_resource_name(),
            &BonsaiAction::Server(ServerAction::CreateUser),
        )?;
        self.instance.create_user(username)
    }

            
16
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user)))]
    fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let user = user.name()?;
        let user_id = user
            .id::<User, _>(&admin)?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
        self.check_permission(
            user_resource_name(user_id),
            &BonsaiAction::Server(ServerAction::DeleteUser),
        )?;
        self.instance.delete_user(user)
    }

            
    #[cfg(feature = "password-hashing")]
6
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, password)))]
    fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<(), bonsaidb_core::Error> {
        let admin = self.admin();
        let user = user.name()?;
        let user_id = user
            .id::<User, _>(&admin)?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
        self.check_permission(
            user_resource_name(user_id),
            &BonsaiAction::Server(ServerAction::SetPassword),
        )?;
        self.instance.set_user_password(user, password)
    }

            
    #[cfg(feature = "password-hashing")]
10
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user)))]
    fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
        &self,
        user: U,
        authentication: Authentication,
    ) -> Result<Self, bonsaidb_core::Error> {
        let admin = self.admin();
        let user = User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
        match &authentication {
            Authentication::Password(_) => {
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        AuthenticationMethod::PasswordHash,
                    )),
                )?;
            }
        }
        // TODO merge session permissions
        self.instance
            .authenticate_inner(user, authentication, &admin)
    }

            
60
    fn assume_identity(
60
        &self,
60
        identity: IdentityReference<'_>,
60
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
60
        match identity {
            IdentityReference::User(user) => {
                let admin = self.admin();
                let user =
                    User::load(user, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
                self.check_permission(
                    user_resource_name(user.header.id),
                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
                )?;
                self.instance.assume_user(user, &admin)
            }
60
            IdentityReference::Role(role) => {
60
                let admin = self.admin();
60
                let role =
60
                    Role::load(role, &admin)?.ok_or(bonsaidb_core::Error::InvalidCredentials)?;
60
                self.check_permission(
60
                    role_resource_name(role.header.id),
60
                    &BonsaiAction::Server(ServerAction::AssumeIdentity),
60
                )?;
60
                self.instance.assume_role(role, &admin)
            }

            
            _ => Err(bonsaidb_core::Error::InvalidCredentials),
        }
60
    }

            
40
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, permission_group)))]
    fn add_permission_group_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<PermissionGroup, _, _, _>(
                user,
                permission_group,
                |user, permission_group_id| {
20
                    self.check_permission(
20
                        user_resource_name(user.header.id),
20
                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
20
                    )?;
20
                    Ok(StorageInstance::add_permission_group_to_user_inner(
20
                        user,
20
                        permission_group_id,
20
                    ))
20
                },
            )
    }

            
32
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, permission_group)))]
    fn remove_permission_group_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        permission_group: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<PermissionGroup, _, _, _>(
                user,
                permission_group,
                |user, permission_group_id| {
16
                    self.check_permission(
16
                        user_resource_name(user.header.id),
16
                        &BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups),
16
                    )?;
16
                    Ok(StorageInstance::remove_permission_group_from_user_inner(
16
                        user,
16
                        permission_group_id,
16
                    ))
16
                },
            )
    }

            
32
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, role)))]
    fn add_role_to_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<PermissionGroup, _, _, _>(user, role, |user, role_id| {
16
                self.check_permission(
16
                    user_resource_name(user.header.id),
16
                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
16
                )?;
16
                Ok(StorageInstance::add_role_to_user_inner(user, role_id))
16
            })
    }

            
32
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(user, role)))]
    fn remove_role_from_user<
        'user,
        'group,
        U: Nameable<'user, u64> + Send + Sync,
        G: Nameable<'group, u64> + Send + Sync,
    >(
        &self,
        user: U,
        role: G,
    ) -> Result<(), bonsaidb_core::Error> {
        self.instance
            .update_user_with_named_id::<Role, _, _, _>(user, role, |user, role_id| {
16
                self.check_permission(
16
                    user_resource_name(user.header.id),
16
                    &BonsaiAction::Server(ServerAction::ModifyUserRoles),
16
                )?;
16
                Ok(StorageInstance::remove_role_from_user_inner(user, role_id))
16
            })
    }
}

            
1
#[test]
1
fn name_validation_tests() {
1
    assert!(matches!(Storage::validate_name("azAZ09.-"), Ok(())));
1
    assert!(matches!(
1
        Storage::validate_name("_internal-names-work"),
        Ok(())
    ));
1
    assert!(matches!(
1
        Storage::validate_name("-alphaunmericfirstrequired"),
        Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
    ));
1
    assert!(matches!(
1
        Storage::validate_name("\u{2661}"),
        Err(Error::Core(bonsaidb_core::Error::InvalidDatabaseName(_)))
    ));
1
}

            
/// The unique id of a [`Storage`] instance.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct StorageId(u64);

            
impl StorageId {
    /// Returns the id as a u64.
    #[must_use]
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}

            
impl Debug for StorageId {
6448
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6448
        // let formatted_length = format!();
6448
        write!(f, "{:016x}", self.0)
6448
    }
}

            
impl Display for StorageId {
6448
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6448
        Debug::fmt(self, f)
6448
    }
}

            
1647035
#[derive(Debug, Clone)]
#[cfg(any(feature = "compression", feature = "encryption"))]
pub(crate) struct TreeVault {
    #[cfg(feature = "compression")]
    compression: Option<Compression>,
    #[cfg(feature = "encryption")]
    pub key: Option<KeyId>,
    #[cfg(feature = "encryption")]
    pub vault: Arc<Vault>,
}

            
#[cfg(all(feature = "compression", feature = "encryption"))]
impl TreeVault {
1670470
    pub(crate) fn new_if_needed(
1670470
        key: Option<KeyId>,
1670470
        vault: &Arc<Vault>,
1670470
        compression: Option<Compression>,
1670470
    ) -> Option<Self> {
1670470
        if key.is_none() && compression.is_none() {
1634399
            None
        } else {
36041
            Some(Self {
36041
                key,
36041
                compression,
36041
                vault: vault.clone(),
36041
            })
        }
1670440
    }

            
4931552
    fn header(&self, compressed: bool) -> u8 {
4931552
        let mut bits = if self.key.is_some() { 0b1000_0000 } else { 0 };

            
4931552
        if compressed {
2415559
            if let Some(compression) = self.compression {
2415557
                bits |= compression as u8;
2415557
            }
2515993
        }

            
4931550
        bits
4931550
    }
}

            
#[cfg(all(feature = "compression", feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;

            
4931432
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
4931432
        // TODO this allocates too much. The vault should be able to do an
4931432
        // in-place encryption operation so that we can use a single buffer.
4931432
        let mut includes_compression = false;
4931432
        let compressed = match (payload.len(), self.compression) {
2418766
            (128..=usize::MAX, Some(Compression::Lz4)) => {
2415495
                includes_compression = true;
2415495
                Cow::Owned(lz4_flex::block::compress_prepend_size(payload))
            }
2515937
            _ => Cow::Borrowed(payload),
        };

            
4931432
        let mut complete = if let Some(key) = &self.key {
52355
            self.vault.encrypt_payload(key, &compressed, None)?
        } else {
4879077
            compressed.into_owned()
        };

            
4931432
        let header = self.header(includes_compression);
4931432
        if header != 0 {
2463866
            let header = [b't', b'r', b'v', header];
2463866
            complete.splice(0..0, header);
2474816
        }

            
4931971
        Ok(complete)
4931971
    }

            
483986
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
483986
        if payload.len() >= 4 && &payload[0..3] == b"trv" {
364625
            let header = payload[3];
364625
            let payload = &payload[4..];
364625
            let encrypted = (header & 0b1000_0000) != 0;
364625
            let compression = header & 0b0111_1111;
364625
            let decrypted = if encrypted {
11342
                Cow::Owned(self.vault.decrypt_payload(payload, None)?)
            } else {
353283
                Cow::Borrowed(payload)
            };
            #[allow(clippy::single_match)] // Make it an error when we add a new algorithm
364624
            return Ok(match Compression::from_u8(compression) {
                Some(Compression::Lz4) => {
356613
                    lz4_flex::block::decompress_size_prepended(&decrypted).map_err(Error::from)?
                }
8011
                None => decrypted.into_owned(),
            });
119361
        }
119361
        self.vault.decrypt_payload(payload, None)
483986
    }
}

            
/// Functionality that is available on both [`Storage`] and
/// [`AsyncStorage`](crate::AsyncStorage).
pub trait StorageNonBlocking: Sized {
    /// Returns the path of the database storage.
    #[must_use]
    fn path(&self) -> &Path;

            
    /// Returns a new instance of [`Storage`] with `session` as the effective
    /// authentication session. This call will only succeed if there is no
    /// current session.
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error>;
}

            
impl StorageNonBlocking for Storage {
4560
    fn path(&self) -> &Path {
4560
        &self.instance.data.path
4560
    }

            
2118870
    fn assume_session(&self, session: Session) -> Result<Storage, bonsaidb_core::Error> {
2118870
        if self.authentication.is_some() {
            // TODO better error
            return Err(bonsaidb_core::Error::InvalidCredentials);
2118870
        }

            
2118870
        let session_id = match session.id {
390
            Some(id) => id,
            None => {
2118480
                return Ok(Self {
2118480
                    instance: self.instance.clone(),
2118480
                    authentication: None,
2118480
                    effective_session: Some(Arc::new(session)),
2118480
                })
            }
        };

            
390
        let session_data = self.instance.data.sessions.read();
        // TODO better error
390
        let authentication = session_data
390
            .sessions
390
            .get(&session_id)
390
            .ok_or(bonsaidb_core::Error::InvalidCredentials)?;

            
390
        let effective_permissions =
390
            Permissions::merged([&session.permissions, &authentication.session.permissions]);
390
        let effective_session = Session {
390
            id: authentication.session.id,
390
            identity: authentication.session.identity.clone(),
390
            permissions: effective_permissions,
390
        };
390

            
390
        Ok(Self {
390
            instance: self.instance.clone(),
390
            authentication: Some(authentication.clone()),
390
            effective_session: Some(Arc::new(effective_session)),
390
        })
2118870
    }
}

            
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl TreeVault {
    pub(crate) fn new_if_needed(compression: Option<Compression>) -> Option<Self> {
        compression.map(|compression| Self {
            compression: Some(compression),
        })
    }
}

            
#[cfg(all(feature = "compression", not(feature = "encryption")))]
impl nebari::Vault for TreeVault {
    type Error = Error;

            
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        Ok(match (payload.len(), self.compression) {
            (128..=usize::MAX, Some(Compression::Lz4)) => {
                let mut destination =
                    vec![0; lz4_flex::block::get_maximum_output_size(payload.len()) + 8];
                let compressed_length =
                    lz4_flex::block::compress_into(payload, &mut destination[8..])
                        .expect("lz4-flex documents this shouldn't fail");
                destination.truncate(compressed_length + 8);
                destination[0..4].copy_from_slice(&[b't', b'r', b'v', Compression::Lz4 as u8]);
                // to_le_bytes() makes it compatible with lz4-flex decompress_size_prepended.
                let uncompressed_length =
                    u32::try_from(payload.len()).expect("nebari doesn't support >32 bit blocks");
                destination[4..8].copy_from_slice(&uncompressed_length.to_le_bytes());
                destination
            }
            // TODO this shouldn't copy
            _ => payload.to_vec(),
        })
    }

            
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if payload.len() >= 4 && &payload[0..3] == b"trv" {
            let header = payload[3];
            let payload = &payload[4..];
            let encrypted = (header & 0b1000_0000) != 0;
            let compression = header & 0b0111_1111;
            if encrypted {
                return Err(Error::EncryptionDisabled);
            }

            
            #[allow(clippy::single_match)] // Make it an error when we add a new algorithm
            return Ok(match Compression::from_u8(compression) {
                Some(Compression::Lz4) => {
                    lz4_flex::block::decompress_size_prepended(payload).map_err(Error::from)?
                }
                None => payload.to_vec(),
            });
        }
        Ok(payload.to_vec())
    }
}

            
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl TreeVault {
    pub(crate) fn new_if_needed(key: Option<KeyId>, vault: &Arc<Vault>) -> Option<Self> {
        key.map(|key| Self {
            key: Some(key),
            vault: vault.clone(),
        })
    }

            
    #[allow(dead_code)] // This implementation is sort of documentation for what it would be. But our Vault payload already can detect if a parsing error occurs, so we don't need a header if only encryption is enabled.
    fn header(&self) -> u8 {
        if self.key.is_some() {
            0b1000_0000
        } else {
            0
        }
    }
}

            
#[cfg(all(not(feature = "compression"), feature = "encryption"))]
impl nebari::Vault for TreeVault {
    type Error = Error;

            
    fn encrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        if let Some(key) = &self.key {
            self.vault.encrypt_payload(key, payload, None)
        } else {
            // TODO does this need to copy?
            Ok(payload.to_vec())
        }
    }

            
    fn decrypt(&self, payload: &[u8]) -> Result<Vec<u8>, Error> {
        self.vault.decrypt_payload(payload, None)
    }
}