1
use std::fs::DirEntry;
2
use std::io::ErrorKind;
3
use std::path::{Path, PathBuf};
4

            
5
use bonsaidb_core::connection::{LowLevelConnection, Range, Sort, StorageConnection};
6
use bonsaidb_core::document::DocumentId;
7
use bonsaidb_core::schema::{Collection, Qualified, SchemaName};
8
use bonsaidb_core::transaction::{Operation, Transaction};
9
use bonsaidb_core::{admin, AnyError};
10

            
11
use crate::database::keyvalue::Entry;
12
use crate::database::DatabaseNonBlocking;
13
use crate::{Database, Error, Storage};
14

            
15
/// A location to store and restore a database from.
16
pub trait BackupLocation: Send + Sync {
17
    /// The error type for the backup location.
18
    type Error: AnyError;
19

            
20
    /// Store `object` at `path` with `name`.
21
    fn store(
22
        &self,
23
        schema: &SchemaName,
24
        database_name: &str,
25
        container: &str,
26
        name: &str,
27
        object: &[u8],
28
    ) -> Result<(), Self::Error>;
29

            
30
    /// Lists all of the schemas stored in this backup location.
31
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error>;
32

            
33
    /// List all of the names of the databases stored for `schema`.
34
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error>;
35

            
36
    /// List all stored named objects at `path`. The names should be the same that were provided when `store()` was called.
37
    fn list_stored(
38
        &self,
39
        schema: &SchemaName,
40
        database_name: &str,
41
        container: &str,
42
    ) -> Result<Vec<String>, Self::Error>;
43

            
44
    /// Load a previously stored object from `path` with `name`.
45
    fn load(
46
        &self,
47
        schema: &SchemaName,
48
        database_name: &str,
49
        container: &str,
50
        name: &str,
51
    ) -> Result<Vec<u8>, Self::Error>;
52
}
53

            
54
impl Storage {
55
    /// Stores a copy of all data in this instance to `location`.
56
37
    pub fn backup<L: AnyBackupLocation>(&self, location: &L) -> Result<(), Error> {
57
37
        let databases = {
58
37
            self.instance
59
37
                .data
60
37
                .available_databases
61
37
                .read()
62
37
                .keys()
63
37
                .cloned()
64
37
                .collect::<Vec<_>>()
65
        };
66

            
67
147
        for name in databases {
68
110
            let database = self
69
110
                .instance
70
110
                .database_without_schema(&name, Some(self), None)?;
71
110
            Self::backup_database(&database, location)?;
72
        }
73

            
74
37
        Ok(())
75
37
    }
76

            
77
    /// Restores all data from a previously stored backup `location`.
78
38
    pub fn restore<L: AnyBackupLocation>(&self, location: &L) -> Result<(), Error> {
79
111
        for schema in location
80
38
            .list_schemas()
81
38
            .map_err(|err| Error::Backup(Box::new(err)))?
82
        {
83
111
            for database in location
84
111
                .list_databases(&schema)
85
111
                .map_err(|err| Error::Backup(Box::new(err)))?
86
            {
87
                // The admin database is already going to be created by the process of creating a database.
88
111
                self.create_database_with_schema(&database, schema.clone(), true)?;
89

            
90
111
                let database =
91
111
                    self.instance
92
111
                        .database_without_schema(&database, Some(self), None)?;
93
111
                Self::restore_database(&database, location)?;
94
            }
95
        }
96

            
97
37
        Ok(())
98
38
    }
99

            
100
110
    pub(crate) fn backup_database(
101
110
        database: &Database,
102
110
        location: &dyn AnyBackupLocation,
103
110
    ) -> Result<(), Error> {
104
110
        let schema = database.schematic().name.clone();
105
294
        for collection in database.schematic().collections() {
106
294
            let documents = database.list_from_collection(
107
294
                Range::from(..),
108
294
                Sort::Ascending,
109
294
                None,
110
294
                collection,
111
294
            )?;
112
294
            let collection_name = collection.encoded();
113
            // TODO consider how to best parallelize -- perhaps a location can opt into parallelization?
114
513
            for document in documents {
115
219
                location.store(
116
219
                    &schema,
117
219
                    database.name(),
118
219
                    &collection_name,
119
219
                    &document.header.id.to_string(),
120
219
                    &document.contents,
121
219
                )?;
122
            }
123
294
            for ((namespace, key), entry) in database.all_key_value_entries()? {
124
3
                let full_name = format!("{}._key._{key}", namespace.as_deref().unwrap_or(""));
125
3
                location.store(
126
3
                    &schema,
127
3
                    database.name(),
128
3
                    "_kv",
129
3
                    &full_name,
130
3
                    &pot::to_vec(&entry)?,
131
                )?;
132
            }
133
        }
134
110
        Ok(())
135
110
    }
136

            
137
111
    pub(crate) fn restore_database(
138
111
        database: &Database,
139
111
        location: &dyn AnyBackupLocation,
140
111
    ) -> Result<(), Error> {
141
111
        let schema = database.schematic().name.clone();
142
111
        let mut transaction = Transaction::new();
143
111
        // Restore all the collections. However, there's one collection we don't
144
111
        // want to restore: the Databases list. This will be recreated during
145
111
        // the process of restoring the backup, so we skip it.
146
111
        let database_collection = admin::Database::collection_name();
147
258
        for collection in database
148
111
            .schematic()
149
111
            .collections()
150
295
            .filter(|c| *c != &database_collection)
151
        {
152
258
            let collection_name = collection.encoded();
153
258
            for (id, id_string) in location
154
258
                .list_stored(&schema, database.name(), &collection_name)?
155
258
                .into_iter()
156
258
                .filter_map(|id_string| {
157
110
                    id_string
158
110
                        .parse::<DocumentId>()
159
110
                        .ok()
160
110
                        .map(|id| (id, id_string))
161
258
                })
162
110
            {
163
110
                let contents =
164
110
                    location.load(&schema, database.name(), &collection_name, &id_string)?;
165
110
                transaction.push(Operation::insert(collection.clone(), Some(id), contents));
166
            }
167
        }
168
111
        database.apply_transaction(transaction)?;
169

            
170
111
        for full_key in location.list_stored(&schema, database.name(), "_kv")? {
171
3
            if let Some((namespace, key)) = full_key.split_once("._key._") {
172
3
                let entry = location.load(&schema, database.name(), "_kv", &full_key)?;
173
3
                let entry = pot::from_slice::<Entry>(&entry)?;
174
3
                let namespace = if namespace.is_empty() {
175
3
                    None
176
                } else {
177
                    Some(namespace.to_string())
178
                };
179
3
                entry.restore(namespace, key.to_string(), database)?;
180
            }
181
        }
182

            
183
110
        Ok(())
184
111
    }
185
}
186

            
187
pub trait AnyBackupLocation: Send + Sync {
188
    fn store(
189
        &self,
190
        schema: &SchemaName,
191
        database_name: &str,
192
        container: &str,
193
        name: &str,
194
        object: &[u8],
195
    ) -> Result<(), Error>;
196

            
197
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Error>;
198

            
199
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error>;
200

            
201
    fn list_stored(
202
        &self,
203
        schema: &SchemaName,
204
        database_name: &str,
205
        container: &str,
206
    ) -> Result<Vec<String>, Error>;
207

            
208
    fn load(
209
        &self,
210
        schema: &SchemaName,
211
        database_name: &str,
212
        container: &str,
213
        name: &str,
214
    ) -> Result<Vec<u8>, Error>;
215
}
216

            
217
impl<L, E> AnyBackupLocation for L
218
where
219
    L: BackupLocation<Error = E>,
220
    E: AnyError,
221
{
222
222
    fn store(
223
222
        &self,
224
222
        schema: &SchemaName,
225
222
        database_name: &str,
226
222
        container: &str,
227
222
        name: &str,
228
222
        object: &[u8],
229
222
    ) -> Result<(), Error> {
230
222
        self.store(schema, database_name, container, name, object)
231
222
            .map_err(|err| Error::Backup(Box::new(err)))
232
222
    }
233

            
234
38
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Error> {
235
38
        self.list_schemas()
236
38
            .map_err(|err| Error::Backup(Box::new(err)))
237
38
    }
238

            
239
111
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error> {
240
111
        self.list_databases(schema)
241
111
            .map_err(|err| Error::Backup(Box::new(err)))
242
111
    }
243

            
244
368
    fn list_stored(
245
368
        &self,
246
368
        schema: &SchemaName,
247
368
        database_name: &str,
248
368
        container: &str,
249
368
    ) -> Result<Vec<String>, Error> {
250
368
        self.list_stored(schema, database_name, container)
251
368
            .map_err(|err| Error::Backup(Box::new(err)))
252
368
    }
253

            
254
113
    fn load(
255
113
        &self,
256
113
        schema: &SchemaName,
257
113
        database_name: &str,
258
113
        container: &str,
259
113
        name: &str,
260
113
    ) -> Result<Vec<u8>, Error> {
261
113
        self.load(schema, database_name, container, name)
262
113
            .map_err(|err| Error::Backup(Box::new(err)))
263
113
    }
264
}
265

            
266
impl BackupLocation for Path {
267
    type Error = std::io::Error;
268

            
269
222
    fn store(
270
222
        &self,
271
222
        schema: &SchemaName,
272
222
        database_name: &str,
273
222
        container: &str,
274
222
        name: &str,
275
222
        object: &[u8],
276
222
    ) -> Result<(), Self::Error> {
277
222
        let container_folder = container_folder(self, schema, database_name, container);
278
222
        std::fs::create_dir_all(&container_folder)?;
279
222
        std::fs::write(container_folder.join(name), object)?;
280

            
281
222
        Ok(())
282
222
    }
283

            
284
38
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
285
112
        iterate_directory(self, |entry, file_name| {
286
112
            if entry.file_type()?.is_dir() {
287
112
                if let Ok(schema_name) = SchemaName::parse_encoded(file_name.as_str()) {
288
112
                    return Ok(Some(schema_name));
289
                }
290
            }
291
            Ok(None)
292
112
        })
293
38
    }
294

            
295
111
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
296
111
        iterate_directory(&schema_folder(self, schema), |entry, file_name| {
297
111
            if entry.file_type()?.is_dir() && file_name != "_kv" {
298
111
                return Ok(Some(file_name));
299
            }
300
            Ok(None)
301
111
        })
302
111
    }
303

            
304
368
    fn list_stored(
305
368
        &self,
306
368
        schema: &SchemaName,
307
368
        database_name: &str,
308
368
        container: &str,
309
368
    ) -> Result<Vec<String>, Self::Error> {
310
368
        iterate_directory(
311
368
            &container_folder(self, schema, database_name, container),
312
368
            |entry, file_name| {
313
113
                if entry.file_type()?.is_file() {
314
113
                    return Ok(Some(file_name));
315
                }
316
                Ok(None)
317
368
            },
318
368
        )
319
368
    }
320

            
321
113
    fn load(
322
113
        &self,
323
113
        schema: &SchemaName,
324
113
        database_name: &str,
325
113
        container: &str,
326
113
        name: &str,
327
113
    ) -> Result<Vec<u8>, Self::Error> {
328
113
        std::fs::read(container_folder(self, schema, database_name, container).join(name))
329
113
    }
330
}
331

            
332
impl BackupLocation for PathBuf {
333
    type Error = std::io::Error;
334

            
335
222
    fn store(
336
222
        &self,
337
222
        schema: &SchemaName,
338
222
        database_name: &str,
339
222
        container: &str,
340
222
        name: &str,
341
222
        object: &[u8],
342
222
    ) -> Result<(), Self::Error> {
343
222
        BackupLocation::store(
344
222
            self.as_path(),
345
222
            schema,
346
222
            database_name,
347
222
            container,
348
222
            name,
349
222
            object,
350
222
        )
351
222
    }
352

            
353
38
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
354
38
        BackupLocation::list_schemas(self.as_path())
355
38
    }
356

            
357
111
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
358
111
        BackupLocation::list_databases(self.as_path(), schema)
359
111
    }
360

            
361
368
    fn list_stored(
362
368
        &self,
363
368
        schema: &SchemaName,
364
368
        database_name: &str,
365
368
        container: &str,
366
368
    ) -> Result<Vec<String>, Self::Error> {
367
368
        BackupLocation::list_stored(self.as_path(), schema, database_name, container)
368
368
    }
369

            
370
113
    fn load(
371
113
        &self,
372
113
        schema: &SchemaName,
373
113
        database_name: &str,
374
113
        container: &str,
375
113
        name: &str,
376
113
    ) -> Result<Vec<u8>, Self::Error> {
377
113
        BackupLocation::load(self.as_path(), schema, database_name, container, name)
378
113
    }
379
}
380

            
381
517
fn iterate_directory<T, F: FnMut(DirEntry, String) -> Result<Option<T>, std::io::Error>>(
382
517
    path: &Path,
383
517
    mut callback: F,
384
517
) -> Result<Vec<T>, std::io::Error> {
385
517
    let mut collected = Vec::new();
386
517
    let Some(mut directories) = std::fs::read_dir(path).ignore_not_found()? else {
387
293
        return Ok(collected);
388
    };
389

            
390
560
    while let Some(entry) = directories
391
560
        .next()
392
560
        .map(IoResultExt::ignore_not_found)
393
560
        .transpose()?
394
560
        .flatten()
395
    {
396
336
        if let Ok(file_name) = entry.file_name().into_string() {
397
336
            if let Some(result) = callback(entry, file_name)? {
398
336
                collected.push(result);
399
336
            }
400
        }
401
    }
402

            
403
224
    Ok(collected)
404
517
}
405

            
406
trait IoResultExt<T>: Sized {
407
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error>;
408
}
409

            
410
impl<T> IoResultExt<T> for Result<T, std::io::Error> {
411
853
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error> {
412
853
        match self {
413
560
            Ok(value) => Ok(Some(value)),
414
293
            Err(err) => {
415
293
                if err.kind() == ErrorKind::NotFound {
416
293
                    Ok(None)
417
                } else {
418
                    Err(err)
419
                }
420
            }
421
        }
422
853
    }
423
}
424

            
425
814
fn schema_folder(base: &Path, schema: &SchemaName) -> PathBuf {
426
814
    base.join(schema.encoded())
427
814
}
428

            
429
703
fn database_folder(base: &Path, schema: &SchemaName, database_name: &str) -> PathBuf {
430
703
    schema_folder(base, schema).join(database_name)
431
703
}
432

            
433
703
fn container_folder(
434
703
    base: &Path,
435
703
    schema: &SchemaName,
436
703
    database_name: &str,
437
703
    container: &str,
438
703
) -> PathBuf {
439
703
    database_folder(base, schema, database_name).join(container)
440
703
}
441

            
442
#[cfg(test)]
443
mod tests {
444
    use bonsaidb_core::connection::{Connection as _, StorageConnection as _};
445
    use bonsaidb_core::keyvalue::KeyValue;
446
    use bonsaidb_core::schema::SerializedCollection;
447
    use bonsaidb_core::test_util::{Basic, TestDirectory};
448

            
449
    use crate::config::{Builder, KeyValuePersistence, PersistenceThreshold, StorageConfiguration};
450
    use crate::Storage;
451

            
452
1
    #[test]
453
1
    fn backup_restore() -> anyhow::Result<()> {
454
1
        let backup_destination = TestDirectory::new("backup-restore.bonsaidb.backup");
455

            
456
        // First, create a database that we'll be restoring. `TestDirectory`
457
        // will automatically erase the database when it drops out of scope,
458
        // which is why we're creating a nested scope here.
459
1
        let test_doc = {
460
1
            let database_directory = TestDirectory::new("backup-restore.bonsaidb");
461
1
            let storage = Storage::open(
462
1
                StorageConfiguration::new(&database_directory)
463
1
                    .key_value_persistence(KeyValuePersistence::lazy([
464
1
                        PersistenceThreshold::after_changes(2),
465
1
                    ]))
466
1
                    .with_schema::<Basic>()?,
467
            )?;
468

            
469
1
            let db = storage.create_database::<Basic>("basic", false)?;
470
1
            let test_doc = db.collection::<Basic>().push(&Basic::new("somevalue"))?;
471
1
            db.set_numeric_key("key1", 1_u64).execute()?;
472
1
            db.set_numeric_key("key2", 2_u64).execute()?;
473
            // This key will not be persisted right away.
474
1
            db.set_numeric_key("key3", 3_u64).execute()?;
475

            
476
1
            storage.backup(&backup_destination.0).unwrap();
477
1

            
478
1
            test_doc
479
1
        };
480
1

            
481
1
        // `backup_destination` now contains an export of the database, time to try loading it:
482
1
        let database_directory = TestDirectory::new("backup-restore.bonsaidb");
483
1
        let restored_storage =
484
1
            Storage::open(StorageConfiguration::new(&database_directory).with_schema::<Basic>()?)?;
485
1
        restored_storage.restore(&backup_destination.0).unwrap();
486

            
487
1
        let db = restored_storage.database::<Basic>("basic")?;
488
1
        let doc = Basic::get(&test_doc.id, &db)?.expect("Backed up document.not found");
489
1
        assert_eq!(doc.contents.value, "somevalue");
490
1
        assert_eq!(db.get_key("key1").into_u64()?, Some(1));
491
1
        assert_eq!(db.get_key("key2").into_u64()?, Some(2));
492
1
        assert_eq!(db.get_key("key3").into_u64()?, Some(3));
493

            
494
        // Calling restore again should generate an error.
495
1
        assert!(restored_storage.restore(&backup_destination.0).is_err());
496

            
497
1
        Ok(())
498
1
    }
499
}