1
use std::{
2
    collections::HashMap,
3
    path::{Path, PathBuf},
4
    sync::Arc,
5
    time::Duration,
6
};
7

            
8
#[cfg(feature = "encryption")]
9
use bonsaidb_core::document::KeyId;
10
use bonsaidb_core::{
11
    permissions::Permissions,
12
    schema::{Schema, SchemaName},
13
};
14
use sysinfo::{RefreshKind, System, SystemExt};
15

            
16
#[cfg(feature = "encryption")]
17
use crate::vault::AnyVaultKeyStorage;
18
use crate::{
19
    storage::{DatabaseOpener, StorageSchemaOpener},
20
    Error,
21
};
22

            
23
#[cfg(feature = "password-hashing")]
24
mod argon;
25
#[cfg(feature = "password-hashing")]
26
pub use argon::*;
27

            
28
/// Configuration options for [`Storage`](crate::storage::Storage).
29
12
#[derive(Debug, Clone)]
30
#[non_exhaustive]
31
pub struct StorageConfiguration {
32
    /// The path to the database. Defaults to `db.bonsaidb` if not specified.
33
    pub path: Option<PathBuf>,
34

            
35
    /// Prevents storing data on the disk. This is intended for testing purposes
36
    /// primarily. Keep in mind that the underlying storage format is
37
    /// append-only.
38
    pub memory_only: bool,
39

            
40
    /// The unique id of the server. If not specified, the server will randomly
41
    /// generate a unique id on startup. If the server generated an id and this
42
    /// value is subsequently set, the generated id will be overridden by the
43
    /// one specified here.
44
    pub unique_id: Option<u64>,
45

            
46
    /// The vault key storage to use. If not specified,
47
    /// [`LocalVaultKeyStorage`](crate::vault::LocalVaultKeyStorage) will be
48
    /// used with the server's data folder as the path. This is **incredibly
49
    /// insecure and should not be used outside of testing**.
50
    ///
51
    /// For secure encryption, it is important to store the vault keys in a
52
    /// location that is separate from the database. If the keys are on the same
53
    /// hardware as the encrypted content, anyone with access to the disk will
54
    /// be able to decrypt the stored data.
55
    #[cfg(feature = "encryption")]
56
    pub vault_key_storage: Option<Arc<dyn AnyVaultKeyStorage>>,
57

            
58
    /// The default encryption key for the database. If specified, all documents
59
    /// will be stored encrypted at-rest using the key specified. Having this
60
    /// key specified will also encrypt views. Without this, views will be
61
    /// stored unencrypted.
62
    #[cfg(feature = "encryption")]
63
    pub default_encryption_key: Option<KeyId>,
64

            
65
    /// Configuration options related to background tasks.
66
    pub workers: Tasks,
67

            
68
    /// Configuration options related to views.
69
    pub views: Views,
70

            
71
    /// Controls how the key-value store persists keys, on a per-database basis.
72
    pub key_value_persistence: KeyValuePersistence,
73

            
74
    /// Sets the default compression algorithm.
75
    #[cfg(feature = "compression")]
76
    pub default_compression: Option<Compression>,
77

            
78
    /// The permissions granted to authenticated connections to this server.
79
    pub authenticated_permissions: Permissions,
80

            
81
    /// Password hashing configuration.
82
    #[cfg(feature = "password-hashing")]
83
    pub argon: ArgonConfiguration,
84

            
85
    pub(crate) initial_schemas: HashMap<SchemaName, Arc<dyn DatabaseOpener>>,
86
}
87

            
88
impl Default for StorageConfiguration {
89
3465
    fn default() -> Self {
90
3465
        let system_specs = RefreshKind::new().with_cpu().with_memory();
91
3465
        let mut system = System::new_with_specifics(system_specs);
92
3465
        system.refresh_specifics(system_specs);
93
3465
        Self {
94
3465
            path: None,
95
3465
            memory_only: false,
96
3465
            unique_id: None,
97
3465
            #[cfg(feature = "encryption")]
98
3465
            vault_key_storage: None,
99
3465
            #[cfg(feature = "encryption")]
100
3465
            default_encryption_key: None,
101
3465
            #[cfg(feature = "compression")]
102
3465
            default_compression: None,
103
3465
            workers: Tasks::default_for(&system),
104
3465
            views: Views::default(),
105
3465
            key_value_persistence: KeyValuePersistence::default(),
106
3465
            authenticated_permissions: Permissions::default(),
107
3465
            #[cfg(feature = "password-hashing")]
108
3465
            argon: ArgonConfiguration::default_for(&system),
109
3465
            initial_schemas: HashMap::default(),
110
3465
        }
111
3465
    }
112
}
113

            
114
impl StorageConfiguration {
115
    /// Registers the schema provided.
116
    pub fn register_schema<S: Schema>(&mut self) -> Result<(), Error> {
117
        // TODO this should error on duplicate registration.
118
        self.initial_schemas
119
333
            .insert(S::schema_name(), Arc::new(StorageSchemaOpener::<S>::new()?));
120
333
        Ok(())
121
333
    }
122
}
123

            
124
/// Configuration options for background tasks.
125
12
#[derive(Debug, Clone)]
126
pub struct Tasks {
127
    /// Defines how many workers should be spawned to process tasks. This
128
    /// defaults to the 2x the number of cpu cores available to the system or 2,
129
    /// whichever is larger.
130
    pub worker_count: usize,
131

            
132
    /// Defines how many simultaneous threads should be used when a task is
133
    /// parallelizable. This defaults to the nuber of cpu cores available to the
134
    /// system.
135
    pub parallelization: usize,
136
}
137

            
138
impl SystemDefault for Tasks {
139
3465
    fn default_for(system: &System) -> Self {
140
3465
        let num_cpus = system
141
3465
            .physical_core_count()
142
3465
            .unwrap_or(0)
143
3465
            .max(system.processors().len())
144
3465
            .max(1);
145
3465
        Self {
146
3465
            worker_count: num_cpus * 2,
147
3465
            parallelization: num_cpus,
148
3465
        }
149
3465
    }
150
}
151

            
152
/// Configuration options for views.
153
3465
#[derive(Clone, Debug, Default)]
154
pub struct Views {
155
    /// If true, the database will scan all views during the call to
156
    /// `open_local`. This will cause database opening to take longer, but once
157
    /// the database is open, no request will need to wait for the integrity to
158
    /// be checked. However, for faster startup time, you may wish to delay the
159
    /// integrity scan. Default value is `false`.
160
    pub check_integrity_on_open: bool,
161
}
162

            
163
/// Rules for persisting key-value changes. Default persistence is to
164
/// immediately persist all changes. While this ensures data integrity, the
165
/// overhead of the key-value store can be significantly reduced by utilizing
166
/// lazy persistence strategies that delay writing changes until certain
167
/// thresholds have been met.
168
///
169
/// ## Immediate persistence
170
///
171
/// The default persistence mode will trigger commits always:
172
///
173
/// ```rust
174
/// # use bonsaidb_local::config::KeyValuePersistence;
175
/// # use std::time::Duration;
176
/// assert!(!KeyValuePersistence::default().should_commit(0, Duration::ZERO));
177
/// assert!(KeyValuePersistence::default().should_commit(1, Duration::ZERO));
178
/// ```
179
///
180
/// ## Lazy persistence
181
///
182
/// Lazy persistence allows setting multiple thresholds, allowing for customized
183
/// behavior that can help tune performance, especially under write-heavy loads.
184
///
185
/// It is good practice to include one [`PersistenceThreshold`] that has no
186
/// duration, as it will ensure that the in-memory cache cannot exceed a certain
187
/// size. This number is counted for each database indepenently.
188
///
189
/// ```rust
190
/// # use bonsaidb_local::config::{KeyValuePersistence, PersistenceThreshold};
191
/// # use std::time::Duration;
192
/// #
193
/// let persistence = KeyValuePersistence::lazy([
194
///     PersistenceThreshold::after_changes(1).and_duration(Duration::from_secs(120)),
195
///     PersistenceThreshold::after_changes(10).and_duration(Duration::from_secs(10)),
196
///     PersistenceThreshold::after_changes(100),
197
/// ]);
198
///
199
/// // After 1 change and 60 seconds, no changes would be committed:
200
/// assert!(!persistence.should_commit(1, Duration::from_secs(60)));
201
/// // But on or after 120 seconds, that change will be committed:
202
/// assert!(persistence.should_commit(1, Duration::from_secs(120)));
203
///
204
/// // After 10 changes and 10 seconds, changes will be committed:
205
/// assert!(persistence.should_commit(10, Duration::from_secs(10)));
206
///
207
/// // Once 100 changes have been accumulated, this ruleset will always commit
208
/// // regardless of duration.
209
/// assert!(persistence.should_commit(100, Duration::ZERO));
210
/// ```
211
26882
#[derive(Debug, Clone)]
212
#[must_use]
213
pub struct KeyValuePersistence(KeyValuePersistenceInner);
214

            
215
26882
#[derive(Debug, Clone)]
216
enum KeyValuePersistenceInner {
217
    Immediate,
218
    Lazy(Vec<PersistenceThreshold>),
219
}
220

            
221
impl Default for KeyValuePersistence {
222
    /// Returns [`KeyValuePersistence::immediate()`].
223
3470
    fn default() -> Self {
224
3470
        Self::immediate()
225
3470
    }
226
}
227

            
228
impl KeyValuePersistence {
229
    /// Returns a ruleset that commits all changes immediately.
230
3710
    pub const fn immediate() -> Self {
231
3710
        Self(KeyValuePersistenceInner::Immediate)
232
3710
    }
233

            
234
    /// Returns a ruleset that lazily commits data based on a list of thresholds.
235
13
    pub fn lazy<II>(rules: II) -> Self
236
13
    where
237
13
        II: IntoIterator<Item = PersistenceThreshold>,
238
13
    {
239
13
        let mut rules = rules.into_iter().collect::<Vec<_>>();
240
13
        rules.sort_by(|a, b| a.number_of_changes.cmp(&b.number_of_changes));
241
13
        Self(KeyValuePersistenceInner::Lazy(rules))
242
13
    }
243

            
244
    /// Returns true if these rules determine that the outstanding changes should be persisted.
245
    #[must_use]
246
91771
    pub fn should_commit(
247
91771
        &self,
248
91771
        number_of_changes: usize,
249
91771
        elapsed_since_last_commit: Duration,
250
91771
    ) -> bool {
251
91771
        self.duration_until_next_commit(number_of_changes, elapsed_since_last_commit)
252
91771
            == Some(Duration::ZERO)
253
91771
    }
254

            
255
184188
    pub(crate) fn duration_until_next_commit(
256
184188
        &self,
257
184188
        number_of_changes: usize,
258
184188
        elapsed_since_last_commit: Duration,
259
184188
    ) -> Option<Duration> {
260
184188
        if number_of_changes == 0 {
261
13002
            None
262
        } else {
263
171186
            match &self.0 {
264
170275
                KeyValuePersistenceInner::Immediate => Some(Duration::ZERO),
265
911
                KeyValuePersistenceInner::Lazy(rules) => {
266
911
                    let mut shortest_duration = Duration::MAX;
267
911
                    for rule in rules
268
911
                        .iter()
269
915
                        .take_while(|rule| rule.number_of_changes <= number_of_changes)
270
                    {
271
7
                        let remaining_time =
272
7
                            rule.duration.saturating_sub(elapsed_since_last_commit);
273
7
                        shortest_duration = shortest_duration.min(remaining_time);
274
7

            
275
7
                        if shortest_duration == Duration::ZERO {
276
3
                            break;
277
4
                        }
278
                    }
279
911
                    (shortest_duration < Duration::MAX).then(|| shortest_duration)
280
                }
281
            }
282
        }
283
184188
    }
284
}
285

            
286
/// A threshold controlling lazy commits. For a threshold to apply, both
287
/// `number_of_changes` must be met or surpassed and `duration` must have
288
/// elpased since the last commit.
289
///
290
/// A threshold with a duration of zero will not wait any time to persist
291
/// changes once the specified `number_of_changes` has been met or surpassed.
292
#[derive(Debug, Copy, Clone)]
293
#[must_use]
294
pub struct PersistenceThreshold {
295
    /// The minimum number of changes that must have occurred for this threshold to apply.
296
    pub number_of_changes: usize,
297
    /// The amount of time that must elapse since the last write for this threshold to apply.
298
    pub duration: Duration,
299
}
300

            
301
impl PersistenceThreshold {
302
    /// Returns a threshold that applies after a number of changes have elapsed.
303
304
    pub const fn after_changes(number_of_changes: usize) -> Self {
304
304
        Self {
305
304
            number_of_changes,
306
304
            duration: Duration::ZERO,
307
304
        }
308
304
    }
309

            
310
    /// Sets the duration of this threshold to `duration` and returns self.
311
1
    pub const fn and_duration(mut self, duration: Duration) -> Self {
312
1
        self.duration = duration;
313
1
        self
314
1
    }
315
}
316

            
317
/// Storage configuration builder methods.
318
pub trait Builder: Default {
319
    /// Creates a default configuration with `path` set.
320
    #[must_use]
321
246
    fn new<P: AsRef<Path>>(path: P) -> Self {
322
246
        Self::default().path(path)
323
246
    }
324
    /// Registers the schema and returns self.
325
    fn with_schema<S: Schema>(self) -> Result<Self, Error>;
326

            
327
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.memory_only) to true and returns self.
328
    #[must_use]
329
    fn memory_only(self) -> Self;
330
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.path) to `path` and returns self.
331
    #[must_use]
332
    fn path<P: AsRef<Path>>(self, path: P) -> Self;
333
    /// Sets [`StorageConfiguration::unique_id`](StorageConfiguration#structfield.unique_id) to `unique_id` and returns self.
334
    #[must_use]
335
    fn unique_id(self, unique_id: u64) -> Self;
336
    /// Sets [`StorageConfiguration::vault_key_storage`](StorageConfiguration#structfield.vault_key_storage) to `key_storage` and returns self.
337
    #[cfg(feature = "encryption")]
338
    #[must_use]
339
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
340
        self,
341
        key_storage: VaultKeyStorage,
342
    ) -> Self;
343
    /// Sets [`StorageConfiguration::default_encryption_key`](StorageConfiguration#structfield.default_encryption_key) to `path` and returns self.
344
    #[cfg(feature = "encryption")]
345
    #[must_use]
346
    fn default_encryption_key(self, key: KeyId) -> Self;
347
    /// Sets [`Tasks::worker_count`] to `worker_count` and returns self.
348
    #[must_use]
349
    fn tasks_worker_count(self, worker_count: usize) -> Self;
350
    /// Sets [`Tasks::parallelization`] to `parallelization` and returns self.
351
    #[must_use]
352
    fn tasks_parallelization(self, parallelization: usize) -> Self;
353
    /// Sets [`Views::check_integrity_on_open`] to `check` and returns self.
354
    #[must_use]
355
    fn check_view_integrity_on_open(self, check: bool) -> Self;
356
    /// Sets [`StorageConfiguration::default_compression`](StorageConfiguration#structfield.default_compression) to `path` and returns self.
357
    #[cfg(feature = "compression")]
358
    #[must_use]
359
    fn default_compression(self, compression: Compression) -> Self;
360
    /// Sets [`StorageConfiguration::key_value_persistence`](StorageConfiguration#structfield.key_value_persistence) to `persistence` and returns self.
361
    #[must_use]
362
    fn key_value_persistence(self, persistence: KeyValuePersistence) -> Self;
363
    /// Sets [`Self::authenticated_permissions`](Self#structfield.authenticated_permissions) to `authenticated_permissions` and returns self.
364
    #[must_use]
365
    fn authenticated_permissions<P: Into<Permissions>>(self, authenticated_permissions: P) -> Self;
366
    /// Sets [`StorageConfiguration::argon`](StorageConfiguration#structfield.argon) to `argon` and returns self.
367
    #[cfg(feature = "password-hashing")]
368
    #[must_use]
369
    fn argon(self, argon: ArgonConfiguration) -> Self;
370
}
371

            
372
impl Builder for StorageConfiguration {
373
    fn with_schema<S: Schema>(mut self) -> Result<Self, Error> {
374
252
        self.register_schema::<S>()?;
375
252
        Ok(self)
376
252
    }
377

            
378
62
    fn memory_only(mut self) -> Self {
379
62
        self.memory_only = true;
380
62
        self
381
62
    }
382

            
383
164
    fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
384
164
        self.path = Some(path.as_ref().to_owned());
385
164
        self
386
164
    }
387

            
388
    fn unique_id(mut self, unique_id: u64) -> Self {
389
        self.unique_id = Some(unique_id);
390
        self
391
    }
392

            
393
    #[cfg(feature = "encryption")]
394
6
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
395
6
        mut self,
396
6
        key_storage: VaultKeyStorage,
397
6
    ) -> Self {
398
6
        self.vault_key_storage = Some(Arc::new(key_storage));
399
6
        self
400
6
    }
401

            
402
    #[cfg(feature = "encryption")]
403
180
    fn default_encryption_key(mut self, key: KeyId) -> Self {
404
180
        self.default_encryption_key = Some(key);
405
180
        self
406
180
    }
407

            
408
    #[cfg(feature = "compression")]
409
275
    fn default_compression(mut self, compression: Compression) -> Self {
410
275
        self.default_compression = Some(compression);
411
275
        self
412
275
    }
413

            
414
    fn tasks_worker_count(mut self, worker_count: usize) -> Self {
415
        self.workers.worker_count = worker_count;
416
        self
417
    }
418

            
419
    fn tasks_parallelization(mut self, parallelization: usize) -> Self {
420
        self.workers.parallelization = parallelization;
421
        self
422
    }
423

            
424
1
    fn check_view_integrity_on_open(mut self, check: bool) -> Self {
425
1
        self.views.check_integrity_on_open = check;
426
1
        self
427
1
    }
428

            
429
1
    fn key_value_persistence(mut self, persistence: KeyValuePersistence) -> Self {
430
1
        self.key_value_persistence = persistence;
431
1
        self
432
1
    }
433

            
434
    fn authenticated_permissions<P: Into<Permissions>>(
435
        mut self,
436
        authenticated_permissions: P,
437
    ) -> Self {
438
        self.authenticated_permissions = authenticated_permissions.into();
439
        self
440
    }
441

            
442
    #[cfg(feature = "password-hashing")]
443
    fn argon(mut self, argon: ArgonConfiguration) -> Self {
444
        self.argon = argon;
445
        self
446
    }
447
}
448

            
449
pub(crate) trait SystemDefault: Sized {
450
    fn default_for(system: &System) -> Self;
451
1
    fn default() -> Self {
452
1
        let system_specs = RefreshKind::new().with_cpu().with_memory();
453
1
        let mut system = System::new_with_specifics(system_specs);
454
1
        system.refresh_specifics(system_specs);
455
1
        Self::default_for(&system)
456
1
    }
457
}
458

            
459
/// All available compression algorithms.
460
1730246
#[derive(Debug, Clone, Copy)]
461
pub enum Compression {
462
    /// Compress data using the
463
    /// [lz4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm))
464
    /// algorithm. This is powered by
465
    /// [lz4_flex](https://crates.io/crates/lz4_flex).
466
    Lz4 = 1,
467
}
468

            
469
impl Compression {
470
    #[must_use]
471
    #[cfg(feature = "compression")]
472
366964
    pub(crate) fn from_u8(value: u8) -> Option<Self> {
473
366964
        match value {
474
358953
            1 => Some(Self::Lz4),
475
8011
            _ => None,
476
        }
477
366964
    }
478
}