1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::connection::{
5
    self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
6
    Connection, HasSchema, HasSession, IdentityReference, LowLevelConnection, Range,
7
    SerializedQueryKey, Session, Sort, StorageConnection,
8
};
9
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10
use bonsaidb_core::keyvalue::{AsyncKeyValue, KeyOperation, KeyValue, Output};
11
use bonsaidb_core::permissions::Permissions;
12
use bonsaidb_core::pubsub::{self, AsyncPubSub, AsyncSubscriber, PubSub, Receiver};
13
use bonsaidb_core::schema::view::map::MappedSerializedValue;
14
use bonsaidb_core::schema::{
15
    self, CollectionName, Nameable, Schema, SchemaName, SchemaSummary, Schematic, ViewName,
16
};
17
use bonsaidb_core::transaction::{self, OperationResult, Transaction};
18

            
19
use crate::config::StorageConfiguration;
20
use crate::database::DatabaseNonBlocking;
21
use crate::storage::{AnyBackupLocation, StorageNonBlocking};
22
use crate::{Database, Error, Storage, Subscriber};
23

            
24
/// A file-based, multi-database, multi-user database engine. This type is
25
/// designed for use with [Tokio](https://tokio.rs). For blocking
26
/// (non-asynchronous) code, see the [`Storage`] type instead.
27
///
28
/// ## Converting between Blocking and Async Types
29
///
30
/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
31
/// using:
32
///
33
/// - [`AsyncStorage::into_blocking()`]
34
/// - [`AsyncStorage::to_blocking()`]
35
/// - [`AsyncStorage::as_blocking()`]
36
/// - [`Storage::into_async()`]
37
/// - [`Storage::to_async()`]
38
/// - [`Storage::into_async_with_runtime()`]
39
/// - [`Storage::to_async_with_runtime()`]
40
///
41
/// ## Converting from `AsyncDatabase::open` to `AsyncStorage::open`
42
///
43
/// [`AsyncDatabase::open`](AsyncDatabase::open) is a simple method that uses
44
/// `AsyncStorage` to create a database named `default` with the schema
45
/// provided. These two ways of opening the database are the same:
46
///
47
/// ```rust
48
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
49
/// use bonsaidb_core::connection::AsyncStorageConnection;
50
/// use bonsaidb_core::schema::Schema;
51
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
52
/// use bonsaidb_local::{
53
///     config::{Builder, StorageConfiguration},
54
///     AsyncDatabase, AsyncStorage,
55
/// };
56
/// # async fn open<MySchema: Schema>() -> anyhow::Result<()> {
57
/// // This creates a Storage instance, creates a database, and returns it.
58
/// let db = AsyncDatabase::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb")).await?;
59
///
60
/// // This is the equivalent code being executed:
61
/// let storage =
62
///     AsyncStorage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)
63
///         .await?;
64
/// storage.create_database::<MySchema>("default", true).await?;
65
/// let db = storage.database::<MySchema>("default").await?;
66
/// #     Ok(())
67
/// # }
68
/// ```
69
///
70
/// ## Using multiple databases
71
///
72
/// This example shows how to use `AsyncStorage` to create and use multiple
73
/// databases with multiple schemas:
74
///
75
/// ```rust
76
/// use bonsaidb_core::connection::AsyncStorageConnection;
77
/// use bonsaidb_core::schema::{Collection, Schema};
78
/// use bonsaidb_local::config::{Builder, StorageConfiguration};
79
/// use bonsaidb_local::AsyncStorage;
80
/// use serde::{Deserialize, Serialize};
81
///
82
/// #[derive(Debug, Schema)]
83
/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
84
/// # #[schema(core = bonsaidb_core)]
85
/// struct MySchema;
86
///
87
/// #[derive(Debug, Serialize, Deserialize, Collection)]
88
/// #[collection(name = "blog-posts")]
89
/// # #[collection(core = bonsaidb_core)]
90
/// struct BlogPost {
91
///     pub title: String,
92
///     pub contents: String,
93
///     pub author_id: u64,
94
/// }
95
///
96
/// #[derive(Debug, Serialize, Deserialize, Collection)]
97
/// #[collection(name = "blog-posts")]
98
/// # #[collection(core = bonsaidb_core)]
99
/// struct Author {
100
///     pub name: String,
101
/// }
102
///
103
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
104
/// let storage = AsyncStorage::open(
105
///     StorageConfiguration::new("my-db.bonsaidb")
106
///         .with_schema::<BlogPost>()?
107
///         .with_schema::<MySchema>()?,
108
/// )
109
/// .await?;
110
///
111
/// storage
112
///     .create_database::<BlogPost>("ectons-blog", true)
113
///     .await?;
114
/// let ectons_blog = storage.database::<BlogPost>("ectons-blog").await?;
115
/// storage
116
///     .create_database::<MySchema>("another-db", true)
117
///     .await?;
118
/// let another_db = storage.database::<MySchema>("another-db").await?;
119
///
120
/// #     Ok(())
121
/// # }
122
/// ```
123
195891
#[derive(Debug, Clone)]
124
#[must_use]
125
pub struct AsyncStorage {
126
    pub(crate) storage: Storage,
127
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
128
}
129

            
130
impl AsyncStorage {
131
    /// Creates or opens a multi-database [`AsyncStorage`] with its data stored in `directory`.
132
3490
    pub async fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
133
165
        tokio::task::spawn_blocking(move || Storage::open(configuration))
134
165
            .await?
135
165
            .map(Storage::into_async)
136
165
    }
137

            
138
    /// Restores all data from a previously stored backup `location`.
139
1
    pub async fn restore<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
140
1
        let task_self = self.clone();
141
1
        self.runtime
142
1
            .spawn_blocking(move || task_self.storage.restore(&location))
143
1
            .await?
144
1
    }
145

            
146
    /// Stores a copy of all data in this instance to `location`.
147
1
    pub async fn backup<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
148
1
        let task_self = self.clone();
149
1
        self.runtime
150
1
            .spawn_blocking(move || task_self.storage.backup(&location))
151
1
            .await?
152
1
    }
153

            
154
    /// Restricts an unauthenticated instance to having `effective_permissions`.
155
    /// Returns `None` if a session has already been established.
156
    #[must_use]
157
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
158
        self.storage
159
            .with_effective_permissions(effective_permissions)
160
            .map(|storage| Self {
161
                storage,
162
                runtime: self.runtime.clone(),
163
            })
164
    }
165

            
166
    #[cfg(feature = "internal-apis")]
167
    #[doc(hidden)]
168
2580840
    pub async fn database_without_schema(&self, name: &str) -> Result<AsyncDatabase, Error> {
169
71690
        let name = name.to_owned();
170
71690
        let task_self = self.clone();
171
71690
        self.runtime
172
71690
            .spawn_blocking(move || {
173
71690
                task_self
174
71690
                    .storage
175
71690
                    .database_without_schema(&name)
176
71690
                    .map(Database::into_async)
177
71690
            })
178
43421
            .await?
179
71690
    }
180

            
181
    /// Converts this instance into its blocking version, which is able to be
182
    /// used without async.
183
    pub fn into_blocking(self) -> Storage {
184
        self.storage
185
    }
186

            
187
    /// Converts this instance into its blocking version, which is able to be
188
    /// used without async.
189
    pub fn to_blocking(&self) -> Storage {
190
        self.storage.clone()
191
    }
192

            
193
    /// Returns a reference to this instance's blocking version, which is able
194
    /// to be used without async.
195
    pub fn as_blocking(&self) -> &Storage {
196
        &self.storage
197
    }
198
}
199

            
200
impl<'a> From<&'a AsyncStorage> for Storage {
201
    fn from(storage: &'a AsyncStorage) -> Self {
202
        storage.to_blocking()
203
    }
204
}
205

            
206
impl From<AsyncStorage> for Storage {
207
    fn from(storage: AsyncStorage) -> Self {
208
        storage.into_blocking()
209
    }
210
}
211

            
212
impl StorageNonBlocking for AsyncStorage {
213
6048
    fn path(&self) -> &std::path::Path {
214
6048
        self.storage.path()
215
6048
    }
216

            
217
2626452
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error> {
218
2626452
        self.storage.assume_session(session).map(|storage| Self {
219
2626308
            storage,
220
2626308
            runtime: self.runtime.clone(),
221
2626452
        })
222
2626452
    }
223
}
224

            
225
/// A database stored in BonsaiDb. This type is designed for use with
226
/// [Tokio](https://tokio.rs). For blocking (non-asynchronous) code, see the
227
/// [`Database`] type instead.
228
///
229
/// ## Converting between Async and Blocking Types
230
///
231
/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
232
/// using:
233
///
234
/// - [`AsyncDatabase::into_blocking()`]
235
/// - [`AsyncDatabase::to_blocking()`]
236
/// - [`AsyncDatabase::as_blocking()`]
237
/// - [`Database::into_async()`]
238
/// - [`Database::to_async()`]
239
/// - [`Database::into_async_with_runtime()`]
240
/// - [`Database::to_async_with_runtime()`]
241
///
242
/// ## Using `Database` to create a single database
243
///
244
/// `Database`provides an easy mechanism to open and access a single database:
245
///
246
/// ```rust
247
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
248
/// use bonsaidb_core::schema::Collection;
249
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
250
/// use bonsaidb_local::{
251
///     config::{Builder, StorageConfiguration},
252
///     AsyncDatabase,
253
/// };
254
/// use serde::{Deserialize, Serialize};
255
///
256
/// #[derive(Debug, Serialize, Deserialize, Collection)]
257
/// #[collection(name = "blog-posts")]
258
/// # #[collection(core = bonsaidb_core)]
259
/// struct BlogPost {
260
///     pub title: String,
261
///     pub contents: String,
262
/// }
263
///
264
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
265
/// let db = AsyncDatabase::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
266
/// #     Ok(())
267
/// # }
268
/// ```
269
///
270
/// Under the hood, this initializes a [`AsyncStorage`] instance pointing at
271
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
272
/// with the schema `BlogPost`.
273
///
274
/// In this example, `BlogPost` implements the
275
/// [`Collection`](schema::Collection) trait, and all collections can be used as
276
/// a [`Schema`].
277
4122038
#[derive(Debug, Clone)]
278
pub struct AsyncDatabase {
279
    pub(crate) database: Database,
280
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
281
}
282

            
283
impl AsyncDatabase {
284
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
285
12
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
286
12
        tokio::task::spawn_blocking(move || {
287
12
            Database::open::<DB>(configuration).map(Database::into_async)
288
12
        })
289
12
        .await?
290
12
    }
291

            
292
    /// Restricts an unauthenticated instance to having `effective_permissions`.
293
    /// Returns `None` if a session has already been established.
294
    #[must_use]
295
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
296
        self.database
297
            .with_effective_permissions(effective_permissions)
298
            .map(|database| Self {
299
                database,
300
                runtime: self.runtime.clone(),
301
            })
302
    }
303

            
304
    /// Converts this instance into its blocking version, which is able to be
305
    /// used without async.
306
    #[must_use]
307
    pub fn into_blocking(self) -> Database {
308
        self.database
309
    }
310

            
311
    /// Converts this instance into its blocking version, which is able to be
312
    /// used without async.
313
    #[must_use]
314
    pub fn to_blocking(&self) -> Database {
315
        self.database.clone()
316
    }
317

            
318
    /// Returns a reference to this instance's blocking version, which is able
319
    /// to be used without async.
320
    #[must_use]
321
    pub fn as_blocking(&self) -> &Database {
322
        &self.database
323
    }
324
}
325

            
326
impl From<AsyncDatabase> for Database {
327
    fn from(database: AsyncDatabase) -> Self {
328
        database.into_blocking()
329
    }
330
}
331

            
332
impl<'a> From<&'a AsyncDatabase> for Database {
333
    fn from(database: &'a AsyncDatabase) -> Self {
334
        database.to_blocking()
335
    }
336
}
337

            
338
impl DatabaseNonBlocking for AsyncDatabase {
339
    fn name(&self) -> &str {
340
        self.database.name()
341
    }
342
}
343

            
344
impl HasSession for AsyncStorage {
345
3932
    fn session(&self) -> Option<&Session> {
346
3932
        self.storage.session()
347
3932
    }
348
}
349

            
350
#[async_trait]
351
impl AsyncStorageConnection for AsyncStorage {
352
    type Authenticated = Self;
353
    type Database = AsyncDatabase;
354

            
355
72
    async fn admin(&self) -> Self::Database {
356
72
        let task_self = self.clone();
357
72

            
358
72
        self.runtime
359
72
            .spawn_blocking(move || task_self.storage.admin())
360
72
            .await
361
72
            .unwrap()
362
72
            .into_async()
363
144
    }
364

            
365
29024
    async fn create_database_with_schema(
366
29024
        &self,
367
29024
        name: &str,
368
29024
        schema: SchemaName,
369
29024
        only_if_needed: bool,
370
29024
    ) -> Result<(), bonsaidb_core::Error> {
371
29024
        let task_self = self.clone();
372
29024
        let name = name.to_owned();
373
29024
        self.runtime
374
29024
            .spawn_blocking(move || {
375
29024
                StorageConnection::create_database_with_schema(
376
29024
                    &task_self.storage,
377
29024
                    &name,
378
29024
                    schema,
379
29024
                    only_if_needed,
380
29024
                )
381
29024
            })
382
29024
            .await
383
29024
            .map_err(Error::from)?
384
58048
    }
385

            
386
655
    async fn database<DB: Schema>(
387
655
        &self,
388
655
        name: &str,
389
655
    ) -> Result<Self::Database, bonsaidb_core::Error> {
390
655
        let task_self = self.clone();
391
655
        let name = name.to_owned();
392
655
        self.runtime
393
655
            .spawn_blocking(move || {
394
655
                task_self
395
655
                    .storage
396
655
                    .database::<DB>(&name)
397
655
                    .map(Database::into_async)
398
655
            })
399
645
            .await
400
655
            .map_err(Error::from)?
401
1310
    }
402

            
403
18292
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
404
18292
        let task_self = self.clone();
405
18292
        let name = name.to_owned();
406
18292
        self.runtime
407
18292
            .spawn_blocking(move || task_self.storage.delete_database(&name))
408
18292
            .await
409
18292
            .map_err(Error::from)?
410
36584
    }
411

            
412
146
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
413
146
        let task_self = self.clone();
414
146
        self.runtime
415
146
            .spawn_blocking(move || task_self.storage.list_databases())
416
146
            .await
417
146
            .map_err(Error::from)?
418
292
    }
419

            
420
146
    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
421
146
        let task_self = self.clone();
422
146
        self.runtime
423
146
            .spawn_blocking(move || task_self.storage.list_available_schemas())
424
146
            .await
425
146
            .map_err(Error::from)?
426
292
    }
427

            
428
616
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
429
616
        let task_self = self.clone();
430
616
        let username = username.to_owned();
431
616
        self.runtime
432
616
            .spawn_blocking(move || task_self.storage.create_user(&username))
433
616
            .await
434
616
            .map_err(Error::from)?
435
1232
    }
436

            
437
6
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
438
6
        &self,
439
6
        user: U,
440
6
    ) -> Result<(), bonsaidb_core::Error> {
441
6
        let task_self = self.clone();
442
6
        let user = user.name()?.into_owned();
443
6
        self.runtime
444
6
            .spawn_blocking(move || task_self.storage.delete_user(user))
445
6
            .await
446
6
            .map_err(Error::from)?
447
12
    }
448

            
449
    #[cfg(feature = "password-hashing")]
450
5
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
451
5
        &self,
452
5
        user: U,
453
5
        password: bonsaidb_core::connection::SensitiveString,
454
5
    ) -> Result<(), bonsaidb_core::Error> {
455
5
        let task_self = self.clone();
456
5
        let user = user.name()?.into_owned();
457
5
        self.runtime
458
5
            .spawn_blocking(move || task_self.storage.set_user_password(user, password))
459
5
            .await
460
5
            .map_err(Error::from)?
461
10
    }
462

            
463
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
464
836
    async fn authenticate(
465
836
        &self,
466
836
        authentication: bonsaidb_core::connection::Authentication,
467
836
    ) -> Result<Self, bonsaidb_core::Error> {
468
836
        let task_self = self.clone();
469
836
        self.runtime
470
836
            .spawn_blocking(move || {
471
836
                task_self
472
836
                    .storage
473
836
                    .authenticate(authentication)
474
836
                    .map(Storage::into_async)
475
836
            })
476
836
            .await
477
836
            .map_err(Error::from)?
478
1672
    }
479

            
480
72
    async fn assume_identity(
481
72
        &self,
482
72
        identity: IdentityReference<'_>,
483
72
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
484
72
        let task_self = self.clone();
485
72
        let identity = identity.into_owned();
486
72
        self.runtime
487
72
            .spawn_blocking(move || {
488
72
                task_self
489
72
                    .storage
490
72
                    .assume_identity(identity)
491
72
                    .map(Storage::into_async)
492
72
            })
493
72
            .await
494
72
            .map_err(Error::from)?
495
144
    }
496

            
497
16
    async fn add_permission_group_to_user<
498
16
        'user,
499
16
        'group,
500
16
        U: Nameable<'user, u64> + Send + Sync,
501
16
        G: Nameable<'group, u64> + Send + Sync,
502
16
    >(
503
16
        &self,
504
16
        user: U,
505
16
        permission_group: G,
506
16
    ) -> Result<(), bonsaidb_core::Error> {
507
16
        let task_self = self.clone();
508
16
        let user = user.name()?.into_owned();
509
16
        let group = permission_group.name()?.into_owned();
510
16
        self.runtime
511
16
            .spawn_blocking(move || task_self.storage.add_permission_group_to_user(user, group))
512
16
            .await
513
16
            .map_err(Error::from)?
514
32
    }
515

            
516
12
    async fn remove_permission_group_from_user<
517
12
        'user,
518
12
        'group,
519
12
        U: Nameable<'user, u64> + Send + Sync,
520
12
        G: Nameable<'group, u64> + Send + Sync,
521
12
    >(
522
12
        &self,
523
12
        user: U,
524
12
        permission_group: G,
525
12
    ) -> Result<(), bonsaidb_core::Error> {
526
12
        let task_self = self.clone();
527
12
        let user = user.name()?.into_owned();
528
12
        let group = permission_group.name()?.into_owned();
529
12
        self.runtime
530
12
            .spawn_blocking(move || {
531
12
                task_self
532
12
                    .storage
533
12
                    .remove_permission_group_from_user(user, group)
534
12
            })
535
12
            .await
536
12
            .map_err(Error::from)?
537
24
    }
538

            
539
12
    async fn add_role_to_user<
540
12
        'user,
541
12
        'group,
542
12
        U: Nameable<'user, u64> + Send + Sync,
543
12
        G: Nameable<'group, u64> + Send + Sync,
544
12
    >(
545
12
        &self,
546
12
        user: U,
547
12
        role: G,
548
12
    ) -> Result<(), bonsaidb_core::Error> {
549
12
        let task_self = self.clone();
550
12
        let user = user.name()?.into_owned();
551
12
        let role = role.name()?.into_owned();
552
12
        self.runtime
553
12
            .spawn_blocking(move || task_self.storage.add_role_to_user(user, role))
554
12
            .await
555
12
            .map_err(Error::from)?
556
24
    }
557

            
558
12
    async fn remove_role_from_user<
559
12
        'user,
560
12
        'group,
561
12
        U: Nameable<'user, u64> + Send + Sync,
562
12
        G: Nameable<'group, u64> + Send + Sync,
563
12
    >(
564
12
        &self,
565
12
        user: U,
566
12
        role: G,
567
12
    ) -> Result<(), bonsaidb_core::Error> {
568
12
        let task_self = self.clone();
569
12
        let user = user.name()?.into_owned();
570
12
        let role = role.name()?.into_owned();
571
12
        self.runtime
572
12
            .spawn_blocking(move || task_self.storage.remove_role_from_user(user, role))
573
12
            .await
574
12
            .map_err(Error::from)?
575
24
    }
576
}
577

            
578
impl HasSession for AsyncDatabase {
579
    fn session(&self) -> Option<&Session> {
580
        self.database.session()
581
    }
582
}
583

            
584
#[async_trait]
585
impl AsyncConnection for AsyncDatabase {
586
    type Storage = AsyncStorage;
587

            
588
    fn storage(&self) -> Self::Storage {
589
        AsyncStorage {
590
            storage: self.database.storage(),
591
            runtime: self.runtime.clone(),
592
        }
593
    }
594

            
595
38700
    async fn list_executed_transactions(
596
38700
        &self,
597
38700
        starting_id: Option<u64>,
598
38700
        result_limit: Option<u32>,
599
38700
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
600
38700
        let task_self = self.clone();
601
38700
        self.runtime
602
38700
            .spawn_blocking(move || {
603
38700
                task_self
604
38700
                    .database
605
38700
                    .list_executed_transactions(starting_id, result_limit)
606
38700
            })
607
17928
            .await
608
38700
            .map_err(Error::from)?
609
77400
    }
610

            
611
146
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
612
146
        Ok(self
613
146
            .database
614
146
            .roots()
615
146
            .transactions()
616
146
            .current_transaction_id())
617
146
    }
618

            
619
146
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
620
146
        let task_self = self.clone();
621
146
        self.runtime
622
146
            .spawn_blocking(move || Connection::compact(&task_self.database))
623
146
            .await
624
146
            .map_err(Error::from)?
625
292
    }
626

            
627
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
628
3
        let task_self = self.clone();
629
3
        self.runtime
630
3
            .spawn_blocking(move || Connection::compact_collection::<C>(&task_self.database))
631
3
            .await
632
3
            .map_err(Error::from)?
633
6
    }
634

            
635
146
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
636
146
        let task_self = self.clone();
637
146
        self.runtime
638
146
            .spawn_blocking(move || Connection::compact_key_value_store(&task_self.database))
639
146
            .await
640
146
            .map_err(Error::from)?
641
292
    }
642
}
643

            
644
#[async_trait]
645
impl AsyncKeyValue for AsyncDatabase {
646
1476218
    async fn execute_key_operation(
647
1476218
        &self,
648
1476218
        op: KeyOperation,
649
1476218
    ) -> Result<Output, bonsaidb_core::Error> {
650
1476218
        let task_self = self.clone();
651
1476218
        self.runtime
652
1476218
            .spawn_blocking(move || KeyValue::execute_key_operation(&task_self.database, op))
653
836079
            .await
654
1476218
            .map_err(Error::from)?
655
2952436
    }
656
}
657

            
658
#[async_trait]
659
impl AsyncPubSub for AsyncDatabase {
660
    type Subscriber = Subscriber;
661

            
662
1240
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
663
1240
        PubSub::create_subscriber(&self.database)
664
1240
    }
665

            
666
1856
    async fn publish_bytes(
667
1856
        &self,
668
1856
        topic: Vec<u8>,
669
1856
        payload: Vec<u8>,
670
1856
    ) -> Result<(), bonsaidb_core::Error> {
671
1856
        PubSub::publish_bytes(&self.database, topic, payload)
672
1856
    }
673

            
674
6
    async fn publish_bytes_to_all(
675
6
        &self,
676
6
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
677
6
        payload: Vec<u8>,
678
6
    ) -> Result<(), bonsaidb_core::Error> {
679
6
        PubSub::publish_bytes_to_all(&self.database, topics, payload)
680
6
    }
681
}
682

            
683
#[async_trait]
684
impl AsyncSubscriber for Subscriber {
685
566
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
686
566
        pubsub::Subscriber::subscribe_to_bytes(self, topic)
687
566
    }
688

            
689
38
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
690
38
        pubsub::Subscriber::unsubscribe_from_bytes(self, topic)
691
38
    }
692

            
693
1044
    fn receiver(&self) -> &Receiver {
694
1044
        pubsub::Subscriber::receiver(self)
695
1044
    }
696
}
697

            
698
impl HasSchema for AsyncDatabase {
699
528504
    fn schematic(&self) -> &Schematic {
700
528504
        self.database.schematic()
701
528504
    }
702
}
703

            
704
#[async_trait]
705
impl AsyncLowLevelConnection for AsyncDatabase {
706
614604
    async fn apply_transaction(
707
614604
        &self,
708
614604
        transaction: Transaction,
709
614604
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
710
614604
        let task_self = self.clone();
711
614604
        self.runtime
712
614604
            .spawn_blocking(move || task_self.database.apply_transaction(transaction))
713
604488
            .await
714
614604
            .map_err(Error::from)?
715
1229208
    }
716

            
717
594648
    async fn get_from_collection(
718
594648
        &self,
719
594648
        id: DocumentId,
720
594648
        collection: &CollectionName,
721
594648
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
722
594648
        let task_self = self.clone();
723
594648
        let collection = collection.clone();
724
594648
        self.runtime
725
594648
            .spawn_blocking(move || task_self.database.get_from_collection(id, &collection))
726
320004
            .await
727
594648
            .map_err(Error::from)?
728
1189296
    }
729

            
730
584
    async fn list_from_collection(
731
584
        &self,
732
584
        ids: Range<DocumentId>,
733
584
        order: Sort,
734
584
        limit: Option<u32>,
735
584
        collection: &CollectionName,
736
584
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
737
584
        let task_self = self.clone();
738
584
        let collection = collection.clone();
739
584
        self.runtime
740
584
            .spawn_blocking(move || {
741
584
                task_self
742
584
                    .database
743
584
                    .list_from_collection(ids, order, limit, &collection)
744
584
            })
745
512
            .await
746
584
            .map_err(Error::from)?
747
1168
    }
748

            
749
146
    async fn list_headers_from_collection(
750
146
        &self,
751
146
        ids: Range<DocumentId>,
752
146
        order: Sort,
753
146
        limit: Option<u32>,
754
146
        collection: &CollectionName,
755
146
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
756
146
        let task_self = self.clone();
757
146
        let collection = collection.clone();
758
146
        self.runtime
759
146
            .spawn_blocking(move || {
760
146
                task_self
761
146
                    .database
762
146
                    .list_headers_from_collection(ids, order, limit, &collection)
763
146
            })
764
146
            .await
765
146
            .map_err(Error::from)?
766
292
    }
767

            
768
292
    async fn count_from_collection(
769
292
        &self,
770
292
        ids: Range<DocumentId>,
771
292
        collection: &CollectionName,
772
292
    ) -> Result<u64, bonsaidb_core::Error> {
773
292
        let task_self = self.clone();
774
292
        let collection = collection.clone();
775
292
        self.runtime
776
292
            .spawn_blocking(move || task_self.database.count_from_collection(ids, &collection))
777
292
            .await
778
292
            .map_err(Error::from)?
779
584
    }
780

            
781
349144
    async fn get_multiple_from_collection(
782
349144
        &self,
783
349144
        ids: &[DocumentId],
784
349144
        collection: &CollectionName,
785
349144
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
786
349144
        let task_self = self.clone();
787
349144
        // TODO avoid the allocation here, switch to IntoIterator.
788
349144
        let ids = ids.to_vec();
789
349144
        let collection = collection.clone();
790
349144
        self.runtime
791
349144
            .spawn_blocking(move || {
792
349144
                task_self
793
349144
                    .database
794
349144
                    .get_multiple_from_collection(&ids, &collection)
795
349144
            })
796
200464
            .await
797
349144
            .map_err(Error::from)?
798
698288
    }
799

            
800
108
    async fn compact_collection_by_name(
801
108
        &self,
802
108
        collection: CollectionName,
803
108
    ) -> Result<(), bonsaidb_core::Error> {
804
108
        let task_self = self.clone();
805
108
        self.runtime
806
108
            .spawn_blocking(move || task_self.database.compact_collection_by_name(collection))
807
108
            .await
808
108
            .map_err(Error::from)?
809
216
    }
810

            
811
358888
    async fn query_by_name(
812
358888
        &self,
813
358888
        view: &ViewName,
814
358888
        key: Option<SerializedQueryKey>,
815
358888
        order: Sort,
816
358888
        limit: Option<u32>,
817
358888
        access_policy: AccessPolicy,
818
358888
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
819
358888
        let task_self = self.clone();
820
358888
        let view = view.clone();
821
358888
        self.runtime
822
358888
            .spawn_blocking(move || {
823
358888
                task_self
824
358888
                    .database
825
358888
                    .query_by_name(&view, key, order, limit, access_policy)
826
358888
            })
827
247792
            .await
828
358888
            .map_err(Error::from)?
829
717776
    }
830

            
831
    async fn query_by_name_with_docs(
832
        &self,
833
        view: &ViewName,
834
        key: Option<SerializedQueryKey>,
835
        order: Sort,
836
        limit: Option<u32>,
837
        access_policy: AccessPolicy,
838
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
839
        let task_self = self.clone();
840
        let view = view.clone();
841
        self.runtime
842
            .spawn_blocking(move || {
843
                task_self
844
                    .database
845
                    .query_by_name_with_docs(&view, key, order, limit, access_policy)
846
            })
847
            .await
848
            .map_err(Error::from)?
849
    }
850

            
851
687216
    async fn reduce_by_name(
852
687216
        &self,
853
687216
        view: &ViewName,
854
687216
        key: Option<SerializedQueryKey>,
855
687216
        access_policy: AccessPolicy,
856
687216
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
857
687216
        let task_self = self.clone();
858
687216
        let view = view.clone();
859
687216
        self.runtime
860
687216
            .spawn_blocking(move || task_self.database.reduce_by_name(&view, key, access_policy))
861
423084
            .await
862
687216
            .map_err(Error::from)?
863
1374432
    }
864

            
865
364
    async fn reduce_grouped_by_name(
866
364
        &self,
867
364
        view: &ViewName,
868
364
        key: Option<SerializedQueryKey>,
869
364
        access_policy: AccessPolicy,
870
364
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
871
364
        let task_self = self.clone();
872
364
        let view = view.clone();
873
364
        self.runtime
874
364
            .spawn_blocking(move || {
875
364
                task_self
876
364
                    .database
877
364
                    .reduce_grouped_by_name(&view, key, access_policy)
878
364
            })
879
364
            .await
880
364
            .map_err(Error::from)?
881
728
    }
882

            
883
400
    async fn delete_docs_by_name(
884
400
        &self,
885
400
        view: &ViewName,
886
400
        key: Option<SerializedQueryKey>,
887
400
        access_policy: AccessPolicy,
888
400
    ) -> Result<u64, bonsaidb_core::Error> {
889
400
        let task_self = self.clone();
890
400
        let view = view.clone();
891
400
        self.runtime
892
400
            .spawn_blocking(move || {
893
400
                task_self
894
400
                    .database
895
400
                    .delete_docs_by_name(&view, key, access_policy)
896
400
            })
897
400
            .await
898
400
            .map_err(Error::from)?
899
800
    }
900
}