1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

#[cfg(feature = "encryption")]
use bonsaidb_core::document::KeyId;
use bonsaidb_core::{
    permissions::Permissions,
    schema::{Schema, SchemaName},
};
use sysinfo::{RefreshKind, System, SystemExt};

#[cfg(feature = "encryption")]
use crate::vault::AnyVaultKeyStorage;
use crate::{
    storage::{DatabaseOpener, StorageSchemaOpener},
    Error,
};

#[cfg(feature = "password-hashing")]
mod argon;
#[cfg(feature = "password-hashing")]
pub use argon::*;

/// Configuration options for [`Storage`](crate::storage::Storage).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct StorageConfiguration {
    /// The path to the database. Defaults to `db.bonsaidb` if not specified.
    pub path: Option<PathBuf>,

    /// Prevents storing data on the disk. This is intended for testing purposes
    /// primarily. Keep in mind that the underlying storage format is
    /// append-only.
    pub memory_only: bool,

    /// The unique id of the server. If not specified, the server will randomly
    /// generate a unique id on startup. If the server generated an id and this
    /// value is subsequently set, the generated id will be overridden by the
    /// one specified here.
    pub unique_id: Option<u64>,

    /// The vault key storage to use. If not specified,
    /// [`LocalVaultKeyStorage`](crate::vault::LocalVaultKeyStorage) will be
    /// used with the server's data folder as the path. This is **incredibly
    /// insecure and should not be used outside of testing**.
    ///
    /// For secure encryption, it is important to store the vault keys in a
    /// location that is separate from the database. If the keys are on the same
    /// hardware as the encrypted content, anyone with access to the disk will
    /// be able to decrypt the stored data.
    #[cfg(feature = "encryption")]
    pub vault_key_storage: Option<Arc<dyn AnyVaultKeyStorage>>,

    /// The default encryption key for the database. If specified, all documents
    /// will be stored encrypted at-rest using the key specified. Having this
    /// key specified will also encrypt views. Without this, views will be
    /// stored unencrypted.
    #[cfg(feature = "encryption")]
    pub default_encryption_key: Option<KeyId>,

    /// Configuration options related to background tasks.
    pub workers: Tasks,

    /// Configuration options related to views.
    pub views: Views,

    /// Controls how the key-value store persists keys, on a per-database basis.
    pub key_value_persistence: KeyValuePersistence,

    /// Sets the default compression algorithm.
    #[cfg(feature = "compression")]
    pub default_compression: Option<Compression>,

    /// The permissions granted to authenticated connections to this server.
    pub authenticated_permissions: Permissions,

    /// Password hashing configuration.
    #[cfg(feature = "password-hashing")]
    pub argon: ArgonConfiguration,

    pub(crate) initial_schemas: HashMap<SchemaName, Arc<dyn DatabaseOpener>>,
}

impl Default for StorageConfiguration {
    fn default() -> Self {
        let system_specs = RefreshKind::new().with_cpu().with_memory();
        let mut system = System::new_with_specifics(system_specs);
        system.refresh_specifics(system_specs);
        Self {
            path: None,
            memory_only: false,
            unique_id: None,
            #[cfg(feature = "encryption")]
            vault_key_storage: None,
            #[cfg(feature = "encryption")]
            default_encryption_key: None,
            #[cfg(feature = "compression")]
            default_compression: None,
            workers: Tasks::default_for(&system),
            views: Views::default(),
            key_value_persistence: KeyValuePersistence::default(),
            authenticated_permissions: Permissions::default(),
            #[cfg(feature = "password-hashing")]
            argon: ArgonConfiguration::default_for(&system),
            initial_schemas: HashMap::default(),
        }
    }
}

impl StorageConfiguration {
    /// Registers the schema provided.
    pub fn register_schema<S: Schema>(&mut self) -> Result<(), Error> {
        // TODO this should error on duplicate registration.
        self.initial_schemas
            .insert(S::schema_name(), Arc::new(StorageSchemaOpener::<S>::new()?));
        Ok(())
    }
}

/// Configuration options for background tasks.
#[derive(Debug, Clone)]
pub struct Tasks {
    /// Defines how many workers should be spawned to process tasks. This
    /// defaults to the 2x the number of cpu cores available to the system or 2,
    /// whichever is larger.
    pub worker_count: usize,

    /// Defines how many simultaneous threads should be used when a task is
    /// parallelizable. This defaults to the nuber of cpu cores available to the
    /// system.
    pub parallelization: usize,
}

impl SystemDefault for Tasks {
    fn default_for(system: &System) -> Self {
        let num_cpus = system
            .physical_core_count()
            .unwrap_or(0)
            .max(system.processors().len())
            .max(1);
        Self {
            worker_count: num_cpus * 2,
            parallelization: num_cpus,
        }
    }
}

/// Configuration options for views.
#[derive(Clone, Debug, Default)]
pub struct Views {
    /// If true, the database will scan all views during the call to
    /// `open_local`. This will cause database opening to take longer, but once
    /// the database is open, no request will need to wait for the integrity to
    /// be checked. However, for faster startup time, you may wish to delay the
    /// integrity scan. Default value is `false`.
    pub check_integrity_on_open: bool,
}

/// Rules for persisting key-value changes. Default persistence is to
/// immediately persist all changes. While this ensures data integrity, the
/// overhead of the key-value store can be significantly reduced by utilizing
/// lazy persistence strategies that delay writing changes until certain
/// thresholds have been met.
///
/// ## Immediate persistence
///
/// The default persistence mode will trigger commits always:
///
/// ```rust
/// # use bonsaidb_local::config::KeyValuePersistence;
/// # use std::time::Duration;
/// assert!(!KeyValuePersistence::default().should_commit(0, Duration::ZERO));
/// assert!(KeyValuePersistence::default().should_commit(1, Duration::ZERO));
/// ```
///
/// ## Lazy persistence
///
/// Lazy persistence allows setting multiple thresholds, allowing for customized
/// behavior that can help tune performance, especially under write-heavy loads.
///
/// It is good practice to include one [`PersistenceThreshold`] that has no
/// duration, as it will ensure that the in-memory cache cannot exceed a certain
/// size. This number is counted for each database indepenently.
///
/// ```rust
/// # use bonsaidb_local::config::{KeyValuePersistence, PersistenceThreshold};
/// # use std::time::Duration;
/// #
/// let persistence = KeyValuePersistence::lazy([
///     PersistenceThreshold::after_changes(1).and_duration(Duration::from_secs(120)),
///     PersistenceThreshold::after_changes(10).and_duration(Duration::from_secs(10)),
///     PersistenceThreshold::after_changes(100),
/// ]);
///
/// // After 1 change and 60 seconds, no changes would be committed:
/// assert!(!persistence.should_commit(1, Duration::from_secs(60)));
/// // But on or after 120 seconds, that change will be committed:
/// assert!(persistence.should_commit(1, Duration::from_secs(120)));
///
/// // After 10 changes and 10 seconds, changes will be committed:
/// assert!(persistence.should_commit(10, Duration::from_secs(10)));
///
/// // Once 100 changes have been accumulated, this ruleset will always commit
/// // regardless of duration.
/// assert!(persistence.should_commit(100, Duration::ZERO));
/// ```
#[derive(Debug, Clone)]
#[must_use]
pub struct KeyValuePersistence(KeyValuePersistenceInner);

#[derive(Debug, Clone)]
enum KeyValuePersistenceInner {
    Immediate,
    Lazy(Vec<PersistenceThreshold>),
}

impl Default for KeyValuePersistence {
    /// Returns [`KeyValuePersistence::immediate()`].
    fn default() -> Self {
        Self::immediate()
    }
}

impl KeyValuePersistence {
    /// Returns a ruleset that commits all changes immediately.
    pub const fn immediate() -> Self {
        Self(KeyValuePersistenceInner::Immediate)
    }

    /// Returns a ruleset that lazily commits data based on a list of thresholds.
    pub fn lazy<II>(rules: II) -> Self
    where
        II: IntoIterator<Item = PersistenceThreshold>,
    {
        let mut rules = rules.into_iter().collect::<Vec<_>>();
        rules.sort_by(|a, b| a.number_of_changes.cmp(&b.number_of_changes));
        Self(KeyValuePersistenceInner::Lazy(rules))
    }

    /// Returns true if these rules determine that the outstanding changes should be persisted.
    #[must_use]
    pub fn should_commit(
        &self,
        number_of_changes: usize,
        elapsed_since_last_commit: Duration,
    ) -> bool {
        self.duration_until_next_commit(number_of_changes, elapsed_since_last_commit)
            == Some(Duration::ZERO)
    }

    pub(crate) fn duration_until_next_commit(
        &self,
        number_of_changes: usize,
        elapsed_since_last_commit: Duration,
    ) -> Option<Duration> {
        if number_of_changes == 0 {
            None
        } else {
            match &self.0 {
                KeyValuePersistenceInner::Immediate => Some(Duration::ZERO),
                KeyValuePersistenceInner::Lazy(rules) => {
                    let mut shortest_duration = Duration::MAX;
                    for rule in rules
                        .iter()
                        .take_while(|rule| rule.number_of_changes <= number_of_changes)
                    {
                        let remaining_time =
                            rule.duration.saturating_sub(elapsed_since_last_commit);
                        shortest_duration = shortest_duration.min(remaining_time);

                        if shortest_duration == Duration::ZERO {
                            break;
                        }
                    }
                    (shortest_duration < Duration::MAX).then(|| shortest_duration)
                }
            }
        }
    }
}

/// A threshold controlling lazy commits. For a threshold to apply, both
/// `number_of_changes` must be met or surpassed and `duration` must have
/// elpased since the last commit.
///
/// A threshold with a duration of zero will not wait any time to persist
/// changes once the specified `number_of_changes` has been met or surpassed.
#[derive(Debug, Copy, Clone)]
#[must_use]
pub struct PersistenceThreshold {
    /// The minimum number of changes that must have occurred for this threshold to apply.
    pub number_of_changes: usize,
    /// The amount of time that must elapse since the last write for this threshold to apply.
    pub duration: Duration,
}

impl PersistenceThreshold {
    /// Returns a threshold that applies after a number of changes have elapsed.
    pub const fn after_changes(number_of_changes: usize) -> Self {
        Self {
            number_of_changes,
            duration: Duration::ZERO,
        }
    }

    /// Sets the duration of this threshold to `duration` and returns self.
    pub const fn and_duration(mut self, duration: Duration) -> Self {
        self.duration = duration;
        self
    }
}

/// Storage configuration builder methods.
pub trait Builder: Default {
    /// Creates a default configuration with `path` set.
    #[must_use]
    fn new<P: AsRef<Path>>(path: P) -> Self {
        Self::default().path(path)
    }
    /// Registers the schema and returns self.
    fn with_schema<S: Schema>(self) -> Result<Self, Error>;

    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.memory_only) to true and returns self.
    #[must_use]
    fn memory_only(self) -> Self;
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.path) to `path` and returns self.
    #[must_use]
    fn path<P: AsRef<Path>>(self, path: P) -> Self;
    /// Sets [`StorageConfiguration::unique_id`](StorageConfiguration#structfield.unique_id) to `unique_id` and returns self.
    #[must_use]
    fn unique_id(self, unique_id: u64) -> Self;
    /// Sets [`StorageConfiguration::vault_key_storage`](StorageConfiguration#structfield.vault_key_storage) to `key_storage` and returns self.
    #[cfg(feature = "encryption")]
    #[must_use]
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
        self,
        key_storage: VaultKeyStorage,
    ) -> Self;
    /// Sets [`StorageConfiguration::default_encryption_key`](StorageConfiguration#structfield.default_encryption_key) to `path` and returns self.
    #[cfg(feature = "encryption")]
    #[must_use]
    fn default_encryption_key(self, key: KeyId) -> Self;
    /// Sets [`Tasks::worker_count`] to `worker_count` and returns self.
    #[must_use]
    fn tasks_worker_count(self, worker_count: usize) -> Self;
    /// Sets [`Tasks::parallelization`] to `parallelization` and returns self.
    #[must_use]
    fn tasks_parallelization(self, parallelization: usize) -> Self;
    /// Sets [`Views::check_integrity_on_open`] to `check` and returns self.
    #[must_use]
    fn check_view_integrity_on_open(self, check: bool) -> Self;
    /// Sets [`StorageConfiguration::default_compression`](StorageConfiguration#structfield.default_compression) to `path` and returns self.
    #[cfg(feature = "compression")]
    #[must_use]
    fn default_compression(self, compression: Compression) -> Self;
    /// Sets [`StorageConfiguration::key_value_persistence`](StorageConfiguration#structfield.key_value_persistence) to `persistence` and returns self.
    #[must_use]
    fn key_value_persistence(self, persistence: KeyValuePersistence) -> Self;
    /// Sets [`Self::authenticated_permissions`](Self#structfield.authenticated_permissions) to `authenticated_permissions` and returns self.
    #[must_use]
    fn authenticated_permissions<P: Into<Permissions>>(self, authenticated_permissions: P) -> Self;
    /// Sets [`StorageConfiguration::argon`](StorageConfiguration#structfield.argon) to `argon` and returns self.
    #[cfg(feature = "password-hashing")]
    #[must_use]
    fn argon(self, argon: ArgonConfiguration) -> Self;
}

impl Builder for StorageConfiguration {
    fn with_schema<S: Schema>(mut self) -> Result<Self, Error> {
        self.register_schema::<S>()?;
        Ok(self)
    }

    fn memory_only(mut self) -> Self {
        self.memory_only = true;
        self
    }

    fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.path = Some(path.as_ref().to_owned());
        self
    }

    fn unique_id(mut self, unique_id: u64) -> Self {
        self.unique_id = Some(unique_id);
        self
    }

    #[cfg(feature = "encryption")]
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
        mut self,
        key_storage: VaultKeyStorage,
    ) -> Self {
        self.vault_key_storage = Some(Arc::new(key_storage));
        self
    }

    #[cfg(feature = "encryption")]
    fn default_encryption_key(mut self, key: KeyId) -> Self {
        self.default_encryption_key = Some(key);
        self
    }

    #[cfg(feature = "compression")]
    fn default_compression(mut self, compression: Compression) -> Self {
        self.default_compression = Some(compression);
        self
    }

    fn tasks_worker_count(mut self, worker_count: usize) -> Self {
        self.workers.worker_count = worker_count;
        self
    }

    fn tasks_parallelization(mut self, parallelization: usize) -> Self {
        self.workers.parallelization = parallelization;
        self
    }

    fn check_view_integrity_on_open(mut self, check: bool) -> Self {
        self.views.check_integrity_on_open = check;
        self
    }

    fn key_value_persistence(mut self, persistence: KeyValuePersistence) -> Self {
        self.key_value_persistence = persistence;
        self
    }

    fn authenticated_permissions<P: Into<Permissions>>(
        mut self,
        authenticated_permissions: P,
    ) -> Self {
        self.authenticated_permissions = authenticated_permissions.into();
        self
    }

    #[cfg(feature = "password-hashing")]
    fn argon(mut self, argon: ArgonConfiguration) -> Self {
        self.argon = argon;
        self
    }
}

pub(crate) trait SystemDefault: Sized {
    fn default_for(system: &System) -> Self;
    fn default() -> Self {
        let system_specs = RefreshKind::new().with_cpu().with_memory();
        let mut system = System::new_with_specifics(system_specs);
        system.refresh_specifics(system_specs);
        Self::default_for(&system)
    }
}

/// All available compression algorithms.
#[derive(Debug, Clone, Copy)]
pub enum Compression {
    /// Compress data using the
    /// [lz4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm))
    /// algorithm. This is powered by
    /// [lz4_flex](https://crates.io/crates/lz4_flex).
    Lz4 = 1,
}

impl Compression {
    #[must_use]
    #[cfg(feature = "compression")]
    pub(crate) fn from_u8(value: u8) -> Option<Self> {
        match value {
            1 => Some(Self::Lz4),
            _ => None,
        }
    }
}