1
use std::{
2
    io::ErrorKind,
3
    path::{Path, PathBuf},
4
};
5

            
6
use async_trait::async_trait;
7
use bonsaidb_core::{
8
    admin,
9
    connection::{Connection, Range, Sort, StorageConnection},
10
    schema::{Collection, SchemaName},
11
    transaction::{Operation, Transaction},
12
    AnyError,
13
};
14
use futures::Future;
15
use tokio::fs::DirEntry;
16

            
17
use crate::{database::keyvalue::Entry, Database, Error, Storage};
18

            
19
/// A location to store and restore a database from.
20
#[async_trait]
21
pub trait BackupLocation: Send + Sync {
22
    /// The error type for the backup location.
23
    type Error: AnyError;
24

            
25
    /// Store `object` at `path` with `name`.
26
    async fn store(
27
        &self,
28
        schema: &SchemaName,
29
        database_name: &str,
30
        container: &str,
31
        name: &str,
32
        object: &[u8],
33
    ) -> Result<(), Self::Error>;
34

            
35
    /// Lists all of the schemas stored in this backup location.
36
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error>;
37

            
38
    /// List all of the names of the databases stored for `schema`.
39
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error>;
40

            
41
    /// List all stored named objects at `path`. The names should be the same that were provided when `store()` was called.
42
    async fn list_stored(
43
        &self,
44
        schema: &SchemaName,
45
        database_name: &str,
46
        container: &str,
47
    ) -> Result<Vec<String>, Self::Error>;
48

            
49
    /// Load a previously stored object from `path` with `name`.
50
    async fn load(
51
        &self,
52
        schema: &SchemaName,
53
        database_name: &str,
54
        container: &str,
55
        name: &str,
56
    ) -> Result<Vec<u8>, Self::Error>;
57
}
58

            
59
impl Storage {
60
    /// Stores a copy of all data in this instance to `location`.
61
2
    pub async fn backup<L: AnyBackupLocation>(&self, location: L) -> Result<(), Error> {
62
2
        let databases = {
63
2
            self.data
64
2
                .available_databases
65
2
                .read()
66
                .await
67
2
                .keys()
68
2
                .cloned()
69
2
                .collect::<Vec<_>>()
70
        };
71

            
72
7
        for name in databases {
73
5
            let database = self.database_without_schema(&name).await?;
74
48
            self.backup_database(&database, &location).await?;
75
        }
76

            
77
2
        Ok(())
78
2
    }
79

            
80
    /// Stores a copy of all data in this instance to `location`.
81
3
    pub async fn restore<L: AnyBackupLocation>(&self, location: L) -> Result<(), Error> {
82
6
        for schema in location
83
20
            .list_schemas()
84
20
            .await
85
3
            .map_err(|err| Error::Backup(Box::new(err)))?
86
        {
87
6
            for database in location
88
19
                .list_databases(&schema)
89
19
                .await
90
6
                .map_err(|err| Error::Backup(Box::new(err)))?
91
            {
92
                // The admin database is already going to be created by the process of creating a database.
93
7
                self.create_database_with_schema(&database, schema.clone(), true)
94
6
                    .await?;
95

            
96
6
                let database = self.database_without_schema(&database).await?;
97
50
                self.restore_database(&database, &location).await?;
98
            }
99
        }
100

            
101
2
        Ok(())
102
3
    }
103

            
104
68
    pub(crate) async fn backup_database(
105
68
        &self,
106
68
        database: &Database,
107
68
        location: &dyn AnyBackupLocation,
108
68
    ) -> Result<(), Error> {
109
5
        let schema = database.schematic().name.clone();
110
12
        for collection in database.schematic().collections() {
111
12
            let documents = database
112
12
                .list(Range::from(..), Sort::Ascending, None, &collection)
113
12
                .await?;
114
12
            let collection_name = collection.encoded();
115
            // TODO consider how to best parallelize -- perhaps a location can opt into parallelization?
116
21
            for document in documents {
117
9
                location
118
9
                    .store(
119
9
                        &schema,
120
9
                        database.name(),
121
9
                        &collection_name,
122
9
                        &document.id.to_string(),
123
9
                        &document.contents,
124
18
                    )
125
18
                    .await?;
126
            }
127
12
            for ((namespace, key), entry) in database.all_key_value_entries().await? {
128
3
                let full_name = format!("{}._key._{}", namespace.as_deref().unwrap_or(""), key);
129
3
                location
130
3
                    .store(
131
3
                        &schema,
132
3
                        database.name(),
133
3
                        "_kv",
134
3
                        &full_name,
135
3
                        &pot::to_vec(&entry)?,
136
6
                    )
137
6
                    .await?;
138
            }
139
        }
140
5
        Ok(())
141
5
    }
142

            
143
69
    pub(crate) async fn restore_database(
144
69
        &self,
145
69
        database: &Database,
146
69
        location: &dyn AnyBackupLocation,
147
69
    ) -> Result<(), Error> {
148
6
        let schema = database.schematic().name.clone();
149
6
        let mut transaction = Transaction::new();
150
6
        // Restore all the collections. However, there's one collection we don't
151
6
        // want to restore: the Databases list. This will be recreated during
152
6
        // the process of restoring the backup, so we skip it.
153
6
        let database_collection = admin::Database::collection_name();
154
11
        for collection in database
155
6
            .schematic()
156
6
            .collections()
157
6
            .into_iter()
158
13
            .filter(|c| c != &database_collection)
159
        {
160
11
            let collection_name = collection.encoded();
161
11
            for (id, id_string) in location
162
24
                .list_stored(&schema, database.name(), &collection_name)
163
24
                .await?
164
11
                .into_iter()
165
11
                .filter_map(|id_string| id_string.parse::<u64>().ok().map(|id| (id, id_string)))
166
5
            {
167
5
                let contents = location
168
5
                    .load(&schema, database.name(), &collection_name, &id_string)
169
4
                    .await?;
170
5
                transaction.push(Operation::insert(collection.clone(), Some(id), contents));
171
            }
172
        }
173
7
        database.apply_transaction(transaction).await?;
174

            
175
6
        for full_key in location
176
12
            .list_stored(&schema, database.name(), "_kv")
177
12
            .await?
178
        {
179
3
            if let Some((namespace, key)) = full_key.split_once("._key._") {
180
3
                let entry = location
181
3
                    .load(&schema, database.name(), "_kv", &full_key)
182
3
                    .await?;
183
3
                let entry = pot::from_slice::<Entry>(&entry)?;
184
3
                let namespace = if namespace.is_empty() {
185
3
                    None
186
                } else {
187
                    Some(namespace.to_string())
188
                };
189
3
                entry.restore(namespace, key.to_string(), database).await?;
190
            }
191
        }
192

            
193
5
        Ok(())
194
6
    }
195
}
196

            
197
#[async_trait]
198
pub trait AnyBackupLocation: Send + Sync {
199
    async fn store(
200
        &self,
201
        schema: &SchemaName,
202
        database_name: &str,
203
        container: &str,
204
        name: &str,
205
        object: &[u8],
206
    ) -> Result<(), Error>;
207

            
208
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Error>;
209

            
210
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error>;
211

            
212
    async fn list_stored(
213
        &self,
214
        schema: &SchemaName,
215
        database_name: &str,
216
        container: &str,
217
    ) -> Result<Vec<String>, Error>;
218

            
219
    async fn load(
220
        &self,
221
        schema: &SchemaName,
222
        database_name: &str,
223
        container: &str,
224
        name: &str,
225
    ) -> Result<Vec<u8>, Error>;
226
}
227

            
228
#[async_trait]
229
impl<L, E> AnyBackupLocation for L
230
where
231
    L: BackupLocation<Error = E>,
232
    E: AnyError,
233
{
234
12
    async fn store(
235
12
        &self,
236
12
        schema: &SchemaName,
237
12
        database_name: &str,
238
12
        container: &str,
239
12
        name: &str,
240
12
        object: &[u8],
241
12
    ) -> Result<(), Error> {
242
24
        self.store(schema, database_name, container, name, object)
243
24
            .await
244
12
            .map_err(|err| Error::Backup(Box::new(err)))
245
24
    }
246

            
247
3
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Error> {
248
20
        self.list_schemas()
249
20
            .await
250
3
            .map_err(|err| Error::Backup(Box::new(err)))
251
6
    }
252

            
253
6
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Error> {
254
19
        self.list_databases(schema)
255
19
            .await
256
6
            .map_err(|err| Error::Backup(Box::new(err)))
257
12
    }
258

            
259
16
    async fn list_stored(
260
16
        &self,
261
16
        schema: &SchemaName,
262
16
        database_name: &str,
263
16
        container: &str,
264
16
    ) -> Result<Vec<String>, Error> {
265
36
        self.list_stored(schema, database_name, container)
266
36
            .await
267
16
            .map_err(|err| Error::Backup(Box::new(err)))
268
32
    }
269

            
270
8
    async fn load(
271
8
        &self,
272
8
        schema: &SchemaName,
273
8
        database_name: &str,
274
8
        container: &str,
275
8
        name: &str,
276
8
    ) -> Result<Vec<u8>, Error> {
277
8
        self.load(schema, database_name, container, name)
278
7
            .await
279
8
            .map_err(|err| Error::Backup(Box::new(err)))
280
16
    }
281
}
282

            
283
#[async_trait]
284
impl<'a> BackupLocation for &'a Path {
285
    type Error = std::io::Error;
286

            
287
138
    async fn store(
288
138
        &self,
289
138
        schema: &SchemaName,
290
138
        database_name: &str,
291
138
        container: &str,
292
138
        name: &str,
293
138
        object: &[u8],
294
138
    ) -> Result<(), Self::Error> {
295
138
        let container_folder = container_folder(self, schema, database_name, container);
296
138
        tokio::fs::create_dir_all(&container_folder).await?;
297
138
        tokio::fs::write(container_folder.join(name), object).await?;
298

            
299
138
        Ok(())
300
276
    }
301

            
302
24
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
303
70
        iterate_directory(self, |entry, file_name| async move {
304
70
            if entry.file_type().await?.is_dir() {
305
70
                if let Ok(schema_name) = SchemaName::parse_encoded(file_name.as_str()) {
306
70
                    return Ok(Some(schema_name));
307
                }
308
            }
309
            Ok(None)
310
188
        })
311
188
        .await
312
48
    }
313

            
314
69
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
315
69
        iterate_directory(
316
69
            &schema_folder(self, schema),
317
69
            |entry, file_name| async move {
318
69
                if entry.file_type().await?.is_dir() && file_name != "_kv" {
319
69
                    return Ok(Some(file_name));
320
                }
321
                Ok(None)
322
69
            },
323
271
        )
324
271
        .await
325
138
    }
326

            
327
205
    async fn list_stored(
328
205
        &self,
329
205
        schema: &SchemaName,
330
205
        database_name: &str,
331
205
        container: &str,
332
205
    ) -> Result<Vec<String>, Self::Error> {
333
205
        iterate_directory(
334
205
            &container_folder(self, schema, database_name, container),
335
205
            |entry, file_name| async move {
336
71
                if entry.file_type().await?.is_file() {
337
71
                    return Ok(Some(file_name));
338
                }
339
                Ok(None)
340
205
            },
341
393
        )
342
393
        .await
343
410
    }
344

            
345
71
    async fn load(
346
71
        &self,
347
71
        schema: &SchemaName,
348
71
        database_name: &str,
349
71
        container: &str,
350
71
        name: &str,
351
71
    ) -> Result<Vec<u8>, Self::Error> {
352
71
        tokio::fs::read(container_folder(self, schema, database_name, container).join(name)).await
353
142
    }
354
}
355

            
356
#[async_trait]
357
impl BackupLocation for PathBuf {
358
    type Error = std::io::Error;
359

            
360
    async fn store(
361
        &self,
362
        schema: &SchemaName,
363
        database_name: &str,
364
        container: &str,
365
        name: &str,
366
        object: &[u8],
367
    ) -> Result<(), Self::Error> {
368
        BackupLocation::store(&*self, schema, database_name, container, name, object).await
369
    }
370

            
371
    async fn list_schemas(&self) -> Result<Vec<SchemaName>, Self::Error> {
372
        BackupLocation::list_schemas(&*self).await
373
    }
374

            
375
    async fn list_databases(&self, schema: &SchemaName) -> Result<Vec<String>, Self::Error> {
376
        BackupLocation::list_databases(&*self, schema).await
377
    }
378

            
379
    async fn list_stored(
380
        &self,
381
        schema: &SchemaName,
382
        database_name: &str,
383
        container: &str,
384
    ) -> Result<Vec<String>, Self::Error> {
385
        BackupLocation::list_stored(&*self, schema, database_name, container).await
386
    }
387

            
388
    async fn load(
389
        &self,
390
        schema: &SchemaName,
391
        database_name: &str,
392
        container: &str,
393
        name: &str,
394
    ) -> Result<Vec<u8>, Self::Error> {
395
        BackupLocation::load(&*self, schema, database_name, container, name).await
396
    }
397
}
398

            
399
298
async fn iterate_directory<
400
298
    T,
401
298
    F: FnMut(DirEntry, String) -> Fut,
402
298
    Fut: Future<Output = Result<Option<T>, std::io::Error>>,
403
298
>(
404
298
    path: &Path,
405
298
    mut callback: F,
406
298
) -> Result<Vec<T>, std::io::Error> {
407
298
    let mut collected = Vec::new();
408
140
    let mut directories =
409
298
        if let Some(directories) = tokio::fs::read_dir(path).await.ignore_not_found()? {
410
140
            directories
411
        } else {
412
158
            return Ok(collected);
413
        };
414

            
415
350
    while let Some(entry) = directories.next_entry().await.ignore_not_found()?.flatten() {
416
210
        if let Ok(file_name) = entry.file_name().into_string() {
417
210
            if let Some(result) = callback(entry, file_name).await? {
418
210
                collected.push(result);
419
210
            }
420
        }
421
    }
422

            
423
140
    Ok(collected)
424
298
}
425

            
426
/// Extension for I/O results that treats a "not found" failure as the absence
/// of a value rather than as an error.
trait IoResultExt<T>: Sized {
    /// Converts `Ok(value)` into `Ok(Some(value))` and an error of kind
    /// `ErrorKind::NotFound` into `Ok(None)`. Any other error is returned
    /// unchanged.
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error>;
}

impl<T> IoResultExt<T> for Result<T, std::io::Error> {
    fn ignore_not_found(self) -> Result<Option<T>, std::io::Error> {
        match self.map(Some) {
            Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
            other => other,
        }
    }
}
444

            
445
483
fn schema_folder(base: &Path, schema: &SchemaName) -> PathBuf {
446
483
    base.join(schema.encoded())
447
483
}
448

            
449
414
fn database_folder(base: &Path, schema: &SchemaName, database_name: &str) -> PathBuf {
450
414
    schema_folder(base, schema).join(database_name)
451
414
}
452

            
453
414
fn container_folder(
454
414
    base: &Path,
455
414
    schema: &SchemaName,
456
414
    database_name: &str,
457
414
    container: &str,
458
414
) -> PathBuf {
459
414
    database_folder(base, schema, database_name).join(container)
460
414
}
461

            
462
#[cfg(test)]
mod tests {
    use bonsaidb_core::{
        connection::Connection as _,
        document::Document,
        keyvalue::KeyValue,
        test_util::{Basic, TestDirectory},
    };

    use super::*;
    use crate::config::{Builder, KeyValuePersistence, PersistenceThreshold, StorageConfiguration};

    /// Backs up a storage containing one document and three keys, restores it
    /// into a fresh storage, verifies the restored data, and checks that a
    /// second restore into the same storage fails.
    #[tokio::test]
    async fn backup_restore() -> anyhow::Result<()> {
        let backup_destination = TestDirectory::new("backup-restore.bonsaidb.backup");

        // First, create a database that we'll be restoring. `TestDirectory`
        // will automatically erase the database when it drops out of scope,
        // which is why we're creating a nested scope here.
        let test_doc = {
            let database_directory = TestDirectory::new("backup-restore.bonsaidb");
            let storage = Storage::open(
                StorageConfiguration::new(&database_directory)
                    .key_value_persistence(KeyValuePersistence::lazy([
                        PersistenceThreshold::after_changes(2),
                    ]))
                    .with_schema::<Basic>()?,
            )
            .await?;
            storage.create_database::<Basic>("basic", false).await?;
            let db = storage.database::<Basic>("basic").await?;
            let test_doc = db
                .collection::<Basic>()
                .push(&Basic::new("somevalue"))
                .await?;
            db.set_numeric_key("key1", 1_u64).await?;
            db.set_numeric_key("key2", 2_u64).await?;
            // This key will not be persisted right away.
            db.set_numeric_key("key3", 3_u64).await?;

            storage.backup(&*backup_destination.0).await.unwrap();

            test_doc
        };

        // `backup_destination` now contains an export of the database, time to try loading it:
        let database_directory = TestDirectory::new("backup-restore.bonsaidb");
        let restored_storage =
            Storage::open(StorageConfiguration::new(&database_directory).with_schema::<Basic>()?)
                .await?;
        restored_storage
            .restore(&*backup_destination.0)
            .await
            .unwrap();

        let db = restored_storage.database::<Basic>("basic").await?;
        let doc = db
            .get::<Basic>(test_doc.id)
            .await?
            .expect("Backed up document not found");
        let contents = doc.contents::<Basic>()?;
        assert_eq!(contents.value, "somevalue");
        assert_eq!(db.get_key("key1").into_u64().await?, Some(1));
        assert_eq!(db.get_key("key2").into_u64().await?, Some(2));
        assert_eq!(db.get_key("key3").into_u64().await?, Some(3));

        // Calling restore again should generate an error.
        assert!(restored_storage
            .restore(&*backup_destination.0)
            .await
            .is_err());

        Ok(())
    }
}