1
use std::fs::DirEntry;
2
use std::io::ErrorKind;
3
use std::path::{Path, PathBuf};
4

            
5
use bonsaidb_core::connection::{LowLevelConnection, Range, Sort, StorageConnection};
6
use bonsaidb_core::document::DocumentId;
7
use bonsaidb_core::schema::{Collection, Qualified, SchemaName};
8
use bonsaidb_core::transaction::{Operation, Transaction};
9
use bonsaidb_core::{admin, AnyError};
10

            
11
use crate::database::keyvalue::Entry;
12
use crate::database::DatabaseNonBlocking;
13
use crate::{Database, Error, Storage};
14

            
15
/// A location to store and restore a database from.
16
pub trait BackupLocation: Send + Sync {
17
    /// The error type for the backup location.
18
    type Error: AnyError;
19

            
20
    /// Store `object` at `path` with `name`.
21
    fn store(
22
        &self,
23
        schema: &SchemaName,
24
        database_name: &str,
25
        container: &str,
26
        name: &str,
27
        object: &[u8],
28
    ) -> Result<(), Self::Error>;
29

            
30
    /// Lists all of the schemas stored in this backup location.
31
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error>;
32

            
33
    /// List all of the names of the databases stored for `schema`.
34
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error>;
35

            
36
    /// List all stored named objects at `path`. The names should be the same that were provided when `store()` was called.
37
    fn list_stored(
38
        &self,
39
        schema: &SchemaName,
40
        database_name: &str,
41
        container: &str,
42
    ) -> Result<Vec<String>, Self::Error>;
43

            
44
    /// Load a previously stored object from `path` with `name`.
45
    fn load(
46
        &self,
47
        schema: &SchemaName,
48
        database_name: &str,
49
        container: &str,
50
        name: &str,
51
    ) -> Result<Vec<u8>, Self::Error>;
52
}
53

            
54
impl Storage {
55
    /// Stores a copy of all data in this instance to `location`.
56
38
    pub fn backup<L: AnyBackupLocation>(&self, location: &L) -> Result<(), Error> {
57
38
        let databases = {
58
38
            self.instance
59
38
                .data
60
38
                .available_databases
61
38
                .read()
62
38
                .keys()
63
38
                .cloned()
64
38
                .collect::<Vec<_>>()
65
        };
66

            
67
151
        for name in databases {
68
113
            let database = self
69
113
                .instance
70
113
                .database_without_schema(&name, Some(self), None)?;
71
113
            Self::backup_database(&database, location)?;
72
        }
73

            
74
38
        Ok(())
75
38
    }
76

            
77
    /// Restores all data from a previously stored backup `location`.
78
39
    pub fn restore<L: AnyBackupLocation>(&self, location: &L) -> Result<(), Error> {
79
114
        for schema in location
80
39
            .list_schemas()
81
39
            .map_err(|err| Error::Backup(Box::new(err)))?
82
        {
83
114
            for database in location
84
114
                .list_databases(&schema)
85
114
                .map_err(|err| Error::Backup(Box::new(err)))?
86
            {
87
                // The admin database is already going to be created by the process of creating a database.
88
114
                self.create_database_with_schema(&database, schema.clone(), true)?;
89

            
90
114
                let database =
91
114
                    self.instance
92
114
                        .database_without_schema(&database, Some(self), None)?;
93
114
                Self::restore_database(&database, location)?;
94
            }
95
        }
96

            
97
38
        Ok(())
98
39
    }
99

            
100
113
    pub(crate) fn backup_database(
101
113
        database: &Database,
102
113
        location: &dyn AnyBackupLocation,
103
113
    ) -> Result<(), Error> {
104
113
        let schema = database.schematic().name.clone();
105
302
        for collection in database.schematic().collections() {
106
302
            let documents = database.list_from_collection(
107
302
                Range::from(..),
108
302
                Sort::Ascending,
109
302
                None,
110
302
                collection,
111
302
            )?;
112
302
            let collection_name = collection.encoded();
113
            // TODO consider how to best parallelize -- perhaps a location can opt into parallelization?
114
527
            for document in documents {
115
225
                location.store(
116
225
                    &schema,
117
225
                    database.name(),
118
225
                    &collection_name,
119
225
                    &document.header.id.to_string(),
120
225
                    &document.contents,
121
225
                )?;
122
            }
123
302
            for ((namespace, key), entry) in database.all_key_value_entries()? {
124
3
                let full_name = format!("{}._key._{key}", namespace.as_deref().unwrap_or(""));
125
3
                location.store(
126
3
                    &schema,
127
3
                    database.name(),
128
3
                    "_kv",
129
3
                    &full_name,
130
3
                    &pot::to_vec(&entry)?,
131
                )?;
132
            }
133
        }
134
113
        Ok(())
135
113
    }
136

            
137
114
    pub(crate) fn restore_database(
138
114
        database: &Database,
139
114
        location: &dyn AnyBackupLocation,
140
114
    ) -> Result<(), Error> {
141
114
        let schema = database.schematic().name.clone();
142
114
        let mut transaction = Transaction::new();
143
114
        // Restore all the collections. However, there's one collection we don't
144
114
        // want to restore: the Databases list. This will be recreated during
145
114
        // the process of restoring the backup, so we skip it.
146
114
        let database_collection = admin::Database::collection_name();
147
265
        for collection in database
148
114
            .schematic()
149
114
            .collections()
150
303
            .filter(|c| *c != &database_collection)
151
        {
152
265
            let collection_name = collection.encoded();
153
265
            for (id, id_string) in location
154
265
                .list_stored(&schema, database.name(), &collection_name)?
155
265
                .into_iter()
156
265
                .filter_map(|id_string| {
157
113
                    id_string
158
113
                        .parse::<DocumentId>()
159
113
                        .ok()
160
113
                        .map(|id| (id, id_string))
161
265
                })
162
113
            {
163
113
                let contents =
164
113
                    location.load(&schema, database.name(), &collection_name, &id_string)?;
165
113
                transaction.push(Operation::insert(collection.clone(), Some(id), contents));
166
            }
167
        }
168
114
        database.apply_transaction(transaction)?;
169

            
170
114
        for full_key in location.list_stored(&schema, database.name(), "_kv")? {
171
3
            if let Some((namespace, key)) = full_key.split_once("._key._") {
172
3
                let entry = location.load(&schema, database.name(), "_kv", &full_key)?;
173
3
                let entry = pot::from_slice::<Entry>(&entry)?;
174
3
                let namespace = if namespace.is_empty() {
175
3
                    None
176
                } else {
177
                    Some(namespace.to_string())
178
                };
179
3
                entry.restore(namespace, key.to_string(), database)?;
180
            }
181
        }
182

            
183
113
        Ok(())
184
114
    }
185
}
186

            
187
pub trait AnyBackupLocation: Send + Sync {
188
    fn store(
189
        &self,
190
        schema: &SchemaName,
191
        database_name: &str,
192
        container: &str,
193
        name: &str,
194
        object: &[u8],
195
    ) -> Result<(), Error>;
196

            
197
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Error>;
198

            
199
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error>;
200

            
201
    fn list_stored(
202
        &self,
203
        schema: &SchemaName,
204
        database_name: &str,
205
        container: &str,
206
    ) -> Result<Vec<String>, Error>;
207

            
208
    fn load(
209
        &self,
210
        schema: &SchemaName,
211
        database_name: &str,
212
        container: &str,
213
        name: &str,
214
    ) -> Result<Vec<u8>, Error>;
215
}
216

            
217
impl<L, E> AnyBackupLocation for L
218
where
219
    L: BackupLocation<Error = E>,
220
    E: AnyError,
221
{
222
228
    fn store(
223
228
        &self,
224
228
        schema: &SchemaName,
225
228
        database_name: &str,
226
228
        container: &str,
227
228
        name: &str,
228
228
        object: &[u8],
229
228
    ) -> Result<(), Error> {
230
228
        self.store(schema, database_name, container, name, object)
231
228
            .map_err(|err| Error::Backup(Box::new(err)))
232
228
    }
233

            
234
39
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Error> {
235
39
        self.list_schemas()
236
39
            .map_err(|err| Error::Backup(Box::new(err)))
237
39
    }
238

            
239
114
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error> {
240
114
        self.list_databases(schema)
241
114
            .map_err(|err| Error::Backup(Box::new(err)))
242
114
    }
243

            
244
378
    fn list_stored(
245
378
        &self,
246
378
        schema: &SchemaName,
247
378
        database_name: &str,
248
378
        container: &str,
249
378
    ) -> Result<Vec<String>, Error> {
250
378
        self.list_stored(schema, database_name, container)
251
378
            .map_err(|err| Error::Backup(Box::new(err)))
252
378
    }
253

            
254
116
    fn load(
255
116
        &self,
256
116
        schema: &SchemaName,
257
116
        database_name: &str,
258
116
        container: &str,
259
116
        name: &str,
260
116
    ) -> Result<Vec<u8>, Error> {
261
116
        self.load(schema, database_name, container, name)
262
116
            .map_err(|err| Error::Backup(Box::new(err)))
263
116
    }
264
}
265

            
266
impl BackupLocation for Path {
267
    type Error = std::io::Error;
268

            
269
228
    fn store(
270
228
        &self,
271
228
        schema: &SchemaName,
272
228
        database_name: &str,
273
228
        container: &str,
274
228
        name: &str,
275
228
        object: &[u8],
276
228
    ) -> Result<(), Self::Error> {
277
228
        let container_folder = container_folder(self, schema, database_name, container);
278
228
        std::fs::create_dir_all(&container_folder)?;
279
228
        std::fs::write(container_folder.join(name), object)?;
280

            
281
228
        Ok(())
282
228
    }
283

            
284
39
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
285
39
        iterate_directory(self, |entry, file_name| {
286
115
            if entry.file_type()?.is_dir() {
287
115
                if let Ok(schema_name) = SchemaName::parse_encoded(file_name.as_str()) {
288
115
                    return Ok(Some(schema_name));
289
                }
290
            }
291
            Ok(None)
292
115
        })
293
39
    }
294

            
295
114
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
296
114
        iterate_directory(&schema_folder(self, schema), |entry, file_name| {
297
114
            if entry.file_type()?.is_dir() && file_name != "_kv" {
298
114
                return Ok(Some(file_name));
299
            }
300
            Ok(None)
301
114
        })
302
114
    }
303

            
304
378
    fn list_stored(
305
378
        &self,
306
378
        schema: &SchemaName,
307
378
        database_name: &str,
308
378
        container: &str,
309
378
    ) -> Result<Vec<String>, Self::Error> {
310
378
        iterate_directory(
311
378
            &container_folder(self, schema, database_name, container),
312
378
            |entry, file_name| {
313
116
                if entry.file_type()?.is_file() {
314
116
                    return Ok(Some(file_name));
315
                }
316
                Ok(None)
317
378
            },
318
378
        )
319
378
    }
320

            
321
116
    fn load(
322
116
        &self,
323
116
        schema: &SchemaName,
324
116
        database_name: &str,
325
116
        container: &str,
326
116
        name: &str,
327
116
    ) -> Result<Vec<u8>, Self::Error> {
328
116
        std::fs::read(container_folder(self, schema, database_name, container).join(name))
329
116
    }
330
}
331

            
332
impl BackupLocation for PathBuf {
333
    type Error = std::io::Error;
334

            
335
228
    fn store(
336
228
        &self,
337
228
        schema: &SchemaName,
338
228
        database_name: &str,
339
228
        container: &str,
340
228
        name: &str,
341
228
        object: &[u8],
342
228
    ) -> Result<(), Self::Error> {
343
228
        BackupLocation::store(
344
228
            self.as_path(),
345
228
            schema,
346
228
            database_name,
347
228
            container,
348
228
            name,
349
228
            object,
350
228
        )
351
228
    }
352

            
353
39
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
354
39
        BackupLocation::list_schemas(self.as_path())
355
39
    }
356

            
357
114
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
358
114
        BackupLocation::list_databases(self.as_path(), schema)
359
114
    }
360

            
361
378
    fn list_stored(
362
378
        &self,
363
378
        schema: &SchemaName,
364
378
        database_name: &str,
365
378
        container: &str,
366
378
    ) -> Result<Vec<String>, Self::Error> {
367
378
        BackupLocation::list_stored(self.as_path(), schema, database_name, container)
368
378
    }
369

            
370
116
    fn load(
371
116
        &self,
372
116
        schema: &SchemaName,
373
116
        database_name: &str,
374
116
        container: &str,
375
116
        name: &str,
376
116
    ) -> Result<Vec<u8>, Self::Error> {
377
116
        BackupLocation::load(self.as_path(), schema, database_name, container, name)
378
116
    }
379
}
380

            
381
531
fn iterate_directory<T, F: FnMut(DirEntry, String) -> Result<Option<T>, std::io::Error>>(
382
531
    path: &Path,
383
531
    mut callback: F,
384
531
) -> Result<Vec<T>, std::io::Error> {
385
531
    let mut collected = Vec::new();
386
531
    let Some(mut directories) = std::fs::read_dir(path).ignore_not_found()? else {
387
301
        return Ok(collected);
388
    };
389

            
390
575
    while let Some(entry) = directories
391
575
        .next()
392
575
        .map(IoResultExt::ignore_not_found)
393
575
        .transpose()?
394
575
        .flatten()
395
    {
396
345
        if let Ok(file_name) = entry.file_name().into_string() {
397
345
            if let Some(result) = callback(entry, file_name)? {
398
345
                collected.push(result);
399
345
            }
400
        }
401
    }
402

            
403
230
    Ok(collected)
404
531
}
405

            
406
/// Extension for I/O results that turns a "not found" failure into an absent
/// value.
trait IoResultExt<T>: Sized {
    /// Returns `Ok(Some(value))` for a success, `Ok(None)` when the error kind
    /// is [`ErrorKind::NotFound`], and the original error otherwise.
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error>;
}

impl<T> IoResultExt<T> for Result<T, std::io::Error> {
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error> {
        match self {
            Ok(value) => Ok(Some(value)),
            Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err),
        }
    }
}
424

            
425
836
fn schema_folder(base: &Path, schema: &SchemaName) -> PathBuf {
426
836
    base.join(schema.encoded())
427
836
}
428

            
429
722
fn database_folder(base: &Path, schema: &SchemaName, database_name: &str) -> PathBuf {
430
722
    schema_folder(base, schema).join(database_name)
431
722
}
432

            
433
722
fn container_folder(
434
722
    base: &Path,
435
722
    schema: &SchemaName,
436
722
    database_name: &str,
437
722
    container: &str,
438
722
) -> PathBuf {
439
722
    database_folder(base, schema, database_name).join(container)
440
722
}
441

            
442
#[cfg(test)]
mod tests {
    use bonsaidb_core::connection::{Connection as _, StorageConnection as _};
    use bonsaidb_core::keyvalue::KeyValue;
    use bonsaidb_core::schema::SerializedCollection;
    use bonsaidb_core::test_util::{Basic, TestDirectory};

    use crate::config::{Builder, KeyValuePersistence, PersistenceThreshold, StorageConfiguration};
    use crate::Storage;

    /// Backs up a storage holding one document and three keys, restores it
    /// into a fresh storage, and checks that everything came back.
    #[test]
    fn backup_restore() -> anyhow::Result<()> {
        let backup_destination = TestDirectory::new("backup-restore.bonsaidb.backup");

        // Build the source database in a nested scope: `TestDirectory` erases
        // the database when it is dropped, so only the backup remains once
        // this block ends.
        let original_doc = {
            let source_directory = TestDirectory::new("backup-restore.bonsaidb");
            let persistence =
                KeyValuePersistence::lazy([PersistenceThreshold::after_changes(2)]);
            let configuration = StorageConfiguration::new(&source_directory)
                .key_value_persistence(persistence)
                .with_schema::<Basic>()?;
            let storage = Storage::open(configuration)?;

            let basic = storage.create_database::<Basic>("basic", false)?;
            let inserted = basic.collection::<Basic>().push(&Basic::new("somevalue"))?;
            basic.set_numeric_key("key1", 1_u64).execute()?;
            basic.set_numeric_key("key2", 2_u64).execute()?;
            // This key will not be persisted right away.
            basic.set_numeric_key("key3", 3_u64).execute()?;

            storage.backup(&backup_destination.0).unwrap();

            inserted
        };

        // `backup_destination` now holds an export of the database; load it
        // into a brand new storage.
        let restore_directory = TestDirectory::new("backup-restore.bonsaidb");
        let restored_configuration =
            StorageConfiguration::new(&restore_directory).with_schema::<Basic>()?;
        let restored_storage = Storage::open(restored_configuration)?;
        restored_storage.restore(&backup_destination.0).unwrap();

        let basic = restored_storage.database::<Basic>("basic")?;
        let restored_doc =
            Basic::get(&original_doc.id, &basic)?.expect("Backed up document.not found");
        assert_eq!(restored_doc.contents.value, "somevalue");
        for (key, expected) in [("key1", 1), ("key2", 2), ("key3", 3)] {
            assert_eq!(basic.get_key(key).into_u64()?, Some(expected));
        }

        // Calling restore again should generate an error.
        assert!(restored_storage.restore(&backup_destination.0).is_err());

        Ok(())
    }
}