1
use std::{
2
    fs::DirEntry,
3
    io::ErrorKind,
4
    path::{Path, PathBuf},
5
};
6

            
7
use bonsaidb_core::{
8
    admin,
9
    connection::{LowLevelConnection, Range, Sort, StorageConnection},
10
    document::DocumentId,
11
    schema::{Collection, Qualified, SchemaName},
12
    transaction::{Operation, Transaction},
13
    AnyError,
14
};
15

            
16
use crate::{
17
    database::{keyvalue::Entry, DatabaseNonBlocking},
18
    Database, Error, Storage,
19
};
20

            
21
/// A location to store and restore a database from.
22
pub trait BackupLocation: Send + Sync {
23
    /// The error type for the backup location.
24
    type Error: AnyError;
25

            
26
    /// Store `object` at `path` with `name`.
27
    fn store(
28
        &self,
29
        schema: &SchemaName,
30
        database_name: &str,
31
        container: &str,
32
        name: &str,
33
        object: &[u8],
34
    ) -> Result<(), Self::Error>;
35

            
36
    /// Lists all of the schemas stored in this backup location.
37
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error>;
38

            
39
    /// List all of the names of the databases stored for `schema`.
40
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error>;
41

            
42
    /// List all stored named objects at `path`. The names should be the same that were provided when `store()` was called.
43
    fn list_stored(
44
        &self,
45
        schema: &SchemaName,
46
        database_name: &str,
47
        container: &str,
48
    ) -> Result<Vec<String>, Self::Error>;
49

            
50
    /// Load a previously stored object from `path` with `name`.
51
    fn load(
52
        &self,
53
        schema: &SchemaName,
54
        database_name: &str,
55
        container: &str,
56
        name: &str,
57
    ) -> Result<Vec<u8>, Self::Error>;
58
}
59

            
60
impl Storage {
61
    /// Stores a copy of all data in this instance to `location`.
62
31
    pub fn backup<L: AnyBackupLocation>(&self, location: &L) -> Result<(), Error> {
63
31
        let databases = {
64
31
            self.instance
65
31
                .data
66
31
                .available_databases
67
31
                .read()
68
31
                .keys()
69
31
                .cloned()
70
31
                .collect::<Vec<_>>()
71
        };
72

            
73
123
        for name in databases {
74
92
            let database = self
75
92
                .instance
76
92
                .database_without_schema(&name, Some(self), None)?;
77
92
            Self::backup_database(&database, location)?;
78
        }
79

            
80
31
        Ok(())
81
31
    }
82

            
83
    /// Restores all data from a previously stored backup `location`.
84
32
    pub fn restore<L: AnyBackupLocation>(&self, location: &L) -> Result<(), Error> {
85
94
        for schema in location
86
32
            .list_schemas()
87
32
            .map_err(|err| Error::Backup(Box::new(err)))?
88
        {
89
94
            for database in location
90
94
                .list_databases(&schema)
91
94
                .map_err(|err| Error::Backup(Box::new(err)))?
92
            {
93
                // The admin database is already going to be created by the process of creating a database.
94
94
                self.create_database_with_schema(&database, schema.clone(), true)?;
95

            
96
94
                let database =
97
94
                    self.instance
98
94
                        .database_without_schema(&database, Some(self), None)?;
99
94
                Self::restore_database(&database, location)?;
100
            }
101
        }
102

            
103
31
        Ok(())
104
32
    }
105

            
106
92
    pub(crate) fn backup_database(
107
92
        database: &Database,
108
92
        location: &dyn AnyBackupLocation,
109
92
    ) -> Result<(), Error> {
110
92
        let schema = database.schematic().name.clone();
111
215
        for collection in database.schematic().collections() {
112
215
            let documents = database.list_from_collection(
113
215
                Range::from(..),
114
215
                Sort::Ascending,
115
215
                None,
116
215
                &collection,
117
215
            )?;
118
215
            let collection_name = collection.encoded();
119
            // TODO consider how to best parallelize -- perhaps a location can opt into parallelization?
120
398
            for document in documents {
121
183
                location.store(
122
183
                    &schema,
123
183
                    database.name(),
124
183
                    &collection_name,
125
183
                    &document.header.id.to_string(),
126
183
                    &document.contents,
127
183
                )?;
128
            }
129
215
            for ((namespace, key), entry) in database.all_key_value_entries()? {
130
3
                let full_name = format!("{}._key._{}", namespace.as_deref().unwrap_or(""), key);
131
3
                location.store(
132
3
                    &schema,
133
3
                    database.name(),
134
3
                    "_kv",
135
3
                    &full_name,
136
3
                    &pot::to_vec(&entry)?,
137
                )?;
138
            }
139
        }
140
92
        Ok(())
141
92
    }
142

            
143
94
    pub(crate) fn restore_database(
144
94
        database: &Database,
145
94
        location: &dyn AnyBackupLocation,
146
94
    ) -> Result<(), Error> {
147
94
        let schema = database.schematic().name.clone();
148
94
        let mut transaction = Transaction::new();
149
94
        // Restore all the collections. However, there's one collection we don't
150
94
        // want to restore: the Databases list. This will be recreated during
151
94
        // the process of restoring the backup, so we skip it.
152
94
        let database_collection = admin::Database::collection_name();
153
188
        for collection in database
154
94
            .schematic()
155
94
            .collections()
156
94
            .into_iter()
157
220
            .filter(|c| c != &database_collection)
158
        {
159
188
            let collection_name = collection.encoded();
160
188
            for (id, id_string) in location
161
188
                .list_stored(&schema, database.name(), &collection_name)?
162
188
                .into_iter()
163
188
                .filter_map(|id_string| {
164
92
                    id_string
165
92
                        .parse::<DocumentId>()
166
92
                        .ok()
167
92
                        .map(|id| (id, id_string))
168
188
                })
169
92
            {
170
92
                let contents =
171
92
                    location.load(&schema, database.name(), &collection_name, &id_string)?;
172
92
                transaction.push(Operation::insert(collection.clone(), Some(id), contents));
173
            }
174
        }
175
94
        database.apply_transaction(transaction)?;
176

            
177
93
        for full_key in location.list_stored(&schema, database.name(), "_kv")? {
178
3
            if let Some((namespace, key)) = full_key.split_once("._key._") {
179
3
                let entry = location.load(&schema, database.name(), "_kv", &full_key)?;
180
3
                let entry = pot::from_slice::<Entry>(&entry)?;
181
3
                let namespace = if namespace.is_empty() {
182
3
                    None
183
                } else {
184
                    Some(namespace.to_string())
185
                };
186
3
                entry.restore(namespace, key.to_string(), database)?;
187
            }
188
        }
189

            
190
93
        Ok(())
191
94
    }
192
}
193

            
194
pub trait AnyBackupLocation: Send + Sync {
195
    fn store(
196
        &self,
197
        schema: &SchemaName,
198
        database_name: &str,
199
        container: &str,
200
        name: &str,
201
        object: &[u8],
202
    ) -> Result<(), Error>;
203

            
204
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Error>;
205

            
206
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error>;
207

            
208
    fn list_stored(
209
        &self,
210
        schema: &SchemaName,
211
        database_name: &str,
212
        container: &str,
213
    ) -> Result<Vec<String>, Error>;
214

            
215
    fn load(
216
        &self,
217
        schema: &SchemaName,
218
        database_name: &str,
219
        container: &str,
220
        name: &str,
221
    ) -> Result<Vec<u8>, Error>;
222
}
223

            
224
impl<L, E> AnyBackupLocation for L
225
where
226
    L: BackupLocation<Error = E>,
227
    E: AnyError,
228
{
229
186
    fn store(
230
186
        &self,
231
186
        schema: &SchemaName,
232
186
        database_name: &str,
233
186
        container: &str,
234
186
        name: &str,
235
186
        object: &[u8],
236
186
    ) -> Result<(), Error> {
237
186
        self.store(schema, database_name, container, name, object)
238
186
            .map_err(|err| Error::Backup(Box::new(err)))
239
186
    }
240

            
241
32
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Error> {
242
32
        self.list_schemas()
243
32
            .map_err(|err| Error::Backup(Box::new(err)))
244
32
    }
245

            
246
94
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error> {
247
94
        self.list_databases(schema)
248
94
            .map_err(|err| Error::Backup(Box::new(err)))
249
94
    }
250

            
251
281
    fn list_stored(
252
281
        &self,
253
281
        schema: &SchemaName,
254
281
        database_name: &str,
255
281
        container: &str,
256
281
    ) -> Result<Vec<String>, Error> {
257
281
        self.list_stored(schema, database_name, container)
258
281
            .map_err(|err| Error::Backup(Box::new(err)))
259
281
    }
260

            
261
95
    fn load(
262
95
        &self,
263
95
        schema: &SchemaName,
264
95
        database_name: &str,
265
95
        container: &str,
266
95
        name: &str,
267
95
    ) -> Result<Vec<u8>, Error> {
268
95
        self.load(schema, database_name, container, name)
269
95
            .map_err(|err| Error::Backup(Box::new(err)))
270
95
    }
271
}
272

            
273
impl BackupLocation for Path {
274
    type Error = std::io::Error;
275

            
276
186
    fn store(
277
186
        &self,
278
186
        schema: &SchemaName,
279
186
        database_name: &str,
280
186
        container: &str,
281
186
        name: &str,
282
186
        object: &[u8],
283
186
    ) -> Result<(), Self::Error> {
284
186
        let container_folder = container_folder(self, schema, database_name, container);
285
186
        std::fs::create_dir_all(&container_folder)?;
286
186
        std::fs::write(container_folder.join(name), object)?;
287

            
288
186
        Ok(())
289
186
    }
290

            
291
32
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
292
94
        iterate_directory(self, |entry, file_name| {
293
94
            if entry.file_type()?.is_dir() {
294
94
                if let Ok(schema_name) = SchemaName::parse_encoded(file_name.as_str()) {
295
94
                    return Ok(Some(schema_name));
296
                }
297
            }
298
            Ok(None)
299
94
        })
300
32
    }
301

            
302
94
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
303
94
        iterate_directory(&schema_folder(self, schema), |entry, file_name| {
304
94
            if entry.file_type()?.is_dir() && file_name != "_kv" {
305
94
                return Ok(Some(file_name));
306
            }
307
            Ok(None)
308
94
        })
309
94
    }
310

            
311
281
    fn list_stored(
312
281
        &self,
313
281
        schema: &SchemaName,
314
281
        database_name: &str,
315
281
        container: &str,
316
281
    ) -> Result<Vec<String>, Self::Error> {
317
281
        iterate_directory(
318
281
            &container_folder(self, schema, database_name, container),
319
281
            |entry, file_name| {
320
95
                if entry.file_type()?.is_file() {
321
95
                    return Ok(Some(file_name));
322
                }
323
                Ok(None)
324
281
            },
325
281
        )
326
281
    }
327

            
328
95
    fn load(
329
95
        &self,
330
95
        schema: &SchemaName,
331
95
        database_name: &str,
332
95
        container: &str,
333
95
        name: &str,
334
95
    ) -> Result<Vec<u8>, Self::Error> {
335
95
        std::fs::read(container_folder(self, schema, database_name, container).join(name))
336
95
    }
337
}
338

            
339
impl BackupLocation for PathBuf {
340
    type Error = std::io::Error;
341

            
342
186
    fn store(
343
186
        &self,
344
186
        schema: &SchemaName,
345
186
        database_name: &str,
346
186
        container: &str,
347
186
        name: &str,
348
186
        object: &[u8],
349
186
    ) -> Result<(), Self::Error> {
350
186
        BackupLocation::store(
351
186
            self.as_path(),
352
186
            schema,
353
186
            database_name,
354
186
            container,
355
186
            name,
356
186
            object,
357
186
        )
358
186
    }
359

            
360
32
    fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
361
32
        BackupLocation::list_schemas(self.as_path())
362
32
    }
363

            
364
94
    fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
365
94
        BackupLocation::list_databases(self.as_path(), schema)
366
94
    }
367

            
368
281
    fn list_stored(
369
281
        &self,
370
281
        schema: &SchemaName,
371
281
        database_name: &str,
372
281
        container: &str,
373
281
    ) -> Result<Vec<String>, Self::Error> {
374
281
        BackupLocation::list_stored(self.as_path(), schema, database_name, container)
375
281
    }
376

            
377
95
    fn load(
378
95
        &self,
379
95
        schema: &SchemaName,
380
95
        database_name: &str,
381
95
        container: &str,
382
95
        name: &str,
383
95
    ) -> Result<Vec<u8>, Self::Error> {
384
95
        BackupLocation::load(self.as_path(), schema, database_name, container, name)
385
95
    }
386
}
387

            
388
407
fn iterate_directory<T, F: FnMut(DirEntry, String) -> Result<Option<T>, std::io::Error>>(
389
407
    path: &Path,
390
407
    mut callback: F,
391
407
) -> Result<Vec<T>, std::io::Error> {
392
407
    let mut collected = Vec::new();
393
407
    let mut directories = if let Some(directories) = std::fs::read_dir(path).ignore_not_found()? {
394
189
        directories
395
    } else {
396
218
        return Ok(collected);
397
    };
398

            
399
472
    while let Some(entry) = directories
400
472
        .next()
401
472
        .map(IoResultExt::ignore_not_found)
402
472
        .transpose()?
403
472
        .flatten()
404
    {
405
283
        if let Ok(file_name) = entry.file_name().into_string() {
406
283
            if let Some(result) = callback(entry, file_name)? {
407
283
                collected.push(result);
408
283
            }
409
        }
410
    }
411

            
412
189
    Ok(collected)
413
407
}
414

            
415
/// Extension for I/O results that treats "not found" as an absent value
/// instead of a failure.
trait IoResultExt<T>: Sized {
    /// Maps `Ok(value)` to `Ok(Some(value))` and an error of kind
    /// `ErrorKind::NotFound` to `Ok(None)`; every other error is returned
    /// unchanged.
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error>;
}

impl<T> IoResultExt<T> for Result<T, std::io::Error> {
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error> {
        match self {
            Ok(value) => Ok(Some(value)),
            Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err),
        }
    }
}
433

            
434
656
fn schema_folder(base: &Path, schema: &SchemaName) -> PathBuf {
435
656
    base.join(schema.encoded())
436
656
}
437

            
438
562
fn database_folder(base: &Path, schema: &SchemaName, database_name: &str) -> PathBuf {
439
562
    schema_folder(base, schema).join(database_name)
440
562
}
441

            
442
562
fn container_folder(
443
562
    base: &Path,
444
562
    schema: &SchemaName,
445
562
    database_name: &str,
446
562
    container: &str,
447
562
) -> PathBuf {
448
562
    database_folder(base, schema, database_name).join(container)
449
562
}
450

            
451
#[cfg(test)]
mod tests {
    use bonsaidb_core::{
        connection::{Connection as _, StorageConnection as _},
        keyvalue::KeyValue,
        schema::SerializedCollection,
        test_util::{Basic, TestDirectory},
    };

    use crate::{
        config::{Builder, KeyValuePersistence, PersistenceThreshold, StorageConfiguration},
        Storage,
    };

    /// Backs up a storage containing one document and three keys, restores
    /// it into a fresh storage, and checks that the document and all keys are
    /// present and that restoring a second time fails.
    #[test]
    fn backup_restore() -> anyhow::Result<()> {
        let backup_destination = TestDirectory::new("backup-restore.bonsaidb.backup");

        // First, create a database that we'll be restoring. `TestDirectory`
        // will automatically erase the database when it drops out of scope,
        // which is why we're creating a nested scope here.
        let test_doc = {
            let database_directory = TestDirectory::new("backup-restore.bonsaidb");
            let storage = Storage::open(
                StorageConfiguration::new(&database_directory)
                    .key_value_persistence(KeyValuePersistence::lazy([
                        PersistenceThreshold::after_changes(2),
                    ]))
                    .with_schema::<Basic>()?,
            )?;

            let db = storage.create_database::<Basic>("basic", false)?;
            let test_doc = db.collection::<Basic>().push(&Basic::new("somevalue"))?;
            db.set_numeric_key("key1", 1_u64).execute()?;
            db.set_numeric_key("key2", 2_u64).execute()?;
            // This key will not be persisted right away.
            db.set_numeric_key("key3", 3_u64).execute()?;

            storage.backup(&backup_destination.0)?;

            test_doc
        };

        // `backup_destination` now contains an export of the database, time to try loading it:
        let database_directory = TestDirectory::new("backup-restore.bonsaidb");
        let restored_storage =
            Storage::open(StorageConfiguration::new(&database_directory).with_schema::<Basic>()?)?;
        restored_storage.restore(&backup_destination.0)?;

        let db = restored_storage.database::<Basic>("basic")?;
        let doc = Basic::get(test_doc.id, &db)?.expect("Backed up document not found");
        assert_eq!(doc.contents.value, "somevalue");
        assert_eq!(db.get_key("key1").into_u64()?, Some(1));
        assert_eq!(db.get_key("key2").into_u64()?, Some(2));
        assert_eq!(db.get_key("key3").into_u64()?, Some(3));

        // Calling restore again should generate an error.
        assert!(restored_storage.restore(&backup_destination.0).is_err());

        Ok(())
    }
}