1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::connection::{
5
    self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
6
    Connection, HasSchema, HasSession, IdentityReference, LowLevelConnection, Range,
7
    SerializedQueryKey, Session, Sort, StorageConnection,
8
};
9
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10
use bonsaidb_core::keyvalue::{AsyncKeyValue, KeyOperation, KeyValue, Output};
11
use bonsaidb_core::permissions::Permissions;
12
use bonsaidb_core::pubsub::{self, AsyncPubSub, AsyncSubscriber, PubSub, Receiver};
13
use bonsaidb_core::schema::view::map::MappedSerializedValue;
14
use bonsaidb_core::schema::{
15
    self, CollectionName, Nameable, Schema, SchemaName, SchemaSummary, Schematic, ViewName,
16
};
17
use bonsaidb_core::transaction::{self, OperationResult, Transaction};
18

            
19
use crate::config::StorageConfiguration;
20
use crate::database::DatabaseNonBlocking;
21
use crate::storage::{AnyBackupLocation, StorageNonBlocking};
22
use crate::{Database, Error, Storage, Subscriber};
23

            
24
/// A file-based, multi-database, multi-user database engine. This type is
25
/// designed for use with [Tokio](https://tokio.rs). For blocking
26
/// (non-asynchronous) code, see the [`Storage`] type instead.
27
///
28
/// ## Converting between Blocking and Async Types
29
///
30
/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
31
/// using:
32
///
33
/// - [`AsyncStorage::into_blocking()`]
34
/// - [`AsyncStorage::to_blocking()`]
35
/// - [`AsyncStorage::as_blocking()`]
36
/// - [`Storage::into_async()`]
37
/// - [`Storage::to_async()`]
38
/// - [`Storage::into_async_with_runtime()`]
39
/// - [`Storage::to_async_with_runtime()`]
40
///
41
/// ## Converting from `AsyncDatabase::open` to `AsyncStorage::open`
42
///
43
/// [`AsyncDatabase::open`](AsyncDatabase::open) is a simple method that uses
44
/// `AsyncStorage` to create a database named `default` with the schema
45
/// provided. These two ways of opening the database are the same:
46
///
47
/// ```rust
48
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
49
/// use bonsaidb_core::connection::AsyncStorageConnection;
50
/// use bonsaidb_core::schema::Schema;
51
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
52
/// use bonsaidb_local::{
53
///     config::{Builder, StorageConfiguration},
54
///     AsyncDatabase, AsyncStorage,
55
/// };
56
/// # async fn open<MySchema: Schema>() -> anyhow::Result<()> {
57
/// // This creates a Storage instance, creates a database, and returns it.
58
/// let db = AsyncDatabase::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb")).await?;
59
///
60
/// // This is the equivalent code being executed:
61
/// let storage =
62
///     AsyncStorage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)
63
///         .await?;
64
/// storage.create_database::<MySchema>("default", true).await?;
65
/// let db = storage.database::<MySchema>("default").await?;
66
/// #     Ok(())
67
/// # }
68
/// ```
69
///
70
/// ## Using multiple databases
71
///
72
/// This example shows how to use `AsyncStorage` to create and use multiple
73
/// databases with multiple schemas:
74
///
75
/// ```rust
76
/// use bonsaidb_core::connection::AsyncStorageConnection;
77
/// use bonsaidb_core::schema::{Collection, Schema};
78
/// use bonsaidb_local::config::{Builder, StorageConfiguration};
79
/// use bonsaidb_local::AsyncStorage;
80
/// use serde::{Deserialize, Serialize};
81
///
82
/// #[derive(Debug, Schema)]
83
/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
84
/// # #[schema(core = bonsaidb_core)]
85
/// struct MySchema;
86
///
87
/// #[derive(Debug, Serialize, Deserialize, Collection)]
88
/// #[collection(name = "blog-posts")]
89
/// # #[collection(core = bonsaidb_core)]
90
/// struct BlogPost {
91
///     pub title: String,
92
///     pub contents: String,
93
///     pub author_id: u64,
94
/// }
95
///
96
/// #[derive(Debug, Serialize, Deserialize, Collection)]
97
/// #[collection(name = "blog-posts")]
98
/// # #[collection(core = bonsaidb_core)]
99
/// struct Author {
100
///     pub name: String,
101
/// }
102
///
103
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
104
/// let storage = AsyncStorage::open(
105
///     StorageConfiguration::new("my-db.bonsaidb")
106
///         .with_schema::<BlogPost>()?
107
///         .with_schema::<MySchema>()?,
108
/// )
109
/// .await?;
110
///
111
/// storage
112
///     .create_database::<BlogPost>("ectons-blog", true)
113
///     .await?;
114
/// let ectons_blog = storage.database::<BlogPost>("ectons-blog").await?;
115
/// storage
116
///     .create_database::<MySchema>("another-db", true)
117
///     .await?;
118
/// let another_db = storage.database::<MySchema>("another-db").await?;
119
///
120
/// #     Ok(())
121
/// # }
122
/// ```
123
196143
#[derive(Debug, Clone)]
124
#[must_use]
125
pub struct AsyncStorage {
126
    pub(crate) storage: Storage,
127
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
128
}
129

            
130
impl AsyncStorage {
131
    /// Creates or opens a multi-database [`AsyncStorage`] with its data stored in `directory`.
132
3585
    pub async fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
133
165
        tokio::task::spawn_blocking(move || Storage::open(configuration))
134
165
            .await?
135
165
            .map(Storage::into_async)
136
165
    }
137

            
138
    /// Restores all data from a previously stored backup `location`.
139
1
    pub async fn restore<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
140
1
        let task_self = self.clone();
141
1
        self.runtime
142
1
            .spawn_blocking(move || task_self.storage.restore(&location))
143
1
            .await?
144
1
    }
145

            
146
    /// Stores a copy of all data in this instance to `location`.
147
1
    pub async fn backup<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
148
1
        let task_self = self.clone();
149
1
        self.runtime
150
1
            .spawn_blocking(move || task_self.storage.backup(&location))
151
1
            .await?
152
1
    }
153

            
154
    /// Restricts an unauthenticated instance to having `effective_permissions`.
155
    /// Returns `None` if a session has already been established.
156
    #[must_use]
157
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
158
        self.storage
159
            .with_effective_permissions(effective_permissions)
160
            .map(|storage| Self {
161
                storage,
162
                runtime: self.runtime.clone(),
163
            })
164
    }
165

            
166
    #[cfg(feature = "internal-apis")]
167
    #[doc(hidden)]
168
2631958
    pub async fn database_without_schema(&self, name: &str) -> Result<AsyncDatabase, Error> {
169
71134
        let name = name.to_owned();
170
71134
        let task_self = self.clone();
171
71134
        self.runtime
172
71134
            .spawn_blocking(move || {
173
71134
                task_self
174
71134
                    .storage
175
71134
                    .database_without_schema(&name)
176
71134
                    .map(Database::into_async)
177
71134
            })
178
46038
            .await?
179
71134
    }
180

            
181
    /// Converts this instance into its blocking version, which is able to be
182
    /// used without async.
183
    pub fn into_blocking(self) -> Storage {
184
        self.storage
185
    }
186

            
187
    /// Converts this instance into its blocking version, which is able to be
188
    /// used without async.
189
    pub fn to_blocking(&self) -> Storage {
190
        self.storage.clone()
191
    }
192

            
193
    /// Returns a reference to this instance's blocking version, which is able
194
    /// to be used without async.
195
    pub fn as_blocking(&self) -> &Storage {
196
        &self.storage
197
    }
198
}
199

            
200
/// Produces a blocking [`Storage`] from a borrowed [`AsyncStorage`] by
/// delegating to [`AsyncStorage::to_blocking`].
impl<'a> From<&'a AsyncStorage> for Storage {
    fn from(storage: &'a AsyncStorage) -> Self {
        storage.to_blocking()
    }
}
205

            
206
/// Produces a blocking [`Storage`] from an owned [`AsyncStorage`] by
/// delegating to [`AsyncStorage::into_blocking`].
impl From<AsyncStorage> for Storage {
    fn from(storage: AsyncStorage) -> Self {
        storage.into_blocking()
    }
}
211

            
212
impl StorageNonBlocking for AsyncStorage {
213
6216
    fn path(&self) -> &std::path::Path {
214
6216
        self.storage.path()
215
6216
    }
216

            
217
2678837
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error> {
218
2678837
        self.storage.assume_session(session).map(|storage| Self {
219
2678689
            storage,
220
2678689
            runtime: self.runtime.clone(),
221
2678837
        })
222
2678837
    }
223
}
224

            
225
/// A database stored in BonsaiDb. This type is designed for use with
226
/// [Tokio](https://tokio.rs). For blocking (non-asynchronous) code, see the
227
/// [`Database`] type instead.
228
///
229
/// ## Converting between Async and Blocking Types
230
///
231
/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
232
/// using:
233
///
234
/// - [`AsyncDatabase::into_blocking()`]
235
/// - [`AsyncDatabase::to_blocking()`]
236
/// - [`AsyncDatabase::as_blocking()`]
237
/// - [`Database::into_async()`]
238
/// - [`Database::to_async()`]
239
/// - [`Database::into_async_with_runtime()`]
240
/// - [`Database::to_async_with_runtime()`]
241
///
242
/// ## Using `Database` to create a single database
243
///
244
/// `Database`provides an easy mechanism to open and access a single database:
245
///
246
/// ```rust
247
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
248
/// use bonsaidb_core::schema::Collection;
249
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
250
/// use bonsaidb_local::{
251
///     config::{Builder, StorageConfiguration},
252
///     AsyncDatabase,
253
/// };
254
/// use serde::{Deserialize, Serialize};
255
///
256
/// #[derive(Debug, Serialize, Deserialize, Collection)]
257
/// #[collection(name = "blog-posts")]
258
/// # #[collection(core = bonsaidb_core)]
259
/// struct BlogPost {
260
///     pub title: String,
261
///     pub contents: String,
262
/// }
263
///
264
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
265
/// let db = AsyncDatabase::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
266
/// #     Ok(())
267
/// # }
268
/// ```
269
///
270
/// Under the hood, this initializes a [`AsyncStorage`] instance pointing at
271
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
272
/// with the schema `BlogPost`.
273
///
274
/// In this example, `BlogPost` implements the
275
/// [`Collection`](schema::Collection) trait, and all collections can be used as
276
/// a [`Schema`].
277
4194757
#[derive(Debug, Clone)]
278
pub struct AsyncDatabase {
279
    pub(crate) database: Database,
280
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
281
}
282

            
283
impl AsyncDatabase {
284
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
285
12
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
286
12
        tokio::task::spawn_blocking(move || {
287
12
            Database::open::<DB>(configuration).map(Database::into_async)
288
12
        })
289
12
        .await?
290
12
    }
291

            
292
    /// Restricts an unauthenticated instance to having `effective_permissions`.
293
    /// Returns `None` if a session has already been established.
294
    #[must_use]
295
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
296
        self.database
297
            .with_effective_permissions(effective_permissions)
298
            .map(|database| Self {
299
                database,
300
                runtime: self.runtime.clone(),
301
            })
302
    }
303

            
304
    /// Converts this instance into its blocking version, which is able to be
305
    /// used without async.
306
    #[must_use]
307
    pub fn into_blocking(self) -> Database {
308
        self.database
309
    }
310

            
311
    /// Converts this instance into its blocking version, which is able to be
312
    /// used without async.
313
    #[must_use]
314
    pub fn to_blocking(&self) -> Database {
315
        self.database.clone()
316
    }
317

            
318
    /// Returns a reference to this instance's blocking version, which is able
319
    /// to be used without async.
320
    #[must_use]
321
    pub fn as_blocking(&self) -> &Database {
322
        &self.database
323
    }
324
}
325

            
326
/// Produces a blocking [`Database`] from an owned [`AsyncDatabase`] by
/// delegating to [`AsyncDatabase::into_blocking`].
impl From<AsyncDatabase> for Database {
    fn from(database: AsyncDatabase) -> Self {
        database.into_blocking()
    }
}
331

            
332
/// Produces a blocking [`Database`] from a borrowed [`AsyncDatabase`] by
/// delegating to [`AsyncDatabase::to_blocking`].
impl<'a> From<&'a AsyncDatabase> for Database {
    fn from(database: &'a AsyncDatabase) -> Self {
        database.to_blocking()
    }
}
337

            
338
// Forwards to the wrapped blocking `Database`.
impl DatabaseNonBlocking for AsyncDatabase {
    fn name(&self) -> &str {
        self.database.name()
    }
}
343

            
344
impl HasSession for AsyncStorage {
345
4041
    fn session(&self) -> Option<&Session> {
346
4041
        self.storage.session()
347
4041
    }
348
}
349

            
350
#[async_trait]
351
impl AsyncStorageConnection for AsyncStorage {
352
    type Authenticated = Self;
353
    type Database = AsyncDatabase;
354

            
355
74
    async fn admin(&self) -> Self::Database {
356
74
        let task_self = self.clone();
357
74

            
358
74
        self.runtime
359
74
            .spawn_blocking(move || task_self.storage.admin())
360
74
            .await
361
74
            .unwrap()
362
74
            .into_async()
363
148
    }
364

            
365
29828
    async fn create_database_with_schema(
366
29828
        &self,
367
29828
        name: &str,
368
29828
        schema: SchemaName,
369
29828
        only_if_needed: bool,
370
29828
    ) -> Result<(), bonsaidb_core::Error> {
371
29828
        let task_self = self.clone();
372
29828
        let name = name.to_owned();
373
29828
        self.runtime
374
29828
            .spawn_blocking(move || {
375
29828
                StorageConnection::create_database_with_schema(
376
29828
                    &task_self.storage,
377
29828
                    &name,
378
29828
                    schema,
379
29828
                    only_if_needed,
380
29828
                )
381
29828
            })
382
29828
            .await
383
29828
            .map_err(Error::from)?
384
59656
    }
385

            
386
655
    async fn database<DB: Schema>(
387
655
        &self,
388
655
        name: &str,
389
655
    ) -> Result<Self::Database, bonsaidb_core::Error> {
390
655
        let task_self = self.clone();
391
655
        let name = name.to_owned();
392
655
        self.runtime
393
655
            .spawn_blocking(move || {
394
655
                task_self
395
655
                    .storage
396
655
                    .database::<DB>(&name)
397
655
                    .map(Database::into_async)
398
655
            })
399
648
            .await
400
655
            .map_err(Error::from)?
401
1310
    }
402

            
403
18800
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
404
18800
        let task_self = self.clone();
405
18800
        let name = name.to_owned();
406
18800
        self.runtime
407
18800
            .spawn_blocking(move || task_self.storage.delete_database(&name))
408
18800
            .await
409
18800
            .map_err(Error::from)?
410
37600
    }
411

            
412
150
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
413
150
        let task_self = self.clone();
414
150
        self.runtime
415
150
            .spawn_blocking(move || task_self.storage.list_databases())
416
150
            .await
417
150
            .map_err(Error::from)?
418
300
    }
419

            
420
150
    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
421
150
        let task_self = self.clone();
422
150
        self.runtime
423
150
            .spawn_blocking(move || task_self.storage.list_available_schemas())
424
150
            .await
425
150
            .map_err(Error::from)?
426
300
    }
427

            
428
633
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
429
633
        let task_self = self.clone();
430
633
        let username = username.to_owned();
431
633
        self.runtime
432
633
            .spawn_blocking(move || task_self.storage.create_user(&username))
433
596
            .await
434
633
            .map_err(Error::from)?
435
1266
    }
436

            
437
6
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
438
6
        &self,
439
6
        user: U,
440
6
    ) -> Result<(), bonsaidb_core::Error> {
441
6
        let task_self = self.clone();
442
6
        let user = user.name()?.into_owned();
443
6
        self.runtime
444
6
            .spawn_blocking(move || task_self.storage.delete_user(user))
445
6
            .await
446
6
            .map_err(Error::from)?
447
12
    }
448

            
449
    #[cfg(feature = "password-hashing")]
450
5
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
451
5
        &self,
452
5
        user: U,
453
5
        password: bonsaidb_core::connection::SensitiveString,
454
5
    ) -> Result<(), bonsaidb_core::Error> {
455
5
        let task_self = self.clone();
456
5
        let user = user.name()?.into_owned();
457
5
        self.runtime
458
5
            .spawn_blocking(move || task_self.storage.set_user_password(user, password))
459
5
            .await
460
5
            .map_err(Error::from)?
461
10
    }
462

            
463
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
464
859
    async fn authenticate(
465
859
        &self,
466
859
        authentication: bonsaidb_core::connection::Authentication,
467
859
    ) -> Result<Self, bonsaidb_core::Error> {
468
859
        let task_self = self.clone();
469
859
        self.runtime
470
859
            .spawn_blocking(move || {
471
859
                task_self
472
859
                    .storage
473
859
                    .authenticate(authentication)
474
859
                    .map(Storage::into_async)
475
859
            })
476
859
            .await
477
859
            .map_err(Error::from)?
478
1718
    }
479

            
480
74
    async fn assume_identity(
481
74
        &self,
482
74
        identity: IdentityReference<'_>,
483
74
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
484
74
        let task_self = self.clone();
485
74
        let identity = identity.into_owned();
486
74
        self.runtime
487
74
            .spawn_blocking(move || {
488
74
                task_self
489
74
                    .storage
490
74
                    .assume_identity(identity)
491
74
                    .map(Storage::into_async)
492
74
            })
493
74
            .await
494
74
            .map_err(Error::from)?
495
148
    }
496

            
497
16
    async fn add_permission_group_to_user<
498
16
        'user,
499
16
        'group,
500
16
        U: Nameable<'user, u64> + Send + Sync,
501
16
        G: Nameable<'group, u64> + Send + Sync,
502
16
    >(
503
16
        &self,
504
16
        user: U,
505
16
        permission_group: G,
506
16
    ) -> Result<(), bonsaidb_core::Error> {
507
16
        let task_self = self.clone();
508
16
        let user = user.name()?.into_owned();
509
16
        let group = permission_group.name()?.into_owned();
510
16
        self.runtime
511
16
            .spawn_blocking(move || task_self.storage.add_permission_group_to_user(user, group))
512
16
            .await
513
16
            .map_err(Error::from)?
514
32
    }
515

            
516
12
    async fn remove_permission_group_from_user<
517
12
        'user,
518
12
        'group,
519
12
        U: Nameable<'user, u64> + Send + Sync,
520
12
        G: Nameable<'group, u64> + Send + Sync,
521
12
    >(
522
12
        &self,
523
12
        user: U,
524
12
        permission_group: G,
525
12
    ) -> Result<(), bonsaidb_core::Error> {
526
12
        let task_self = self.clone();
527
12
        let user = user.name()?.into_owned();
528
12
        let group = permission_group.name()?.into_owned();
529
12
        self.runtime
530
12
            .spawn_blocking(move || {
531
12
                task_self
532
12
                    .storage
533
12
                    .remove_permission_group_from_user(user, group)
534
12
            })
535
12
            .await
536
12
            .map_err(Error::from)?
537
24
    }
538

            
539
12
    async fn add_role_to_user<
540
12
        'user,
541
12
        'group,
542
12
        U: Nameable<'user, u64> + Send + Sync,
543
12
        G: Nameable<'group, u64> + Send + Sync,
544
12
    >(
545
12
        &self,
546
12
        user: U,
547
12
        role: G,
548
12
    ) -> Result<(), bonsaidb_core::Error> {
549
12
        let task_self = self.clone();
550
12
        let user = user.name()?.into_owned();
551
12
        let role = role.name()?.into_owned();
552
12
        self.runtime
553
12
            .spawn_blocking(move || task_self.storage.add_role_to_user(user, role))
554
12
            .await
555
12
            .map_err(Error::from)?
556
24
    }
557

            
558
12
    async fn remove_role_from_user<
559
12
        'user,
560
12
        'group,
561
12
        U: Nameable<'user, u64> + Send + Sync,
562
12
        G: Nameable<'group, u64> + Send + Sync,
563
12
    >(
564
12
        &self,
565
12
        user: U,
566
12
        role: G,
567
12
    ) -> Result<(), bonsaidb_core::Error> {
568
12
        let task_self = self.clone();
569
12
        let user = user.name()?.into_owned();
570
12
        let role = role.name()?.into_owned();
571
12
        self.runtime
572
12
            .spawn_blocking(move || task_self.storage.remove_role_from_user(user, role))
573
12
            .await
574
12
            .map_err(Error::from)?
575
24
    }
576
}
577

            
578
impl HasSession for AsyncDatabase {
579
    fn session(&self) -> Option<&Session> {
580
        self.database.session()
581
    }
582
}
583

            
584
#[async_trait]
585
impl AsyncConnection for AsyncDatabase {
586
    type Storage = AsyncStorage;
587

            
588
    fn storage(&self) -> Self::Storage {
589
        AsyncStorage {
590
            storage: self.database.storage(),
591
            runtime: self.runtime.clone(),
592
        }
593
    }
594

            
595
39774
    async fn list_executed_transactions(
596
39774
        &self,
597
39774
        starting_id: Option<u64>,
598
39774
        result_limit: Option<u32>,
599
39774
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
600
39774
        let task_self = self.clone();
601
39774
        self.runtime
602
39774
            .spawn_blocking(move || {
603
39774
                task_self
604
39774
                    .database
605
39774
                    .list_executed_transactions(starting_id, result_limit)
606
39774
            })
607
26935
            .await
608
39774
            .map_err(Error::from)?
609
79548
    }
610

            
611
150
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
612
150
        Ok(self
613
150
            .database
614
150
            .roots()
615
150
            .transactions()
616
150
            .current_transaction_id())
617
150
    }
618

            
619
150
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
620
150
        let task_self = self.clone();
621
150
        self.runtime
622
150
            .spawn_blocking(move || Connection::compact(&task_self.database))
623
150
            .await
624
150
            .map_err(Error::from)?
625
300
    }
626

            
627
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
628
3
        let task_self = self.clone();
629
3
        self.runtime
630
3
            .spawn_blocking(move || Connection::compact_collection::<C>(&task_self.database))
631
3
            .await
632
3
            .map_err(Error::from)?
633
6
    }
634

            
635
150
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
636
150
        let task_self = self.clone();
637
150
        self.runtime
638
150
            .spawn_blocking(move || Connection::compact_key_value_store(&task_self.database))
639
150
            .await
640
150
            .map_err(Error::from)?
641
300
    }
642
}
643

            
644
#[async_trait]
645
impl AsyncKeyValue for AsyncDatabase {
646
1516663
    async fn execute_key_operation(
647
1516663
        &self,
648
1516663
        op: KeyOperation,
649
1516663
    ) -> Result<Output, bonsaidb_core::Error> {
650
1516663
        let task_self = self.clone();
651
1516663
        self.runtime
652
1516663
            .spawn_blocking(move || KeyValue::execute_key_operation(&task_self.database, op))
653
945336
            .await
654
1516663
            .map_err(Error::from)?
655
3033326
    }
656
}
657

            
658
#[async_trait]
659
impl AsyncPubSub for AsyncDatabase {
660
    type Subscriber = Subscriber;
661

            
662
1274
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
663
1274
        PubSub::create_subscriber(&self.database)
664
1274
    }
665

            
666
1907
    async fn publish_bytes(
667
1907
        &self,
668
1907
        topic: Vec<u8>,
669
1907
        payload: Vec<u8>,
670
1907
    ) -> Result<(), bonsaidb_core::Error> {
671
1907
        PubSub::publish_bytes(&self.database, topic, payload)
672
1907
    }
673

            
674
6
    async fn publish_bytes_to_all(
675
6
        &self,
676
6
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
677
6
        payload: Vec<u8>,
678
6
    ) -> Result<(), bonsaidb_core::Error> {
679
6
        PubSub::publish_bytes_to_all(&self.database, topics, payload)
680
6
    }
681
}
682

            
683
#[async_trait]
684
impl AsyncSubscriber for Subscriber {
685
581
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
686
581
        pubsub::Subscriber::subscribe_to_bytes(self, topic)
687
581
    }
688

            
689
39
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
690
39
        pubsub::Subscriber::unsubscribe_from_bytes(self, topic)
691
39
    }
692

            
693
1072
    fn receiver(&self) -> &Receiver {
694
1072
        pubsub::Subscriber::receiver(self)
695
1072
    }
696
}
697

            
698
impl HasSchema for AsyncDatabase {
699
531194
    fn schematic(&self) -> &Schematic {
700
531194
        self.database.schematic()
701
531194
    }
702
}
703

            
704
#[async_trait]
705
impl AsyncLowLevelConnection for AsyncDatabase {
706
635465
    async fn apply_transaction(
707
635465
        &self,
708
635465
        transaction: Transaction,
709
635465
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
710
635465
        let task_self = self.clone();
711
635465
        self.runtime
712
635465
            .spawn_blocking(move || task_self.database.apply_transaction(transaction))
713
629323
            .await
714
635465
            .map_err(Error::from)?
715
1270930
    }
716

            
717
593109
    async fn get_from_collection(
718
593109
        &self,
719
593109
        id: DocumentId,
720
593109
        collection: &CollectionName,
721
593109
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
722
593109
        let task_self = self.clone();
723
593109
        let collection = collection.clone();
724
593109
        self.runtime
725
593109
            .spawn_blocking(move || task_self.database.get_from_collection(id, &collection))
726
357937
            .await
727
593109
            .map_err(Error::from)?
728
1186218
    }
729

            
730
600
    async fn list_from_collection(
731
600
        &self,
732
600
        ids: Range<DocumentId>,
733
600
        order: Sort,
734
600
        limit: Option<u32>,
735
600
        collection: &CollectionName,
736
600
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
737
600
        let task_self = self.clone();
738
600
        let collection = collection.clone();
739
600
        self.runtime
740
600
            .spawn_blocking(move || {
741
600
                task_self
742
600
                    .database
743
600
                    .list_from_collection(ids, order, limit, &collection)
744
600
            })
745
563
            .await
746
600
            .map_err(Error::from)?
747
1200
    }
748

            
749
150
    async fn list_headers_from_collection(
750
150
        &self,
751
150
        ids: Range<DocumentId>,
752
150
        order: Sort,
753
150
        limit: Option<u32>,
754
150
        collection: &CollectionName,
755
150
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
756
150
        let task_self = self.clone();
757
150
        let collection = collection.clone();
758
150
        self.runtime
759
150
            .spawn_blocking(move || {
760
150
                task_self
761
150
                    .database
762
150
                    .list_headers_from_collection(ids, order, limit, &collection)
763
150
            })
764
150
            .await
765
150
            .map_err(Error::from)?
766
300
    }
767

            
768
300
    async fn count_from_collection(
769
300
        &self,
770
300
        ids: Range<DocumentId>,
771
300
        collection: &CollectionName,
772
300
    ) -> Result<u64, bonsaidb_core::Error> {
773
300
        let task_self = self.clone();
774
300
        let collection = collection.clone();
775
300
        self.runtime
776
300
            .spawn_blocking(move || task_self.database.count_from_collection(ids, &collection))
777
300
            .await
778
300
            .map_err(Error::from)?
779
600
    }
780

            
781
355882
    async fn get_multiple_from_collection(
782
355882
        &self,
783
355882
        ids: &[DocumentId],
784
355882
        collection: &CollectionName,
785
355882
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
786
355882
        let task_self = self.clone();
787
355882
        // TODO avoid the allocation here, switch to IntoIterator.
788
355882
        let ids = ids.to_vec();
789
355882
        let collection = collection.clone();
790
355882
        self.runtime
791
355882
            .spawn_blocking(move || {
792
355882
                task_self
793
355882
                    .database
794
355882
                    .get_multiple_from_collection(&ids, &collection)
795
355882
            })
796
216651
            .await
797
355882
            .map_err(Error::from)?
798
711764
    }
799

            
800
111
    async fn compact_collection_by_name(
801
111
        &self,
802
111
        collection: CollectionName,
803
111
    ) -> Result<(), bonsaidb_core::Error> {
804
111
        let task_self = self.clone();
805
111
        self.runtime
806
111
            .spawn_blocking(move || task_self.database.compact_collection_by_name(collection))
807
111
            .await
808
111
            .map_err(Error::from)?
809
222
    }
810

            
811
365895
    async fn query_by_name(
812
365895
        &self,
813
365895
        view: &ViewName,
814
365895
        key: Option<SerializedQueryKey>,
815
365895
        order: Sort,
816
365895
        limit: Option<u32>,
817
365895
        access_policy: AccessPolicy,
818
365895
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
819
365895
        let task_self = self.clone();
820
365895
        let view = view.clone();
821
365895
        self.runtime
822
365895
            .spawn_blocking(move || {
823
365895
                task_self
824
365895
                    .database
825
365895
                    .query_by_name(&view, key, order, limit, access_policy)
826
365895
            })
827
256745
            .await
828
365895
            .map_err(Error::from)?
829
731790
    }
830

            
831
    async fn query_by_name_with_docs(
832
        &self,
833
        view: &ViewName,
834
        key: Option<SerializedQueryKey>,
835
        order: Sort,
836
        limit: Option<u32>,
837
        access_policy: AccessPolicy,
838
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
839
        let task_self = self.clone();
840
        let view = view.clone();
841
        self.runtime
842
            .spawn_blocking(move || {
843
                task_self
844
                    .database
845
                    .query_by_name_with_docs(&view, key, order, limit, access_policy)
846
            })
847
            .await
848
            .map_err(Error::from)?
849
    }
850

            
851
685289
    async fn reduce_by_name(
852
685289
        &self,
853
685289
        view: &ViewName,
854
685289
        key: Option<SerializedQueryKey>,
855
685289
        access_policy: AccessPolicy,
856
685289
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
857
685289
        let task_self = self.clone();
858
685289
        let view = view.clone();
859
685289
        self.runtime
860
685289
            .spawn_blocking(move || task_self.database.reduce_by_name(&view, key, access_policy))
861
409898
            .await
862
685289
            .map_err(Error::from)?
863
1370578
    }
864

            
865
374
    async fn reduce_grouped_by_name(
866
374
        &self,
867
374
        view: &ViewName,
868
374
        key: Option<SerializedQueryKey>,
869
374
        access_policy: AccessPolicy,
870
374
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
871
374
        let task_self = self.clone();
872
374
        let view = view.clone();
873
374
        self.runtime
874
374
            .spawn_blocking(move || {
875
374
                task_self
876
374
                    .database
877
374
                    .reduce_grouped_by_name(&view, key, access_policy)
878
374
            })
879
374
            .await
880
374
            .map_err(Error::from)?
881
748
    }
882

            
883
411
    async fn delete_docs_by_name(
884
411
        &self,
885
411
        view: &ViewName,
886
411
        key: Option<SerializedQueryKey>,
887
411
        access_policy: AccessPolicy,
888
411
    ) -> Result<u64, bonsaidb_core::Error> {
889
411
        let task_self = self.clone();
890
411
        let view = view.clone();
891
411
        self.runtime
892
411
            .spawn_blocking(move || {
893
411
                task_self
894
411
                    .database
895
411
                    .delete_docs_by_name(&view, key, access_policy)
896
411
            })
897
411
            .await
898
411
            .map_err(Error::from)?
899
822
    }
900
}