1
use std::collections::HashMap;
2
use std::path::{Path, PathBuf};
3
use std::sync::Arc;
4
use std::time::Duration;
5

            
6
#[cfg(feature = "encryption")]
7
use bonsaidb_core::document::KeyId;
8
use bonsaidb_core::permissions::Permissions;
9
use bonsaidb_core::schema::{Schema, SchemaName};
10
use sysinfo::{CpuRefreshKind, RefreshKind, System, SystemExt};
11

            
12
use crate::storage::{DatabaseOpener, StorageSchemaOpener};
13
#[cfg(feature = "encryption")]
14
use crate::vault::AnyVaultKeyStorage;
15
use crate::Error;
16

            
17
#[cfg(feature = "password-hashing")]
18
mod argon;
19
#[cfg(feature = "password-hashing")]
20
pub use argon::*;
21

            
22
/// Configuration options for [`Storage`](crate::storage::Storage).
23
12
#[derive(Clone)]
24
#[non_exhaustive]
25
pub struct StorageConfiguration {
26
    /// The path to the database. Defaults to `db.bonsaidb` if not specified.
27
    pub path: Option<PathBuf>,
28

            
29
    /// Prevents storing data on the disk. This is intended for testing purposes
30
    /// primarily. Keep in mind that the underlying storage format is
31
    /// append-only.
32
    pub memory_only: bool,
33

            
34
    /// The unique id of the server. If not specified, the server will randomly
35
    /// generate a unique id on startup. If the server generated an id and this
36
    /// value is subsequently set, the generated id will be overridden by the
37
    /// one specified here.
38
    pub unique_id: Option<u64>,
39

            
40
    /// The vault key storage to use. If not specified,
41
    /// [`LocalVaultKeyStorage`](crate::vault::LocalVaultKeyStorage) will be
42
    /// used with the server's data folder as the path. This is **incredibly
43
    /// insecure and should not be used outside of testing**.
44
    ///
45
    /// For secure encryption, it is important to store the vault keys in a
46
    /// location that is separate from the database. If the keys are on the same
47
    /// hardware as the encrypted content, anyone with access to the disk will
48
    /// be able to decrypt the stored data.
49
    #[cfg(feature = "encryption")]
50
    pub vault_key_storage: Option<Arc<dyn AnyVaultKeyStorage>>,
51

            
52
    /// The default encryption key for the database. If specified, all documents
53
    /// will be stored encrypted at-rest using the key specified. Having this
54
    /// key specified will also encrypt views. Without this, views will be
55
    /// stored unencrypted.
56
    #[cfg(feature = "encryption")]
57
    pub default_encryption_key: Option<KeyId>,
58

            
59
    /// Configuration options related to background tasks.
60
    pub workers: Tasks,
61

            
62
    /// Configuration options related to views.
63
    pub views: Views,
64

            
65
    /// Controls how the key-value store persists keys, on a per-database basis.
66
    pub key_value_persistence: KeyValuePersistence,
67

            
68
    /// Sets the default compression algorithm.
69
    #[cfg(feature = "compression")]
70
    pub default_compression: Option<Compression>,
71

            
72
    /// The permissions granted to authenticated connections to this server.
73
    pub authenticated_permissions: Permissions,
74

            
75
    /// Password hashing configuration.
76
    #[cfg(feature = "password-hashing")]
77
    pub argon: ArgonConfiguration,
78

            
79
    pub(crate) initial_schemas: HashMap<SchemaName, Arc<dyn DatabaseOpener>>,
80
}
81

            
82
impl Default for StorageConfiguration {
83
5333
    fn default() -> Self {
84
5333
        let system_specs = RefreshKind::new()
85
5333
            .with_cpu(CpuRefreshKind::new())
86
5333
            .with_memory();
87
5333
        let mut system = System::new_with_specifics(system_specs);
88
5333
        system.refresh_specifics(system_specs);
89
5333
        Self {
90
5333
            path: None,
91
5333
            memory_only: false,
92
5333
            unique_id: None,
93
5333
            #[cfg(feature = "encryption")]
94
5333
            vault_key_storage: None,
95
5333
            #[cfg(feature = "encryption")]
96
5333
            default_encryption_key: None,
97
5333
            #[cfg(feature = "compression")]
98
5333
            default_compression: None,
99
5333
            workers: Tasks::default_for(&system),
100
5333
            views: Views::default(),
101
5333
            key_value_persistence: KeyValuePersistence::default(),
102
5333
            authenticated_permissions: Permissions::default(),
103
5333
            #[cfg(feature = "password-hashing")]
104
5333
            argon: ArgonConfiguration::default_for(&system),
105
5333
            initial_schemas: HashMap::default(),
106
5333
        }
107
5333
    }
108
}
109

            
110
impl std::fmt::Debug for StorageConfiguration {
111
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112
        let mut schemas = self.initial_schemas.keys().collect::<Vec<_>>();
113
        schemas.sort();
114
        let mut f = f.debug_struct("StorageConfiguration");
115
        f.field("path", &self.path)
116
            .field("memory_only", &self.memory_only)
117
            .field("unique_id", &self.unique_id)
118
            .field("workers", &self.workers)
119
            .field("views", &self.views)
120
            .field("key_value_persistence", &self.key_value_persistence)
121
            .field("authenticated_permissions", &self.authenticated_permissions)
122
            .field("initial_schemas", &schemas);
123

            
124
        #[cfg(feature = "encryption")]
125
        f.field("vault_key_storage", &self.vault_key_storage)
126
            .field("default_encryption_key", &self.default_encryption_key);
127

            
128
        #[cfg(feature = "compression")]
129
        f.field("default_compression", &self.default_compression);
130

            
131
        #[cfg(feature = "password-hashing")]
132
        f.field("argon", &self.argon);
133

            
134
        f.finish()
135
    }
136
}
137

            
138
impl StorageConfiguration {
139
    /// Registers the schema provided.
140
389
    pub fn register_schema<S: Schema>(&mut self) -> Result<(), Error> {
141
389
        // TODO this should error on duplicate registration.
142
389
        self.initial_schemas
143
389
            .insert(S::schema_name(), Arc::new(StorageSchemaOpener::<S>::new()?));
144
389
        Ok(())
145
389
    }
146
}
147

            
148
/// Configuration options for background tasks.
#[derive(Debug, Clone)]
pub struct Tasks {
    /// Defines how many workers should be spawned to process tasks. This
    /// defaults to 2x the number of cpu cores available to the system or 2,
    /// whichever is larger.
    pub worker_count: usize,

    /// Defines how many simultaneous threads should be used when a task is
    /// parallelizable. This defaults to the number of cpu cores available to
    /// the system.
    pub parallelization: usize,
}
161

            
162
impl SystemDefault for Tasks {
163
5333
    fn default_for(system: &System) -> Self {
164
5333
        let num_cpus = system
165
5333
            .physical_core_count()
166
5333
            .unwrap_or(0)
167
5333
            .max(system.cpus().len())
168
5333
            .max(1);
169
5333
        Self {
170
5333
            worker_count: num_cpus * 2,
171
5333
            parallelization: num_cpus,
172
5333
        }
173
5333
    }
174
}
175

            
176
/// Configuration options for views.
#[derive(Clone, Debug, Default)]
pub struct Views {
    /// If true, the database will scan all views during the call to
    /// `open_local`. This will cause database opening to take longer, but once
    /// the database is open, no request will need to wait for the integrity to
    /// be checked. However, for faster startup time, you may wish to delay the
    /// integrity scan. Default value is `false`.
    pub check_integrity_on_open: bool,
}
186

            
187
/// Rules for persisting key-value changes. Default persistence is to
188
/// immediately persist all changes. While this ensures data integrity, the
189
/// overhead of the key-value store can be significantly reduced by utilizing
190
/// lazy persistence strategies that delay writing changes until certain
191
/// thresholds have been met.
192
///
193
/// ## Immediate persistence
194
///
195
/// The default persistence mode will trigger commits always:
196
///
197
/// ```rust
198
/// # use bonsaidb_local::config::KeyValuePersistence;
199
/// # use std::time::Duration;
200
/// assert!(!KeyValuePersistence::default().should_commit(0, Duration::ZERO));
201
/// assert!(KeyValuePersistence::default().should_commit(1, Duration::ZERO));
202
/// ```
203
///
204
/// ## Lazy persistence
205
///
206
/// Lazy persistence allows setting multiple thresholds, allowing for customized
207
/// behavior that can help tune performance, especially under write-heavy loads.
208
///
209
/// It is good practice to include one [`PersistenceThreshold`] that has no
210
/// duration, as it will ensure that the in-memory cache cannot exceed a certain
211
/// size. This number is counted for each database indepenently.
212
///
213
/// ```rust
214
/// # use bonsaidb_local::config::{KeyValuePersistence, PersistenceThreshold};
215
/// # use std::time::Duration;
216
/// #
217
/// let persistence = KeyValuePersistence::lazy([
218
///     PersistenceThreshold::after_changes(1).and_duration(Duration::from_secs(120)),
219
///     PersistenceThreshold::after_changes(10).and_duration(Duration::from_secs(10)),
220
///     PersistenceThreshold::after_changes(100),
221
/// ]);
222
///
223
/// // After 1 change and 60 seconds, no changes would be committed:
224
/// assert!(!persistence.should_commit(1, Duration::from_secs(60)));
225
/// // But on or after 120 seconds, that change will be committed:
226
/// assert!(persistence.should_commit(1, Duration::from_secs(120)));
227
///
228
/// // After 10 changes and 10 seconds, changes will be committed:
229
/// assert!(persistence.should_commit(10, Duration::from_secs(10)));
230
///
231
/// // Once 100 changes have been accumulated, this ruleset will always commit
232
/// // regardless of duration.
233
/// assert!(persistence.should_commit(100, Duration::ZERO));
234
/// ```
235
35858
#[derive(Debug, Clone)]
236
#[must_use]
237
pub struct KeyValuePersistence(KeyValuePersistenceInner);
238

            
239
35858
#[derive(Debug, Clone)]
240
enum KeyValuePersistenceInner {
241
    Immediate,
242
    Lazy(Vec<PersistenceThreshold>),
243
}
244

            
245
impl Default for KeyValuePersistence {
246
    /// Returns [`KeyValuePersistence::immediate()`].
247
5338
    fn default() -> Self {
248
5338
        Self::immediate()
249
5338
    }
250
}
251

            
252
impl KeyValuePersistence {
253
    /// Returns a ruleset that commits all changes immediately.
254
5634
    pub const fn immediate() -> Self {
255
5634
        Self(KeyValuePersistenceInner::Immediate)
256
5634
    }
257

            
258
    /// Returns a ruleset that lazily commits data based on a list of thresholds.
259
13
    pub fn lazy<II>(rules: II) -> Self
260
13
    where
261
13
        II: IntoIterator<Item = PersistenceThreshold>,
262
13
    {
263
13
        let mut rules = rules.into_iter().collect::<Vec<_>>();
264
13
        rules.sort_by(|a, b| a.number_of_changes.cmp(&b.number_of_changes));
265
13
        Self(KeyValuePersistenceInner::Lazy(rules))
266
13
    }
267

            
268
    /// Returns true if these rules determine that the outstanding changes should be persisted.
269
    #[must_use]
270
100287
    pub fn should_commit(
271
100287
        &self,
272
100287
        number_of_changes: usize,
273
100287
        elapsed_since_last_commit: Duration,
274
100287
    ) -> bool {
275
100287
        self.duration_until_next_commit(number_of_changes, elapsed_since_last_commit)
276
100287
            == Some(Duration::ZERO)
277
100287
    }
278

            
279
201405
    pub(crate) fn duration_until_next_commit(
280
201405
        &self,
281
201405
        number_of_changes: usize,
282
201405
        elapsed_since_last_commit: Duration,
283
201405
    ) -> Option<Duration> {
284
201405
        if number_of_changes == 0 {
285
33245
            None
286
        } else {
287
168160
            match &self.0 {
288
167039
                KeyValuePersistenceInner::Immediate => Some(Duration::ZERO),
289
1121
                KeyValuePersistenceInner::Lazy(rules) => {
290
1121
                    let mut shortest_duration = Duration::MAX;
291
1121
                    for rule in rules
292
1121
                        .iter()
293
1125
                        .take_while(|rule| rule.number_of_changes <= number_of_changes)
294
                    {
295
7
                        let remaining_time =
296
7
                            rule.duration.saturating_sub(elapsed_since_last_commit);
297
7
                        shortest_duration = shortest_duration.min(remaining_time);
298
7

            
299
7
                        if shortest_duration == Duration::ZERO {
300
3
                            break;
301
4
                        }
302
                    }
303
1121
                    (shortest_duration < Duration::MAX).then_some(shortest_duration)
304
                }
305
            }
306
        }
307
201405
    }
308
}
309

            
310
/// A threshold controlling lazy commits. For a threshold to apply, both
/// `number_of_changes` must be met or surpassed and `duration` must have
/// elapsed since the last commit.
///
/// A threshold with a duration of zero will not wait any time to persist
/// changes once the specified `number_of_changes` has been met or surpassed.
#[derive(Debug, Copy, Clone)]
#[must_use]
pub struct PersistenceThreshold {
    /// The minimum number of changes that must have occurred for this threshold to apply.
    pub number_of_changes: usize,
    /// The amount of time that must elapse since the last write for this threshold to apply.
    pub duration: Duration,
}
324

            
325
impl PersistenceThreshold {
326
    /// Returns a threshold that applies after a number of changes have elapsed.
327
374
    pub const fn after_changes(number_of_changes: usize) -> Self {
328
374
        Self {
329
374
            number_of_changes,
330
374
            duration: Duration::ZERO,
331
374
        }
332
374
    }
333

            
334
    /// Sets the duration of this threshold to `duration` and returns self.
335
1
    pub const fn and_duration(mut self, duration: Duration) -> Self {
336
1
        self.duration = duration;
337
1
        self
338
1
    }
339
}
340

            
341
/// Storage configuration builder methods.
342
pub trait Builder: Sized {
343
    /// Creates a default configuration with `path` set.
344
    #[must_use]
345
293
    fn new<P: AsRef<Path>>(path: P) -> Self
346
293
    where
347
293
        Self: Default,
348
293
    {
349
293
        Self::default().path(path)
350
293
    }
351
    /// Registers the schema and returns self.
352
    fn with_schema<S: Schema>(self) -> Result<Self, Error>;
353

            
354
    /// Sets [`StorageConfiguration::memory_only`](StorageConfiguration#structfield.memory_only) to true and returns self.
355
    #[must_use]
356
    fn memory_only(self) -> Self;
357
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.path) to `path` and returns self.
358
    #[must_use]
359
    fn path<P: AsRef<Path>>(self, path: P) -> Self;
360
    /// Sets [`StorageConfiguration::unique_id`](StorageConfiguration#structfield.unique_id) to `unique_id` and returns self.
361
    #[must_use]
362
    fn unique_id(self, unique_id: u64) -> Self;
363
    /// Sets [`StorageConfiguration::vault_key_storage`](StorageConfiguration#structfield.vault_key_storage) to `key_storage` and returns self.
364
    #[cfg(feature = "encryption")]
365
    #[must_use]
366
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
367
        self,
368
        key_storage: VaultKeyStorage,
369
    ) -> Self;
370
    /// Sets [`StorageConfiguration::default_encryption_key`](StorageConfiguration#structfield.default_encryption_key) to `path` and returns self.
371
    #[cfg(feature = "encryption")]
372
    #[must_use]
373
    fn default_encryption_key(self, key: KeyId) -> Self;
374
    /// Sets [`Tasks::worker_count`] to `worker_count` and returns self.
375
    #[must_use]
376
    fn tasks_worker_count(self, worker_count: usize) -> Self;
377
    /// Sets [`Tasks::parallelization`] to `parallelization` and returns self.
378
    #[must_use]
379
    fn tasks_parallelization(self, parallelization: usize) -> Self;
380
    /// Sets [`Views::check_integrity_on_open`] to `check` and returns self.
381
    #[must_use]
382
    fn check_view_integrity_on_open(self, check: bool) -> Self;
383
    /// Sets [`StorageConfiguration::default_compression`](StorageConfiguration#structfield.default_compression) to `path` and returns self.
384
    #[cfg(feature = "compression")]
385
    #[must_use]
386
    fn default_compression(self, compression: Compression) -> Self;
387
    /// Sets [`StorageConfiguration::key_value_persistence`](StorageConfiguration#structfield.key_value_persistence) to `persistence` and returns self.
388
    #[must_use]
389
    fn key_value_persistence(self, persistence: KeyValuePersistence) -> Self;
390
    /// Sets [`Self::authenticated_permissions`](Self#structfield.authenticated_permissions) to `authenticated_permissions` and returns self.
391
    #[must_use]
392
    fn authenticated_permissions<P: Into<Permissions>>(self, authenticated_permissions: P) -> Self;
393
    /// Sets [`StorageConfiguration::argon`](StorageConfiguration#structfield.argon) to `argon` and returns self.
394
    #[cfg(feature = "password-hashing")]
395
    #[must_use]
396
    fn argon(self, argon: ArgonConfiguration) -> Self;
397
}
398

            
399
impl Builder for StorageConfiguration {
400
    fn with_schema<S: Schema>(mut self) -> Result<Self, Error> {
401
299
        self.register_schema::<S>()?;
402
299
        Ok(self)
403
299
    }
404

            
405
71
    fn memory_only(mut self) -> Self {
406
71
        self.memory_only = true;
407
71
        self
408
71
    }
409

            
410
201
    fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
411
201
        self.path = Some(path.as_ref().to_owned());
412
201
        self
413
201
    }
414

            
415
    fn unique_id(mut self, unique_id: u64) -> Self {
416
        self.unique_id = Some(unique_id);
417
        self
418
    }
419

            
420
    #[cfg(feature = "encryption")]
421
6
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
422
6
        mut self,
423
6
        key_storage: VaultKeyStorage,
424
6
    ) -> Self {
425
6
        self.vault_key_storage = Some(Arc::new(key_storage));
426
6
        self
427
6
    }
428

            
429
    #[cfg(feature = "encryption")]
430
222
    fn default_encryption_key(mut self, key: KeyId) -> Self {
431
222
        self.default_encryption_key = Some(key);
432
222
        self
433
222
    }
434

            
435
    #[cfg(feature = "compression")]
436
328
    fn default_compression(mut self, compression: Compression) -> Self {
437
328
        self.default_compression = Some(compression);
438
328
        self
439
328
    }
440

            
441
    fn tasks_worker_count(mut self, worker_count: usize) -> Self {
442
        self.workers.worker_count = worker_count;
443
        self
444
    }
445

            
446
    fn tasks_parallelization(mut self, parallelization: usize) -> Self {
447
        self.workers.parallelization = parallelization;
448
        self
449
    }
450

            
451
1
    fn check_view_integrity_on_open(mut self, check: bool) -> Self {
452
1
        self.views.check_integrity_on_open = check;
453
1
        self
454
1
    }
455

            
456
1
    fn key_value_persistence(mut self, persistence: KeyValuePersistence) -> Self {
457
1
        self.key_value_persistence = persistence;
458
1
        self
459
1
    }
460

            
461
    fn authenticated_permissions<P: Into<Permissions>>(
462
        mut self,
463
        authenticated_permissions: P,
464
    ) -> Self {
465
        self.authenticated_permissions = authenticated_permissions.into();
466
        self
467
    }
468

            
469
    #[cfg(feature = "password-hashing")]
470
    fn argon(mut self, argon: ArgonConfiguration) -> Self {
471
        self.argon = argon;
472
        self
473
    }
474
}
475

            
476
pub(crate) trait SystemDefault: Sized {
477
    fn default_for(system: &System) -> Self;
478
1
    fn default() -> Self {
479
1
        let system_specs = RefreshKind::new()
480
1
            .with_cpu(CpuRefreshKind::new())
481
1
            .with_memory();
482
1
        let mut system = System::new_with_specifics(system_specs);
483
1
        system.refresh_specifics(system_specs);
484
1
        Self::default_for(&system)
485
1
    }
486
}
487

            
488
/// All available compression algorithms.
#[derive(Debug, Clone, Copy)]
pub enum Compression {
    /// Compress data using the
    /// [lz4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm))
    /// algorithm. This is powered by
    /// [lz4_flex](https://crates.io/crates/lz4_flex).
    Lz4 = 1,
}
497

            
498
impl Compression {
499
    #[must_use]
500
    #[cfg(feature = "compression")]
501
755684
    pub(crate) fn from_u8(value: u8) -> Option<Self> {
502
755684
        match value {
503
744368
            1 => Some(Self::Lz4),
504
11316
            _ => None,
505
        }
506
755684
    }
507
}