1
use std::{
2
    io::ErrorKind,
3
    path::{Path, PathBuf},
4
};
5

            
6
use async_trait::async_trait;
7
use bonsaidb_core::{
8
    admin,
9
    connection::{Connection, Range, Sort, StorageConnection},
10
    document::DocumentId,
11
    schema::{Collection, SchemaName},
12
    transaction::{Operation, Transaction},
13
    AnyError,
14
};
15
use futures::Future;
16
use tokio::fs::DirEntry;
17

            
18
use crate::{database::keyvalue::Entry, Database, Error, Storage};
19

            
20
/// A location to store and restore a database from.
21
#[async_trait]
22
pub trait BackupLocation: Send + Sync {
23
    /// The error type for the backup location.
24
    type Error: AnyError;
25

            
26
    /// Store `object` at `path` with `name`.
27
    async fn store(
28
        &self,
29
        schema: &SchemaName,
30
        database_name: &str,
31
        container: &str,
32
        name: &str,
33
        object: &[u8],
34
    ) -> Result<(), Self::Error>;
35

            
36
    /// Lists all of the schemas stored in this backup location.
37
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error>;
38

            
39
    /// List all of the names of the databases stored for `schema`.
40
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error>;
41

            
42
    /// List all stored named objects at `path`. The names should be the same that were provided when `store()` was called.
43
    async fn list_stored(
44
        &self,
45
        schema: &SchemaName,
46
        database_name: &str,
47
        container: &str,
48
    ) -> Result<Vec<String>, Self::Error>;
49

            
50
    /// Load a previously stored object from `path` with `name`.
51
    async fn load(
52
        &self,
53
        schema: &SchemaName,
54
        database_name: &str,
55
        container: &str,
56
        name: &str,
57
    ) -> Result<Vec<u8>, Self::Error>;
58
}
59

            
60
impl Storage {
61
    /// Stores a copy of all data in this instance to `location`.
62
2
    pub async fn backup<L: AnyBackupLocation>(&self, location: L) -> Result<(), Error> {
63
2
        let databases = {
64
2
            self.data
65
2
                .available_databases
66
2
                .read()
67
                .await
68
2
                .keys()
69
2
                .cloned()
70
2
                .collect::<Vec<_>>()
71
        };
72

            
73
7
        for name in databases {
74
5
            let database = self.database_without_schema(&name).await?;
75
49
            self.backup_database(&database, &location).await?;
76
        }
77

            
78
2
        Ok(())
79
2
    }
80

            
81
    /// Stores a copy of all data in this instance to `location`.
82
3
    pub async fn restore<L: AnyBackupLocation>(&self, location: L) -> Result<(), Error> {
83
6
        for schema in location
84
20
            .list_schemas()
85
20
            .await
86
3
            .map_err(|err| Error::Backup(Box::new(err)))?
87
        {
88
6
            for database in location
89
24
                .list_databases(&schema)
90
24
                .await
91
6
                .map_err(|err| Error::Backup(Box::new(err)))?
92
            {
93
                // The admin database is already going to be created by the process of creating a database.
94
7
                self.create_database_with_schema(&database, schema.clone(), true)
95
6
                    .await?;
96

            
97
6
                let database = self.database_without_schema(&database).await?;
98
52
                self.restore_database(&database, &location).await?;
99
            }
100
        }
101

            
102
2
        Ok(())
103
3
    }
104

            
105
77
    pub(crate) async fn backup_database(
106
77
        &self,
107
77
        database: &Database,
108
77
        location: &dyn AnyBackupLocation,
109
77
    ) -> Result<(), Error> {
110
5
        let schema = database.schematic().name.clone();
111
12
        for collection in database.schematic().collections() {
112
12
            let documents = database
113
12
                .list(Range::from(..), Sort::Ascending, None, &collection)
114
12
                .await?;
115
12
            let collection_name = collection.encoded();
116
            // TODO consider how to best parallelize -- perhaps a location can opt into parallelization?
117
21
            for document in documents {
118
9
                location
119
9
                    .store(
120
9
                        &schema,
121
9
                        database.name(),
122
9
                        &collection_name,
123
9
                        &document.header.id.to_string(),
124
9
                        &document.contents,
125
18
                    )
126
18
                    .await?;
127
            }
128
13
            for ((namespace, key), entry) in database.all_key_value_entries().await? {
129
3
                let full_name = format!("{}._key._{}", namespace.as_deref().unwrap_or(""), key);
130
3
                location
131
3
                    .store(
132
3
                        &schema,
133
3
                        database.name(),
134
3
                        "_kv",
135
3
                        &full_name,
136
3
                        &pot::to_vec(&entry)?,
137
6
                    )
138
6
                    .await?;
139
            }
140
        }
141
5
        Ok(())
142
5
    }
143

            
144
78
    pub(crate) async fn restore_database(
145
78
        &self,
146
78
        database: &Database,
147
78
        location: &dyn AnyBackupLocation,
148
78
    ) -> Result<(), Error> {
149
6
        let schema = database.schematic().name.clone();
150
6
        let mut transaction = Transaction::new();
151
6
        // Restore all the collections. However, there's one collection we don't
152
6
        // want to restore: the Databases list. This will be recreated during
153
6
        // the process of restoring the backup, so we skip it.
154
6
        let database_collection = admin::Database::collection_name();
155
11
        for collection in database
156
6
            .schematic()
157
6
            .collections()
158
6
            .into_iter()
159
13
            .filter(|c| c != &database_collection)
160
        {
161
11
            let collection_name = collection.encoded();
162
11
            for (id, id_string) in location
163
25
                .list_stored(&schema, database.name(), &collection_name)
164
25
                .await?
165
11
                .into_iter()
166
11
                .filter_map(|id_string| {
167
5
                    id_string
168
5
                        .parse::<DocumentId>()
169
5
                        .ok()
170
5
                        .map(|id| (id, id_string))
171
11
                })
172
5
            {
173
5
                let contents = location
174
5
                    .load(&schema, database.name(), &collection_name, &id_string)
175
5
                    .await?;
176
5
                transaction.push(Operation::insert(collection.clone(), Some(id), contents));
177
            }
178
        }
179
7
        database.apply_transaction(transaction).await?;
180

            
181
6
        for full_key in location
182
12
            .list_stored(&schema, database.name(), "_kv")
183
12
            .await?
184
        {
185
3
            if let Some((namespace, key)) = full_key.split_once("._key._") {
186
3
                let entry = location
187
3
                    .load(&schema, database.name(), "_kv", &full_key)
188
3
                    .await?;
189
3
                let entry = pot::from_slice::<Entry>(&entry)?;
190
3
                let namespace = if namespace.is_empty() {
191
3
                    None
192
                } else {
193
                    Some(namespace.to_string())
194
                };
195
3
                entry.restore(namespace, key.to_string(), database).await?;
196
            }
197
        }
198

            
199
5
        Ok(())
200
6
    }
201
}
202

            
203
#[async_trait]
204
pub trait AnyBackupLocation: Send + Sync {
205
    async fn store(
206
        &self,
207
        schema: &SchemaName,
208
        database_name: &str,
209
        container: &str,
210
        name: &str,
211
        object: &[u8],
212
    ) -> Result<(), Error>;
213

            
214
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Error>;
215

            
216
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error>;
217

            
218
    async fn list_stored(
219
        &self,
220
        schema: &SchemaName,
221
        database_name: &str,
222
        container: &str,
223
    ) -> Result<Vec<String>, Error>;
224

            
225
    async fn load(
226
        &self,
227
        schema: &SchemaName,
228
        database_name: &str,
229
        container: &str,
230
        name: &str,
231
    ) -> Result<Vec<u8>, Error>;
232
}
233

            
234
#[async_trait]
235
impl<L, E> AnyBackupLocation for L
236
where
237
    L: BackupLocation<Error = E>,
238
    E: AnyError,
239
{
240
12
    async fn store(
241
12
        &self,
242
12
        schema: &SchemaName,
243
12
        database_name: &str,
244
12
        container: &str,
245
12
        name: &str,
246
12
        object: &[u8],
247
12
    ) -> Result<(), Error> {
248
24
        self.store(schema, database_name, container, name, object)
249
24
            .await
250
12
            .map_err(|err| Error::Backup(Box::new(err)))
251
24
    }
252

            
253
3
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Error> {
254
20
        self.list_schemas()
255
20
            .await
256
3
            .map_err(|err| Error::Backup(Box::new(err)))
257
6
    }
258

            
259
6
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error> {
260
24
        self.list_databases(schema)
261
24
            .await
262
6
            .map_err(|err| Error::Backup(Box::new(err)))
263
12
    }
264

            
265
16
    async fn list_stored(
266
16
        &self,
267
16
        schema: &SchemaName,
268
16
        database_name: &str,
269
16
        container: &str,
270
16
    ) -> Result<Vec<String>, Error> {
271
37
        self.list_stored(schema, database_name, container)
272
37
            .await
273
16
            .map_err(|err| Error::Backup(Box::new(err)))
274
32
    }
275

            
276
8
    async fn load(
277
8
        &self,
278
8
        schema: &SchemaName,
279
8
        database_name: &str,
280
8
        container: &str,
281
8
        name: &str,
282
8
    ) -> Result<Vec<u8>, Error> {
283
8
        self.load(schema, database_name, container, name)
284
8
            .await
285
8
            .map_err(|err| Error::Backup(Box::new(err)))
286
16
    }
287
}
288

            
289
#[async_trait]
290
impl<'a> BackupLocation for &'a Path {
291
    type Error = std::io::Error;
292

            
293
156
    async fn store(
294
156
        &self,
295
156
        schema: &SchemaName,
296
156
        database_name: &str,
297
156
        container: &str,
298
156
        name: &str,
299
156
        object: &[u8],
300
156
    ) -> Result<(), Self::Error> {
301
156
        let container_folder = container_folder(self, schema, database_name, container);
302
156
        tokio::fs::create_dir_all(&container_folder).await?;
303
156
        tokio::fs::write(container_folder.join(name), object).await?;
304

            
305
156
        Ok(())
306
312
    }
307

            
308
27
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
309
79
        iterate_directory(self, |entry, file_name| async move {
310
79
            if entry.file_type().await?.is_dir() {
311
79
                if let Ok(schema_name) = SchemaName::parse_encoded(file_name.as_str()) {
312
79
                    return Ok(Some(schema_name));
313
                }
314
            }
315
            Ok(None)
316
212
        })
317
212
        .await
318
54
    }
319

            
320
78
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
321
78
        iterate_directory(
322
78
            &schema_folder(self, schema),
323
78
            |entry, file_name| async move {
324
78
                if entry.file_type().await?.is_dir() && file_name != "_kv" {
325
78
                    return Ok(Some(file_name));
326
                }
327
                Ok(None)
328
78
            },
329
312
        )
330
312
        .await
331
156
    }
332

            
333
232
    async fn list_stored(
334
232
        &self,
335
232
        schema: &SchemaName,
336
232
        database_name: &str,
337
232
        container: &str,
338
232
    ) -> Result<Vec<String>, Self::Error> {
339
232
        iterate_directory(
340
232
            &container_folder(self, schema, database_name, container),
341
232
            |entry, file_name| async move {
342
80
                if entry.file_type().await?.is_file() {
343
80
                    return Ok(Some(file_name));
344
                }
345
                Ok(None)
346
232
            },
347
445
        )
348
445
        .await
349
464
    }
350

            
351
80
    async fn load(
352
80
        &self,
353
80
        schema: &SchemaName,
354
80
        database_name: &str,
355
80
        container: &str,
356
80
        name: &str,
357
80
    ) -> Result<Vec<u8>, Self::Error> {
358
80
        tokio::fs::read(container_folder(self, schema, database_name, container).join(name)).await
359
160
    }
360
}
361

            
362
#[async_trait]
363
impl BackupLocation for PathBuf {
364
    type Error = std::io::Error;
365

            
366
    async fn store(
367
        &self,
368
        schema: &SchemaName,
369
        database_name: &str,
370
        container: &str,
371
        name: &str,
372
        object: &[u8],
373
    ) -> Result<(), Self::Error> {
374
        BackupLocation::store(&*self, schema, database_name, container, name, object).await
375
    }
376

            
377
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
378
        BackupLocation::list_schemas(&*self).await
379
    }
380

            
381
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
382
        BackupLocation::list_databases(&*self, schema).await
383
    }
384

            
385
    async fn list_stored(
386
        &self,
387
        schema: &SchemaName,
388
        database_name: &str,
389
        container: &str,
390
    ) -> Result<Vec<String>, Self::Error> {
391
        BackupLocation::list_stored(&*self, schema, database_name, container).await
392
    }
393

            
394
    async fn load(
395
        &self,
396
        schema: &SchemaName,
397
        database_name: &str,
398
        container: &str,
399
        name: &str,
400
    ) -> Result<Vec<u8>, Self::Error> {
401
        BackupLocation::load(&*self, schema, database_name, container, name).await
402
    }
403
}
404

            
405
337
async fn iterate_directory<
406
337
    T,
407
337
    F: FnMut(DirEntry, String) -> Fut,
408
337
    Fut: Future<Output = Result<Option<T>, std::io::Error>>,
409
337
>(
410
337
    path: &Path,
411
337
    mut callback: F,
412
337
) -> Result<Vec<T>, std::io::Error> {
413
337
    let mut collected = Vec::new();
414
158
    let mut directories =
415
337
        if let Some(directories) = tokio::fs::read_dir(path).await.ignore_not_found()? {
416
158
            directories
417
        } else {
418
179
            return Ok(collected);
419
        };
420

            
421
395
    while let Some(entry) = directories.next_entry().await.ignore_not_found()?.flatten() {
422
237
        if let Ok(file_name) = entry.file_name().into_string() {
423
237
            if let Some(result) = callback(entry, file_name).await? {
424
237
                collected.push(result);
425
237
            }
426
        }
427
    }
428

            
429
158
    Ok(collected)
430
337
}
431

            
432
/// Helpers for treating missing filesystem entries as absent values.
trait IoResultExt<T>: Sized {
    /// Maps a success to `Ok(Some(value))` and an `ErrorKind::NotFound`
    /// error to `Ok(None)`. Every other error is passed through unchanged.
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error>;
}

impl<T> IoResultExt<T> for Result<T, std::io::Error> {
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error> {
        match self {
            Ok(value) => Ok(Some(value)),
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(None),
            Err(error) => Err(error),
        }
    }
}
450

            
451
546
fn schema_folder(base: &Path, schema: &SchemaName) -> PathBuf {
452
546
    base.join(schema.encoded())
453
546
}
454

            
455
468
fn database_folder(base: &Path, schema: &SchemaName, database_name: &str) -> PathBuf {
456
468
    schema_folder(base, schema).join(database_name)
457
468
}
458

            
459
468
fn container_folder(
460
468
    base: &Path,
461
468
    schema: &SchemaName,
462
468
    database_name: &str,
463
468
    container: &str,
464
468
) -> PathBuf {
465
468
    database_folder(base, schema, database_name).join(container)
466
468
}
467

            
468
#[cfg(test)]
mod tests {
    use bonsaidb_core::{
        connection::Connection as _,
        keyvalue::KeyValue,
        schema::SerializedCollection,
        test_util::{Basic, TestDirectory},
    };

    use super::*;
    use crate::config::{Builder, KeyValuePersistence, PersistenceThreshold, StorageConfiguration};

    /// Backs a storage instance up to a directory, then restores it into a
    /// fresh instance. Checks that the document and all key-value entries
    /// come back, and that a second restore fails.
    #[tokio::test]
    async fn backup_restore() -> anyhow::Result<()> {
        let backup_dir = TestDirectory::new("backup-restore.bonsaidb.backup");

        // Build and back up the source database in a nested scope. Dropping
        // its `TestDirectory` at the end of the scope erases it, freeing the
        // path for the restore target below.
        let pushed = {
            let source_dir = TestDirectory::new("backup-restore.bonsaidb");
            let source_config = StorageConfiguration::new(&source_dir)
                .key_value_persistence(KeyValuePersistence::lazy([
                    PersistenceThreshold::after_changes(2),
                ]))
                .with_schema::<Basic>()?;
            let source = Storage::open(source_config).await?;
            source.create_database::<Basic>("basic", false).await?;
            let source_db = source.database::<Basic>("basic").await?;
            let header = source_db
                .collection::<Basic>()
                .push(&Basic::new("somevalue"))
                .await?;
            source_db.set_numeric_key("key1", 1_u64).await?;
            source_db.set_numeric_key("key2", 2_u64).await?;
            // This key will not be persisted right away.
            source_db.set_numeric_key("key3", 3_u64).await?;

            source.backup(&*backup_dir.0).await.unwrap();

            header
        };

        // `backup_dir` now holds an export of the database; load it into a new instance.
        let restore_dir = TestDirectory::new("backup-restore.bonsaidb");
        let restored =
            Storage::open(StorageConfiguration::new(&restore_dir).with_schema::<Basic>()?).await?;
        restored.restore(&*backup_dir.0).await.unwrap();

        let db = restored.database::<Basic>("basic").await?;
        let doc = Basic::get(pushed.id, &db)
            .await?
            .expect("Backed up document.not found");
        assert_eq!(doc.contents.value, "somevalue");
        for (key, expected) in [("key1", 1_u64), ("key2", 2), ("key3", 3)] {
            assert_eq!(db.get_key(key).into_u64().await?, Some(expected));
        }

        // Calling restore again should generate an error.
        let second_restore = restored.restore(&*backup_dir.0).await;
        assert!(second_restore.is_err());

        Ok(())
    }
}