1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
#[cfg(feature = "password-hashing")]
5
use bonsaidb_core::connection::Authentication;
6
use bonsaidb_core::{
7
    arc_bytes::serde::Bytes,
8
    connection::{
9
        self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
10
        Connection, HasSession, IdentityReference, LowLevelConnection, QueryKey, Range, Session,
11
        Sort, StorageConnection,
12
    },
13
    document::{DocumentId, Header, OwnedDocument},
14
    keyvalue::{AsyncKeyValue, KeyOperation, KeyValue, Output},
15
    permissions::Permissions,
16
    pubsub::{self, AsyncPubSub, AsyncSubscriber, PubSub, Receiver},
17
    schema::{
18
        self, view::map::MappedSerializedValue, CollectionName, Nameable, Schema, SchemaName,
19
        Schematic, ViewName,
20
    },
21
    transaction::{self, OperationResult, Transaction},
22
};
23

            
24
use crate::{
25
    config::StorageConfiguration,
26
    database::DatabaseNonBlocking,
27
    storage::{AnyBackupLocation, StorageNonBlocking},
28
    Database, Error, Storage, Subscriber,
29
};
30

            
31
/// A file-based, multi-database, multi-user database engine. This type is
32
/// designed for use with [Tokio](https://tokio.rs). For blocking
33
/// (non-asynchronous) code, see the [`Storage`] type instead.
34
///
35
/// ## Converting between Blocking and Async Types
36
///
37
/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
38
/// using:
39
///
40
/// - [`AsyncStorage::into_blocking()`]
41
/// - [`AsyncStorage::to_blocking()`]
42
/// - [`AsyncStorage::as_blocking()`]
43
/// - [`Storage::into_async()`]
44
/// - [`Storage::to_async()`]
45
/// - [`Storage::into_async_with_runtime()`]
46
/// - [`Storage::to_async_with_runtime()`]
47
///
48
/// ## Converting from `AsyncDatabase::open` to `AsyncStorage::open`
49
///
50
/// [`AsyncDatabase::open`](AsyncDatabase::open) is a simple method that uses
51
/// `AsyncStorage` to create a database named `default` with the schema
52
/// provided. These two ways of opening the database are the same:
53
///
54
/// ```rust
55
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
56
/// use bonsaidb_core::{connection::AsyncStorageConnection, schema::Schema};
57
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
58
/// use bonsaidb_local::{
59
///     config::{Builder, StorageConfiguration},
60
///     AsyncDatabase, AsyncStorage,
61
/// };
62
/// # async fn open<MySchema: Schema>() -> anyhow::Result<()> {
63
/// // This creates a Storage instance, creates a database, and returns it.
64
/// let db = AsyncDatabase::open::<MySchema>(StorageConfiguration::new("my-db.bonsaidb")).await?;
65
///
66
/// // This is the equivalent code being executed:
67
/// let storage =
68
///     AsyncStorage::open(StorageConfiguration::new("my-db.bonsaidb").with_schema::<MySchema>()?)
69
///         .await?;
70
/// storage.create_database::<MySchema>("default", true).await?;
71
/// let db = storage.database::<MySchema>("default").await?;
72
/// #     Ok(())
73
/// # }
74
/// ```
75
///
76
/// ## Using multiple databases
77
///
78
/// This example shows how to use `AsyncStorage` to create and use multiple
79
/// databases with multiple schemas:
80
///
81
/// ```rust
82
/// use bonsaidb_core::{
83
///     connection::AsyncStorageConnection,
84
///     schema::{Collection, Schema},
85
/// };
86
/// use bonsaidb_local::{
87
///     config::{Builder, StorageConfiguration},
88
///     AsyncStorage,
89
/// };
90
/// use serde::{Deserialize, Serialize};
91
///
92
/// #[derive(Debug, Schema)]
93
/// #[schema(name = "my-schema", collections = [BlogPost, Author])]
94
/// # #[schema(core = bonsaidb_core)]
95
/// struct MySchema;
96
///
97
/// #[derive(Debug, Serialize, Deserialize, Collection)]
98
/// #[collection(name = "blog-posts")]
99
/// # #[collection(core = bonsaidb_core)]
100
/// struct BlogPost {
101
///     pub title: String,
102
///     pub contents: String,
103
///     pub author_id: u64,
104
/// }
105
///
106
/// #[derive(Debug, Serialize, Deserialize, Collection)]
107
/// #[collection(name = "blog-posts")]
108
/// # #[collection(core = bonsaidb_core)]
109
/// struct Author {
110
///     pub name: String,
111
/// }
112
///
113
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
114
/// let storage = AsyncStorage::open(
115
///     StorageConfiguration::new("my-db.bonsaidb")
116
///         .with_schema::<BlogPost>()?
117
///         .with_schema::<MySchema>()?,
118
/// )
119
/// .await?;
120
///
121
/// storage
122
///     .create_database::<BlogPost>("ectons-blog", true)
123
///     .await?;
124
/// let ectons_blog = storage.database::<BlogPost>("ectons-blog").await?;
125
/// storage
126
///     .create_database::<MySchema>("another-db", true)
127
///     .await?;
128
/// let another_db = storage.database::<MySchema>("another-db").await?;
129
///
130
/// #     Ok(())
131
/// # }
132
/// ```
133
181971
#[derive(Debug, Clone)]
134
#[must_use]
135
pub struct AsyncStorage {
136
    pub(crate) storage: Storage,
137
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
138
}
139

            
140
impl AsyncStorage {
141
    /// Creates or opens a multi-database [`AsyncStorage`] with its data stored in `directory`.
142
2612
    pub async fn open(configuration: StorageConfiguration) -> Result<Self, Error> {
143
147
        tokio::task::spawn_blocking(move || Storage::open(configuration))
144
146
            .await?
145
147
            .map(Storage::into_async)
146
147
    }
147

            
148
    /// Restores all data from a previously stored backup `location`.
149
1
    pub async fn restore<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
150
1
        let task_self = self.clone();
151
1
        self.runtime
152
1
            .spawn_blocking(move || task_self.storage.restore(&location))
153
1
            .await?
154
1
    }
155

            
156
    /// Stores a copy of all data in this instance to `location`.
157
1
    pub async fn backup<L: AnyBackupLocation + 'static>(&self, location: L) -> Result<(), Error> {
158
1
        let task_self = self.clone();
159
1
        self.runtime
160
1
            .spawn_blocking(move || task_self.storage.backup(&location))
161
1
            .await?
162
1
    }
163

            
164
    /// Restricts an unauthenticated instance to having `effective_permissions`.
165
    /// Returns `None` if a session has already been established.
166
    #[must_use]
167
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
168
        self.storage
169
            .with_effective_permissions(effective_permissions)
170
            .map(|storage| Self {
171
                storage,
172
                runtime: self.runtime.clone(),
173
            })
174
    }
175

            
176
    #[cfg(feature = "internal-apis")]
177
    #[doc(hidden)]
178
2097390
    pub async fn database_without_schema(&self, name: &str) -> Result<AsyncDatabase, Error> {
179
69913
        let name = name.to_owned();
180
69913
        let task_self = self.clone();
181
69913
        self.runtime
182
69914
            .spawn_blocking(move || {
183
69914
                task_self
184
69914
                    .storage
185
69914
                    .database_without_schema(&name)
186
69914
                    .map(Database::into_async)
187
69914
            })
188
48356
            .await?
189
69914
    }
190

            
191
    /// Converts this instance into its blocking version, which is able to be
192
    /// used without async.
193
    pub fn into_blocking(self) -> Storage {
194
        self.storage
195
    }
196

            
197
    /// Converts this instance into its blocking version, which is able to be
198
    /// used without async.
199
    pub fn to_blocking(&self) -> Storage {
200
        self.storage.clone()
201
    }
202

            
203
    /// Returns a reference to this instance's blocking version, which is able
204
    /// to be used without async.
205
    pub fn as_blocking(&self) -> &Storage {
206
        &self.storage
207
    }
208
}
209

            
210
impl<'a> From<&'a AsyncStorage> for Storage {
211
    fn from(storage: &'a AsyncStorage) -> Self {
212
        storage.to_blocking()
213
    }
214
}
215

            
216
impl From<AsyncStorage> for Storage {
217
    fn from(storage: AsyncStorage) -> Self {
218
        storage.into_blocking()
219
    }
220
}
221

            
222
impl StorageNonBlocking for AsyncStorage {
223
4560
    fn path(&self) -> &std::path::Path {
224
4560
        self.storage.path()
225
4560
    }
226
2134080
    fn assume_session(&self, session: Session) -> Result<Self, bonsaidb_core::Error> {
227
2134170
        self.storage.assume_session(session).map(|storage| Self {
228
2134170
            storage,
229
2134170
            runtime: self.runtime.clone(),
230
2134170
        })
231
2134080
    }
232
}
233

            
234
/// A database stored in BonsaiDb. This type is designed for use with
235
/// [Tokio](https://tokio.rs). For blocking (non-asynchronous) code, see the
236
/// [`Database`] type instead.
237
///
238
/// ## Converting between Async and Blocking Types
239
///
240
/// [`AsyncDatabase`] and [`Database`] can be converted to and from each other
241
/// using:
242
///
243
/// - [`AsyncDatabase::into_blocking()`]
244
/// - [`AsyncDatabase::to_blocking()`]
245
/// - [`AsyncDatabase::as_blocking()`]
246
/// - [`Database::into_async()`]
247
/// - [`Database::to_async()`]
248
/// - [`Database::into_async_with_runtime()`]
249
/// - [`Database::to_async_with_runtime()`]
250
///
251
/// ## Using `Database` to create a single database
252
///
253
/// `Database`provides an easy mechanism to open and access a single database:
254
///
255
/// ```rust
256
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_local::core`.
257
/// use bonsaidb_core::schema::Collection;
258
/// // `bonsaidb_local` is re-exported to `bonsaidb::local` if using the omnibus crate.
259
/// use bonsaidb_local::{
260
///     config::{Builder, StorageConfiguration},
261
///     AsyncDatabase,
262
/// };
263
/// use serde::{Deserialize, Serialize};
264
///
265
/// #[derive(Debug, Serialize, Deserialize, Collection)]
266
/// #[collection(name = "blog-posts")]
267
/// # #[collection(core = bonsaidb_core)]
268
/// struct BlogPost {
269
///     pub title: String,
270
///     pub contents: String,
271
/// }
272
///
273
/// # async fn test_fn() -> Result<(), bonsaidb_core::Error> {
274
/// let db = AsyncDatabase::open::<BlogPost>(StorageConfiguration::new("my-db.bonsaidb")).await?;
275
/// #     Ok(())
276
/// # }
277
/// ```
278
///
279
/// Under the hood, this initializes a [`AsyncStorage`] instance pointing at
280
/// "./my-db.bonsaidb". It then returns (or creates) a database named "default"
281
/// with the schema `BlogPost`.
282
///
283
/// In this example, `BlogPost` implements the
284
/// [`Collection`](schema::Collection) trait, and all collections can be used as
285
/// a [`Schema`].
286
3332173
#[derive(Debug, Clone)]
287
pub struct AsyncDatabase {
288
    pub(crate) database: Database,
289
    pub(crate) runtime: Arc<tokio::runtime::Handle>,
290
}
291

            
292
impl AsyncDatabase {
293
    /// Creates a `Storage` with a single-database named "default" with its data stored at `path`.
294
4
    pub async fn open<DB: Schema>(configuration: StorageConfiguration) -> Result<Self, Error> {
295
4
        tokio::task::spawn_blocking(move || {
296
4
            Database::open::<DB>(configuration).map(Database::into_async)
297
4
        })
298
4
        .await?
299
4
    }
300

            
301
    /// Restricts an unauthenticated instance to having `effective_permissions`.
302
    /// Returns `None` if a session has already been established.
303
    #[must_use]
304
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
305
        self.database
306
            .with_effective_permissions(effective_permissions)
307
            .map(|database| Self {
308
                database,
309
                runtime: self.runtime.clone(),
310
            })
311
    }
312

            
313
    /// Converts this instance into its blocking version, which is able to be
314
    /// used without async.
315
    #[must_use]
316
    pub fn into_blocking(self) -> Database {
317
        self.database
318
    }
319

            
320
    /// Converts this instance into its blocking version, which is able to be
321
    /// used without async.
322
    #[must_use]
323
    pub fn to_blocking(&self) -> Database {
324
        self.database.clone()
325
    }
326

            
327
    /// Returns a reference to this instance's blocking version, which is able
328
    /// to be used without async.
329
    #[must_use]
330
    pub fn as_blocking(&self) -> &Database {
331
        &self.database
332
    }
333
}
334

            
335
impl From<AsyncDatabase> for Database {
336
    fn from(database: AsyncDatabase) -> Self {
337
        database.into_blocking()
338
    }
339
}
340

            
341
impl<'a> From<&'a AsyncDatabase> for Database {
342
    fn from(database: &'a AsyncDatabase) -> Self {
343
        database.to_blocking()
344
    }
345
}
346

            
347
impl DatabaseNonBlocking for AsyncDatabase {
    /// Returns the name reported by the wrapped blocking database.
    fn name(&self) -> &str {
        let Self { database, .. } = self;
        database.name()
    }
}
352

            
353
impl HasSession for AsyncStorage {
354
2730
    fn session(&self) -> Option<&Session> {
355
2730
        self.storage.session()
356
2730
    }
357
}
358

            
359
#[async_trait]
360
impl AsyncStorageConnection for AsyncStorage {
361
    type Database = AsyncDatabase;
362
    type Authenticated = Self;
363

            
364
60
    async fn admin(&self) -> Self::Database {
365
60
        let task_self = self.clone();
366
60

            
367
60
        self.runtime
368
60
            .spawn_blocking(move || task_self.storage.admin())
369
60
            .await
370
60
            .unwrap()
371
60
            .into_async()
372
120
    }
373

            
374
23322
    async fn create_database_with_schema(
375
23322
        &self,
376
23322
        name: &str,
377
23322
        schema: SchemaName,
378
23322
        only_if_needed: bool,
379
23322
    ) -> Result<(), bonsaidb_core::Error> {
380
23322
        let task_self = self.clone();
381
23322
        let name = name.to_owned();
382
23322
        self.runtime
383
23322
            .spawn_blocking(move || {
384
23322
                StorageConnection::create_database_with_schema(
385
23322
                    &task_self.storage,
386
23322
                    &name,
387
23322
                    schema,
388
23322
                    only_if_needed,
389
23322
                )
390
23322
            })
391
23202
            .await
392
23322
            .map_err(Error::from)?
393
46644
    }
394

            
395
594
    async fn database<DB: Schema>(
396
594
        &self,
397
594
        name: &str,
398
594
    ) -> Result<Self::Database, bonsaidb_core::Error> {
399
594
        let task_self = self.clone();
400
594
        let name = name.to_owned();
401
594
        self.runtime
402
594
            .spawn_blocking(move || {
403
594
                task_self
404
594
                    .storage
405
594
                    .database::<DB>(&name)
406
594
                    .map(Database::into_async)
407
594
            })
408
585
            .await
409
594
            .map_err(Error::from)?
410
1188
    }
411

            
412
15244
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
413
15244
        let task_self = self.clone();
414
15244
        let name = name.to_owned();
415
15244
        self.runtime
416
15244
            .spawn_blocking(move || task_self.storage.delete_database(&name))
417
15244
            .await
418
15244
            .map_err(Error::from)?
419
30488
    }
420

            
421
122
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
422
122
        let task_self = self.clone();
423
122
        self.runtime
424
122
            .spawn_blocking(move || task_self.storage.list_databases())
425
122
            .await
426
122
            .map_err(Error::from)?
427
244
    }
428

            
429
122
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
430
122
        let task_self = self.clone();
431
122
        self.runtime
432
122
            .spawn_blocking(move || task_self.storage.list_available_schemas())
433
122
            .await
434
122
            .map_err(Error::from)?
435
244
    }
436

            
437
302
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
438
302
        let task_self = self.clone();
439
302
        let username = username.to_owned();
440
302
        self.runtime
441
302
            .spawn_blocking(move || task_self.storage.create_user(&username))
442
302
            .await
443
302
            .map_err(Error::from)?
444
604
    }
445

            
446
6
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
447
6
        &self,
448
6
        user: U,
449
6
    ) -> Result<(), bonsaidb_core::Error> {
450
6
        let task_self = self.clone();
451
6
        let user = user.name()?.into_owned();
452
6
        self.runtime
453
6
            .spawn_blocking(move || task_self.storage.delete_user(user))
454
6
            .await
455
6
            .map_err(Error::from)?
456
12
    }
457

            
458
    #[cfg(feature = "password-hashing")]
459
3
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
460
3
        &self,
461
3
        user: U,
462
3
        password: bonsaidb_core::connection::SensitiveString,
463
3
    ) -> Result<(), bonsaidb_core::Error> {
464
3
        let task_self = self.clone();
465
3
        let user = user.name()?.into_owned();
466
3
        self.runtime
467
3
            .spawn_blocking(move || task_self.storage.set_user_password(user, password))
468
3
            .await
469
3
            .map_err(Error::from)?
470
6
    }
471

            
472
    #[cfg(feature = "password-hashing")]
473
5
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
474
5
        &self,
475
5
        user: U,
476
5
        authentication: Authentication,
477
5
    ) -> Result<Self, bonsaidb_core::Error> {
478
5
        let task_self = self.clone();
479
5
        let user = user.name()?.into_owned();
480
5
        self.runtime
481
5
            .spawn_blocking(move || {
482
5
                task_self
483
5
                    .storage
484
5
                    .authenticate(user, authentication)
485
5
                    .map(Storage::into_async)
486
5
            })
487
5
            .await
488
5
            .map_err(Error::from)?
489
10
    }
490

            
491
60
    async fn assume_identity(
492
60
        &self,
493
60
        identity: IdentityReference<'_>,
494
60
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
495
60
        let task_self = self.clone();
496
60
        let identity = identity.into_owned();
497
60
        self.runtime
498
60
            .spawn_blocking(move || {
499
60
                task_self
500
60
                    .storage
501
60
                    .assume_identity(identity)
502
60
                    .map(Storage::into_async)
503
60
            })
504
30
            .await
505
60
            .map_err(Error::from)?
506
120
    }
507

            
508
16
    async fn add_permission_group_to_user<
509
16
        'user,
510
16
        'group,
511
16
        U: Nameable<'user, u64> + Send + Sync,
512
16
        G: Nameable<'group, u64> + Send + Sync,
513
16
    >(
514
16
        &self,
515
16
        user: U,
516
16
        permission_group: G,
517
16
    ) -> Result<(), bonsaidb_core::Error> {
518
16
        let task_self = self.clone();
519
16
        let user = user.name()?.into_owned();
520
16
        let group = permission_group.name()?.into_owned();
521
16
        self.runtime
522
16
            .spawn_blocking(move || task_self.storage.add_permission_group_to_user(user, group))
523
16
            .await
524
16
            .map_err(Error::from)?
525
32
    }
526

            
527
12
    async fn remove_permission_group_from_user<
528
12
        'user,
529
12
        'group,
530
12
        U: Nameable<'user, u64> + Send + Sync,
531
12
        G: Nameable<'group, u64> + Send + Sync,
532
12
    >(
533
12
        &self,
534
12
        user: U,
535
12
        permission_group: G,
536
12
    ) -> Result<(), bonsaidb_core::Error> {
537
12
        let task_self = self.clone();
538
12
        let user = user.name()?.into_owned();
539
12
        let group = permission_group.name()?.into_owned();
540
12
        self.runtime
541
12
            .spawn_blocking(move || {
542
12
                task_self
543
12
                    .storage
544
12
                    .remove_permission_group_from_user(user, group)
545
12
            })
546
12
            .await
547
12
            .map_err(Error::from)?
548
24
    }
549

            
550
12
    async fn add_role_to_user<
551
12
        'user,
552
12
        'group,
553
12
        U: Nameable<'user, u64> + Send + Sync,
554
12
        G: Nameable<'group, u64> + Send + Sync,
555
12
    >(
556
12
        &self,
557
12
        user: U,
558
12
        role: G,
559
12
    ) -> Result<(), bonsaidb_core::Error> {
560
12
        let task_self = self.clone();
561
12
        let user = user.name()?.into_owned();
562
12
        let role = role.name()?.into_owned();
563
12
        self.runtime
564
12
            .spawn_blocking(move || task_self.storage.add_role_to_user(user, role))
565
12
            .await
566
12
            .map_err(Error::from)?
567
24
    }
568

            
569
12
    async fn remove_role_from_user<
570
12
        'user,
571
12
        'group,
572
12
        U: Nameable<'user, u64> + Send + Sync,
573
12
        G: Nameable<'group, u64> + Send + Sync,
574
12
    >(
575
12
        &self,
576
12
        user: U,
577
12
        role: G,
578
12
    ) -> Result<(), bonsaidb_core::Error> {
579
12
        let task_self = self.clone();
580
12
        let user = user.name()?.into_owned();
581
12
        let role = role.name()?.into_owned();
582
12
        self.runtime
583
12
            .spawn_blocking(move || task_self.storage.remove_role_from_user(user, role))
584
12
            .await
585
12
            .map_err(Error::from)?
586
24
    }
587
}
588

            
589
impl HasSession for AsyncDatabase {
590
    fn session(&self) -> Option<&Session> {
591
        self.database.session()
592
    }
593
}
594

            
595
#[async_trait]
596
impl AsyncConnection for AsyncDatabase {
597
    type Storage = AsyncStorage;
598

            
599
    fn storage(&self) -> Self::Storage {
600
        AsyncStorage {
601
            storage: self.database.storage(),
602
            runtime: self.runtime.clone(),
603
        }
604
    }
605

            
606
    #[cfg_attr(
607
        feature = "tracing",
608
96768
        tracing::instrument(skip(starting_id, result_limit))
609
    )]
610
    async fn list_executed_transactions(
611
        &self,
612
        starting_id: Option<u64>,
613
        result_limit: Option<u32>,
614
32256
    ) -> Result<Vec<transaction::Executed>, bonsaidb_core::Error> {
615
32256
        let task_self = self.clone();
616
32256
        self.runtime
617
32256
            .spawn_blocking(move || {
618
32256
                task_self
619
32256
                    .database
620
32256
                    .list_executed_transactions(starting_id, result_limit)
621
32256
            })
622
20346
            .await
623
32256
            .map_err(Error::from)?
624
64512
    }
625

            
626
366
    #[cfg_attr(feature = "tracing", tracing::instrument)]
627
122
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
628
122
        Ok(self
629
122
            .database
630
122
            .roots()
631
122
            .transactions()
632
122
            .current_transaction_id())
633
122
    }
634

            
635
366
    #[cfg_attr(feature = "tracing", tracing::instrument)]
636
122
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
637
122
        let task_self = self.clone();
638
122
        self.runtime
639
122
            .spawn_blocking(move || Connection::compact(&task_self.database))
640
122
            .await
641
122
            .map_err(Error::from)?
642
244
    }
643

            
644
9
    #[cfg_attr(feature = "tracing", tracing::instrument)]
645
3
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
646
3
        let task_self = self.clone();
647
3
        self.runtime
648
3
            .spawn_blocking(move || Connection::compact_collection::<C>(&task_self.database))
649
3
            .await
650
3
            .map_err(Error::from)?
651
6
    }
652

            
653
366
    #[cfg_attr(feature = "tracing", tracing::instrument)]
654
122
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
655
122
        let task_self = self.clone();
656
122
        self.runtime
657
122
            .spawn_blocking(move || Connection::compact_key_value_store(&task_self.database))
658
122
            .await
659
122
            .map_err(Error::from)?
660
244
    }
661
}
662

            
663
#[async_trait]
664
impl AsyncKeyValue for AsyncDatabase {
665
1233548
    async fn execute_key_operation(
666
1233548
        &self,
667
1233548
        op: KeyOperation,
668
1233548
    ) -> Result<Output, bonsaidb_core::Error> {
669
1233548
        let task_self = self.clone();
670
1233548
        self.runtime
671
1233548
            .spawn_blocking(move || KeyValue::execute_key_operation(&task_self.database, op))
672
739099
            .await
673
1233518
            .map_err(Error::from)?
674
2467066
    }
675
}
676

            
677
#[async_trait]
678
impl AsyncPubSub for AsyncDatabase {
679
    type Subscriber = Subscriber;
680

            
681
1036
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
682
1036
        PubSub::create_subscriber(&self.database)
683
1036
    }
684

            
685
1550
    async fn publish_bytes(
686
1550
        &self,
687
1550
        topic: Vec<u8>,
688
1550
        payload: Vec<u8>,
689
1550
    ) -> Result<(), bonsaidb_core::Error> {
690
1550
        PubSub::publish_bytes(&self.database, topic, payload)
691
1550
    }
692

            
693
6
    async fn publish_bytes_to_all(
694
6
        &self,
695
6
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
696
6
        payload: Vec<u8>,
697
6
    ) -> Result<(), bonsaidb_core::Error> {
698
6
        PubSub::publish_bytes_to_all(&self.database, topics, payload)
699
6
    }
700
}
701

            
702
#[async_trait]
703
impl AsyncSubscriber for Subscriber {
704
476
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
705
476
        pubsub::Subscriber::subscribe_to_bytes(self, topic)
706
476
    }
707

            
708
32
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
709
32
        pubsub::Subscriber::unsubscribe_from_bytes(self, topic)
710
32
    }
711

            
712
876
    fn receiver(&self) -> &Receiver {
713
876
        pubsub::Subscriber::receiver(self)
714
876
    }
715
}
716

            
717
#[async_trait]
718
impl AsyncLowLevelConnection for AsyncDatabase {
719
412522
    fn schematic(&self) -> &Schematic {
720
412522
        self.database.schematic()
721
412522
    }
722

            
723
1501380
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(transaction)))]
724
    async fn apply_transaction(
725
        &self,
726
        transaction: Transaction,
727
500460
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
728
500460
        let task_self = self.clone();
729
500460
        self.runtime
730
500460
            .spawn_blocking(move || task_self.database.apply_transaction(transaction))
731
446220
            .await
732
500460
            .map_err(Error::from)?
733
1000920
    }
734

            
735
473458
    async fn get_from_collection(
736
473458
        &self,
737
473458
        id: DocumentId,
738
473458
        collection: &CollectionName,
739
473458
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
740
473458
        let task_self = self.clone();
741
473458
        let collection = collection.clone();
742
473458
        self.runtime
743
473458
            .spawn_blocking(move || task_self.database.get_from_collection(id, &collection))
744
354328
            .await
745
473458
            .map_err(Error::from)?
746
946916
    }
747

            
748
488
    async fn list_from_collection(
749
488
        &self,
750
488
        ids: Range<DocumentId>,
751
488
        order: Sort,
752
488
        limit: Option<u32>,
753
488
        collection: &CollectionName,
754
488
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
755
488
        let task_self = self.clone();
756
488
        let collection = collection.clone();
757
488
        self.runtime
758
488
            .spawn_blocking(move || {
759
488
                task_self
760
488
                    .database
761
488
                    .list_from_collection(ids, order, limit, &collection)
762
488
            })
763
398
            .await
764
488
            .map_err(Error::from)?
765
976
    }
766

            
767
122
    async fn list_headers_from_collection(
768
122
        &self,
769
122
        ids: Range<DocumentId>,
770
122
        order: Sort,
771
122
        limit: Option<u32>,
772
122
        collection: &CollectionName,
773
122
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
774
122
        let task_self = self.clone();
775
122
        let collection = collection.clone();
776
122
        self.runtime
777
122
            .spawn_blocking(move || {
778
122
                task_self
779
122
                    .database
780
122
                    .list_headers_from_collection(ids, order, limit, &collection)
781
122
            })
782
92
            .await
783
122
            .map_err(Error::from)?
784
244
    }
785

            
786
244
    async fn count_from_collection(
787
244
        &self,
788
244
        ids: Range<DocumentId>,
789
244
        collection: &CollectionName,
790
244
    ) -> Result<u64, bonsaidb_core::Error> {
791
244
        let task_self = self.clone();
792
244
        let collection = collection.clone();
793
244
        self.runtime
794
244
            .spawn_blocking(move || task_self.database.count_from_collection(ids, &collection))
795
184
            .await
796
244
            .map_err(Error::from)?
797
488
    }
798

            
799
272296
    async fn get_multiple_from_collection(
800
272296
        &self,
801
272296
        ids: &[DocumentId],
802
272296
        collection: &CollectionName,
803
272296
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
804
272296
        let task_self = self.clone();
805
272296
        // TODO avoid the allocation here, switch to IntoIterator.
806
272296
        let ids = ids.to_vec();
807
272296
        let collection = collection.clone();
808
272296
        self.runtime
809
272296
            .spawn_blocking(move || {
810
272296
                task_self
811
272296
                    .database
812
272296
                    .get_multiple_from_collection(&ids, &collection)
813
272296
            })
814
207496
            .await
815
272296
            .map_err(Error::from)?
816
544592
    }
817

            
818
90
    async fn compact_collection_by_name(
819
90
        &self,
820
90
        collection: CollectionName,
821
90
    ) -> Result<(), bonsaidb_core::Error> {
822
90
        let task_self = self.clone();
823
90
        self.runtime
824
90
            .spawn_blocking(move || task_self.database.compact_collection_by_name(collection))
825
90
            .await
826
90
            .map_err(Error::from)?
827
180
    }
828

            
829
278614
    async fn query_by_name(
830
278614
        &self,
831
278614
        view: &ViewName,
832
278614
        key: Option<QueryKey<Bytes>>,
833
278614
        order: Sort,
834
278614
        limit: Option<u32>,
835
278614
        access_policy: AccessPolicy,
836
278644
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
837
278644
        let task_self = self.clone();
838
278644
        let view = view.clone();
839
278644
        self.runtime
840
278644
            .spawn_blocking(move || {
841
278644
                task_self
842
278644
                    .database
843
278644
                    .query_by_name(&view, key, order, limit, access_policy)
844
278644
            })
845
234304
            .await
846
278644
            .map_err(Error::from)?
847
557288
    }
848

            
849
    async fn query_by_name_with_docs(
850
        &self,
851
        view: &ViewName,
852
        key: Option<QueryKey<Bytes>>,
853
        order: Sort,
854
        limit: Option<u32>,
855
        access_policy: AccessPolicy,
856
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
857
        let task_self = self.clone();
858
        let view = view.clone();
859
        self.runtime
860
            .spawn_blocking(move || {
861
                task_self
862
                    .database
863
                    .query_by_name_with_docs(&view, key, order, limit, access_policy)
864
            })
865
            .await
866
            .map_err(Error::from)?
867
    }
868

            
869
539410
    async fn reduce_by_name(
870
539410
        &self,
871
539410
        view: &ViewName,
872
539410
        key: Option<QueryKey<Bytes>>,
873
539410
        access_policy: AccessPolicy,
874
539410
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
875
539410
        let task_self = self.clone();
876
539410
        let view = view.clone();
877
539410
        self.runtime
878
539410
            .spawn_blocking(move || task_self.database.reduce_by_name(&view, key, access_policy))
879
407080
            .await
880
539410
            .map_err(Error::from)?
881
1078820
    }
882

            
883
304
    async fn reduce_grouped_by_name(
884
304
        &self,
885
304
        view: &ViewName,
886
304
        key: Option<QueryKey<Bytes>>,
887
304
        access_policy: AccessPolicy,
888
304
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
889
304
        let task_self = self.clone();
890
304
        let view = view.clone();
891
304
        self.runtime
892
304
            .spawn_blocking(move || {
893
304
                task_self
894
304
                    .database
895
304
                    .reduce_grouped_by_name(&view, key, access_policy)
896
304
            })
897
274
            .await
898
304
            .map_err(Error::from)?
899
608
    }
900

            
901
244
    async fn delete_docs_by_name(
902
244
        &self,
903
244
        view: &ViewName,
904
244
        key: Option<QueryKey<Bytes>>,
905
244
        access_policy: AccessPolicy,
906
244
    ) -> Result<u64, bonsaidb_core::Error> {
907
244
        let task_self = self.clone();
908
244
        let view = view.clone();
909
244
        self.runtime
910
244
            .spawn_blocking(move || {
911
244
                task_self
912
244
                    .database
913
244
                    .delete_docs_by_name(&view, key, access_policy)
914
244
            })
915
214
            .await
916
244
            .map_err(Error::from)?
917
488
    }
918
}