1
use std::{
2
    collections::HashMap,
3
    path::{Path, PathBuf},
4
    sync::Arc,
5
    time::Duration,
6
};
7

            
8
#[cfg(feature = "encryption")]
9
use bonsaidb_core::document::KeyId;
10
use bonsaidb_core::schema::{Schema, SchemaName};
11
use sysinfo::{RefreshKind, System, SystemExt};
12

            
13
#[cfg(feature = "encryption")]
14
use crate::vault::AnyVaultKeyStorage;
15
use crate::{
16
    storage::{DatabaseOpener, StorageSchemaOpener},
17
    Error,
18
};
19

            
20
#[cfg(feature = "password-hashing")]
21
mod argon;
22
#[cfg(feature = "password-hashing")]
23
pub use argon::*;
24

            
25
/// Configuration options for [`Storage`](crate::storage::Storage).
26
12
#[derive(Debug, Clone)]
27
#[non_exhaustive]
28
pub struct StorageConfiguration {
29
    /// The path to the database. Defaults to `db.bonsaidb` if not specified.
30
    pub path: Option<PathBuf>,
31

            
32
    /// Prevents storing data on the disk. This is intended for testing purposes
33
    /// primarily. Keep in mind that the underlying storage format is
34
    /// append-only.
35
    pub memory_only: bool,
36

            
37
    /// The unique id of the server. If not specified, the server will randomly
38
    /// generate a unique id on startup. If the server generated an id and this
39
    /// value is subsequently set, the generated id will be overridden by the
40
    /// one specified here.
41
    pub unique_id: Option<u64>,
42

            
43
    /// The vault key storage to use. If not specified,
44
    /// [`LocalVaultKeyStorage`](crate::vault::LocalVaultKeyStorage) will be
45
    /// used with the server's data folder as the path. This is **incredibly
46
    /// insecure and should not be used outside of testing**.
47
    ///
48
    /// For secure encryption, it is important to store the vault keys in a
49
    /// location that is separate from the database. If the keys are on the same
50
    /// hardware as the encrypted content, anyone with access to the disk will
51
    /// be able to decrypt the stored data.
52
    #[cfg(feature = "encryption")]
53
    pub vault_key_storage: Option<Arc<dyn AnyVaultKeyStorage>>,
54

            
55
    /// The default encryption key for the database. If specified, all documents
56
    /// will be stored encrypted at-rest using the key specified. Having this
57
    /// key specified will also encrypt views. Without this, views will be
58
    /// stored unencrypted.
59
    #[cfg(feature = "encryption")]
60
    pub default_encryption_key: Option<KeyId>,
61

            
62
    /// Configuration options related to background tasks.
63
    pub workers: Tasks,
64

            
65
    /// Configuration options related to views.
66
    pub views: Views,
67

            
68
    /// Controls how the key-value store persists keys, on a per-database basis.
69
    pub key_value_persistence: KeyValuePersistence,
70

            
71
    /// Sets the default compression algorithm.
72
    #[cfg(feature = "compression")]
73
    pub default_compression: Option<Compression>,
74

            
75
    /// Password hashing configuration.
76
    #[cfg(feature = "password-hashing")]
77
    pub argon: ArgonConfiguration,
78

            
79
    pub(crate) initial_schemas: HashMap<SchemaName, Arc<dyn DatabaseOpener>>,
80
}
81

            
82
impl Default for StorageConfiguration {
83
2697
    fn default() -> Self {
84
2697
        let system_specs = RefreshKind::new().with_cpu().with_memory();
85
2697
        let mut system = System::new_with_specifics(system_specs);
86
2697
        system.refresh_specifics(system_specs);
87
2697
        Self {
88
2697
            path: None,
89
2697
            memory_only: false,
90
2697
            unique_id: None,
91
2697
            #[cfg(feature = "encryption")]
92
2697
            vault_key_storage: None,
93
2697
            #[cfg(feature = "encryption")]
94
2697
            default_encryption_key: None,
95
2697
            #[cfg(feature = "compression")]
96
2697
            default_compression: None,
97
2697
            workers: Tasks::default_for(&system),
98
2697
            views: Views::default(),
99
2697
            key_value_persistence: KeyValuePersistence::default(),
100
2697
            #[cfg(feature = "password-hashing")]
101
2697
            argon: ArgonConfiguration::default_for(&system),
102
2697
            initial_schemas: HashMap::default(),
103
2697
        }
104
2697
    }
105
}
106

            
107
impl StorageConfiguration {
108
    /// Registers the schema provided.
109
    pub fn register_schema<S: Schema>(&mut self) -> Result<(), Error> {
110
        self.initial_schemas
111
256
            .insert(S::schema_name(), Arc::new(StorageSchemaOpener::<S>::new()?));
112
256
        Ok(())
113
256
    }
114
}
115

            
116
/// Configuration options for background tasks.
#[derive(Clone, Debug)]
pub struct Tasks {
    /// The number of workers spawned to process tasks. Defaults to twice the
    /// number of cpu cores available to the system, or 2, whichever is
    /// larger.
    pub worker_count: usize,

    /// The number of simultaneous threads used when a task is parallelizable.
    /// Defaults to the number of cpu cores available to the system.
    pub parallelization: usize,
}
129

            
130
impl SystemDefault for Tasks {
131
2723
    fn default_for(system: &System) -> Self {
132
2723
        let num_cpus = system
133
2723
            .physical_core_count()
134
2723
            .unwrap_or(0)
135
2723
            .max(system.processors().len())
136
2723
            .max(1);
137
2723
        Self {
138
2723
            worker_count: num_cpus * 2,
139
2723
            parallelization: num_cpus,
140
2723
        }
141
2723
    }
142
}
143

            
144
/// Configuration options for views.
#[derive(Debug, Default, Clone)]
pub struct Views {
    /// When `true`, all views are scanned during the call to `open_local`.
    /// Opening the database takes longer, but once it is open no request has
    /// to wait for an integrity check. Leave this `false` to delay the
    /// integrity scan in exchange for a faster startup. Default value is
    /// `false`.
    pub check_integrity_on_open: bool,
}
154

            
155
/// Rules for persisting key-value changes. Default persistence is to
156
/// immediately persist all changes. While this ensures data integrity, the
157
/// overhead of the key-value store can be significantly reduced by utilizing
158
/// lazy persistence strategies that delay writing changes until certain
159
/// thresholds have been met.
160
///
161
/// ## Immediate persistence
162
///
163
/// The default persistence mode will trigger commits always:
164
///
165
/// ```rust
166
/// # use bonsaidb_local::config::KeyValuePersistence;
167
/// # use std::time::Duration;
168
/// assert!(!KeyValuePersistence::default().should_commit(0, Duration::ZERO));
169
/// assert!(KeyValuePersistence::default().should_commit(1, Duration::ZERO));
170
/// ```
171
///
172
/// ## Lazy persistence
173
///
174
/// Lazy persistence allows setting multiple thresholds, allowing for customized
175
/// behavior that can help tune performance, especially under write-heavy loads.
176
///
177
/// It is good practice to include one [`PersistenceThreshold`] that has no
178
/// duration, as it will ensure that the in-memory cache cannot exceed a certain
179
/// size. This number is counted for each database indepenently.
180
///
181
/// ```rust
182
/// # use bonsaidb_local::config::{KeyValuePersistence, PersistenceThreshold};
183
/// # use std::time::Duration;
184
/// #
185
/// let persistence = KeyValuePersistence::lazy([
186
///     PersistenceThreshold::after_changes(1).and_duration(Duration::from_secs(120)),
187
///     PersistenceThreshold::after_changes(10).and_duration(Duration::from_secs(10)),
188
///     PersistenceThreshold::after_changes(100),
189
/// ]);
190
///
191
/// // After 1 change and 60 seconds, no changes would be committed:
192
/// assert!(!persistence.should_commit(1, Duration::from_secs(60)));
193
/// // But on or after 120 seconds, that change will be committed:
194
/// assert!(persistence.should_commit(1, Duration::from_secs(120)));
195
///
196
/// // After 10 changes and 10 seconds, changes will be committed:
197
/// assert!(persistence.should_commit(10, Duration::from_secs(10)));
198
///
199
/// // Once 100 changes have been accumulated, this ruleset will always commit
200
/// // regardless of duration.
201
/// assert!(persistence.should_commit(100, Duration::ZERO));
202
/// ```
203
21698
#[derive(Debug, Clone)]
204
#[must_use]
205
pub struct KeyValuePersistence(KeyValuePersistenceInner);
206

            
207
21698
#[derive(Debug, Clone)]
208
enum KeyValuePersistenceInner {
209
    Immediate,
210
    Lazy(Vec<PersistenceThreshold>),
211
}
212

            
213
impl Default for KeyValuePersistence {
214
    /// Returns [`KeyValuePersistence::immediate()`].
215
2728
    fn default() -> Self {
216
2728
        Self::immediate()
217
2728
    }
218
}
219

            
220
impl KeyValuePersistence {
221
    /// Returns a ruleset that commits all changes immediately.
222
2936
    pub const fn immediate() -> Self {
223
2936
        Self(KeyValuePersistenceInner::Immediate)
224
2936
    }
225

            
226
    /// Returns a ruleset that lazily commits data based on a list of thresholds.
227
13
    pub fn lazy<II>(rules: II) -> Self
228
13
    where
229
13
        II: IntoIterator<Item = PersistenceThreshold>,
230
13
    {
231
13
        let mut rules = rules.into_iter().collect::<Vec<_>>();
232
13
        rules.sort_by(|a, b| a.number_of_changes.cmp(&b.number_of_changes));
233
13
        Self(KeyValuePersistenceInner::Lazy(rules))
234
13
    }
235

            
236
    /// Returns true if these rules determine that the outstanding changes should be persisted.
237
    #[must_use]
238
127230
    pub fn should_commit(
239
127230
        &self,
240
127230
        number_of_changes: usize,
241
127230
        elapsed_since_last_commit: Duration,
242
127230
    ) -> bool {
243
127230
        self.duration_until_next_commit(number_of_changes, elapsed_since_last_commit)
244
127230
            == Duration::ZERO
245
127230
    }
246

            
247
254968
    pub(crate) fn duration_until_next_commit(
248
254968
        &self,
249
254968
        number_of_changes: usize,
250
254968
        elapsed_since_last_commit: Duration,
251
254968
    ) -> Duration {
252
254968
        if number_of_changes == 0 {
253
60763
            Duration::MAX
254
        } else {
255
194205
            match &self.0 {
256
193414
                KeyValuePersistenceInner::Immediate => Duration::ZERO,
257
791
                KeyValuePersistenceInner::Lazy(rules) => {
258
791
                    let mut shortest_duration = Duration::MAX;
259
791
                    for rule in rules
260
791
                        .iter()
261
795
                        .take_while(|rule| rule.number_of_changes <= number_of_changes)
262
                    {
263
7
                        let remaining_time =
264
7
                            rule.duration.saturating_sub(elapsed_since_last_commit);
265
7
                        shortest_duration = shortest_duration.min(remaining_time);
266
7

            
267
7
                        if shortest_duration == Duration::ZERO {
268
3
                            break;
269
4
                        }
270
                    }
271
791
                    shortest_duration
272
                }
273
            }
274
        }
275
254968
    }
276
}
277

            
278
/// A threshold controlling lazy commits. A threshold applies only when both
/// conditions hold: `number_of_changes` has been met or surpassed, and
/// `duration` has elapsed since the last commit.
///
/// A threshold with a duration of zero does not wait at all: changes are
/// persisted as soon as the specified `number_of_changes` has been met or
/// surpassed.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub struct PersistenceThreshold {
    /// The minimum number of changes that must have occurred for this
    /// threshold to apply.
    pub number_of_changes: usize,
    /// The amount of time that must elapse since the last write for this
    /// threshold to apply.
    pub duration: Duration,
}
292

            
293
impl PersistenceThreshold {
294
    /// Returns a threshold that applies after a number of changes have elapsed.
295
264
    pub const fn after_changes(number_of_changes: usize) -> Self {
296
264
        Self {
297
264
            number_of_changes,
298
264
            duration: Duration::ZERO,
299
264
        }
300
264
    }
301

            
302
    /// Sets the duration of this threshold to `duration` and returns self.
303
1
    pub const fn and_duration(mut self, duration: Duration) -> Self {
304
1
        self.duration = duration;
305
1
        self
306
1
    }
307
}
308

            
309
/// Storage configuration builder methods.
310
pub trait Builder: Default {
311
    /// Creates a default configuration with `path` set.
312
    #[must_use]
313
173
    fn new<P: AsRef<Path>>(path: P) -> Self {
314
173
        Self::default().path(path)
315
173
    }
316

            
317
    /// Registers the schema and returns self.
318
    fn with_schema<S: Schema>(self) -> Result<Self, Error>;
319

            
320
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.memory_only) to true and returns self.
321
    #[must_use]
322
    fn memory_only(self) -> Self;
323
    /// Sets [`StorageConfiguration::path`](StorageConfiguration#structfield.path) to `path` and returns self.
324
    #[must_use]
325
    fn path<P: AsRef<Path>>(self, path: P) -> Self;
326
    /// Sets [`StorageConfiguration::unique_id`](StorageConfiguration#structfield.unique_id) to `unique_id` and returns self.
327
    #[must_use]
328
    fn unique_id(self, unique_id: u64) -> Self;
329
    /// Sets [`StorageConfiguration::vault_key_storage`](StorageConfiguration#structfield.vault_key_storage) to `key_storage` and returns self.
330
    #[cfg(feature = "encryption")]
331
    #[must_use]
332
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
333
        self,
334
        key_storage: VaultKeyStorage,
335
    ) -> Self;
336
    /// Sets [`StorageConfiguration::default_encryption_key`](StorageConfiguration#structfield.default_encryption_key) to `path` and returns self.
337
    #[cfg(feature = "encryption")]
338
    #[must_use]
339
    fn default_encryption_key(self, key: KeyId) -> Self;
340
    /// Sets [`Tasks::worker_count`] to `worker_count` and returns self.
341
    #[must_use]
342
    fn tasks_worker_count(self, worker_count: usize) -> Self;
343
    /// Sets [`Tasks::parallelization`] to `parallelization` and returns self.
344
    #[must_use]
345
    fn tasks_parallelization(self, parallelization: usize) -> Self;
346
    /// Sets [`Views::check_integrity_on_open`] to `check` and returns self.
347
    #[must_use]
348
    fn check_view_integrity_on_open(self, check: bool) -> Self;
349
    /// Sets [`StorageConfiguration::default_compression`](StorageConfiguration#structfield.default_compression) to `path` and returns self.
350
    #[cfg(feature = "compression")]
351
    #[must_use]
352
    fn default_compression(self, compression: Compression) -> Self;
353
    /// Sets [`StorageConfiguration::key_value_persistence`](StorageConfiguration#structfield.key_value_persistence) to `persistence` and returns self.
354
    #[must_use]
355
    fn key_value_persistence(self, persistence: KeyValuePersistence) -> Self;
356
}
357

            
358
impl Builder for StorageConfiguration {
359
    fn with_schema<S: Schema>(mut self) -> Result<Self, Error> {
360
179
        self.register_schema::<S>()?;
361
179
        Ok(self)
362
179
    }
363

            
364
30
    fn memory_only(mut self) -> Self {
365
30
        self.memory_only = true;
366
30
        self
367
30
    }
368

            
369
93
    fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
370
93
        self.path = Some(path.as_ref().to_owned());
371
93
        self
372
93
    }
373

            
374
    fn unique_id(mut self, unique_id: u64) -> Self {
375
        self.unique_id = Some(unique_id);
376
        self
377
    }
378

            
379
    #[cfg(feature = "encryption")]
380
3
    fn vault_key_storage<VaultKeyStorage: AnyVaultKeyStorage>(
381
3
        mut self,
382
3
        key_storage: VaultKeyStorage,
383
3
    ) -> Self {
384
3
        self.vault_key_storage = Some(Arc::new(key_storage));
385
3
        self
386
3
    }
387

            
388
    #[cfg(feature = "encryption")]
389
78
    fn default_encryption_key(mut self, key: KeyId) -> Self {
390
78
        self.default_encryption_key = Some(key);
391
78
        self
392
78
    }
393

            
394
    #[cfg(feature = "compression")]
395
191
    fn default_compression(mut self, compression: Compression) -> Self {
396
191
        self.default_compression = Some(compression);
397
191
        self
398
191
    }
399

            
400
    fn tasks_worker_count(mut self, worker_count: usize) -> Self {
401
        self.workers.worker_count = worker_count;
402
        self
403
    }
404

            
405
    fn tasks_parallelization(mut self, parallelization: usize) -> Self {
406
        self.workers.parallelization = parallelization;
407
        self
408
    }
409

            
410
1
    fn check_view_integrity_on_open(mut self, check: bool) -> Self {
411
1
        self.views.check_integrity_on_open = check;
412
1
        self
413
1
    }
414

            
415
1
    fn key_value_persistence(mut self, persistence: KeyValuePersistence) -> Self {
416
1
        self.key_value_persistence = persistence;
417
1
        self
418
1
    }
419
}
420

            
421
pub(crate) trait SystemDefault: Sized {
422
    fn default_for(system: &System) -> Self;
423
1
    fn default() -> Self {
424
1
        let system_specs = RefreshKind::new().with_cpu().with_memory();
425
1
        let mut system = System::new_with_specifics(system_specs);
426
1
        system.refresh_specifics(system_specs);
427
1
        Self::default_for(&system)
428
1
    }
429
}
430

            
431
/// All available compression algorithms.
#[derive(Clone, Copy, Debug)]
pub enum Compression {
    /// Compress data with the
    /// [lz4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm))
    /// algorithm, powered by
    /// [lz4_flex](https://crates.io/crates/lz4_flex).
    Lz4 = 1,
}
440

            
441
impl Compression {
442
    #[must_use]
443
    #[cfg(feature = "compression")]
444
246813
    pub(crate) fn from_u8(value: u8) -> Option<Self> {
445
246813
        match value {
446
240817
            1 => Some(Self::Lz4),
447
5996
            _ => None,
448
        }
449
246813
    }
450
}