1
use bonsaidb_client::{AsyncClient, AsyncRemoteDatabase};
2
use bonsaidb_core::async_trait::async_trait;
3
use bonsaidb_core::connection::{
4
    self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
5
    HasSchema, HasSession, IdentityReference, Range, SerializedQueryKey, Session, Sort,
6
};
7
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
8
use bonsaidb_core::schema::view::map::MappedSerializedValue;
9
use bonsaidb_core::schema::{
10
    self, Collection, CollectionName, Nameable, Schema, SchemaName, SchemaSummary, Schematic,
11
    ViewName,
12
};
13
use bonsaidb_core::transaction::{Executed, OperationResult, Transaction};
14
use bonsaidb_server::{Backend, CustomServer, NoBackend, ServerDatabase};
15
use derive_where::derive_where;
16

            
17
/// A local server or a server over a network connection.
18
#[derive_where(Clone, Debug)]
19
pub enum AnyServerConnection<B: Backend> {
20
    /// A local server.
21
    Local(CustomServer<B>),
22
    /// A server accessed with an [`AsyncClient`].
23
    Networked(AsyncClient),
24
}
25

            
26
impl<B: Backend> HasSession for AnyServerConnection<B> {
27
    fn session(&self) -> Option<&Session> {
28
        match self {
29
            Self::Local(server) => server.session(),
30
            Self::Networked(client) => client.session(),
31
        }
32
    }
33
}
34

            
35
#[async_trait]
36
impl<B: Backend> AsyncStorageConnection for AnyServerConnection<B> {
37
    type Authenticated = Self;
38
    type Database = AnyDatabase<B>;
39

            
40
    async fn admin(&self) -> Self::Database {
41
        match self {
42
            Self::Local(server) => AnyDatabase::Local(server.admin().await),
43
            Self::Networked(client) => AnyDatabase::Networked(client.admin().await),
44
        }
45
    }
46

            
47
3
    async fn database<DB: Schema>(
48
3
        &self,
49
3
        name: &str,
50
3
    ) -> Result<Self::Database, bonsaidb_core::Error> {
51
3
        match self {
52
1
            Self::Local(server) => server.database::<DB>(name).await.map(AnyDatabase::Local),
53
2
            Self::Networked(client) => client
54
2
                .database::<DB>(name)
55
                .await
56
2
                .map(AnyDatabase::Networked),
57
        }
58
6
    }
59

            
60
    async fn create_database_with_schema(
61
        &self,
62
        name: &str,
63
        schema: SchemaName,
64
        only_if_needed: bool,
65
    ) -> Result<(), bonsaidb_core::Error> {
66
        match self {
67
            Self::Local(server) => {
68
                server
69
                    .create_database_with_schema(name, schema, only_if_needed)
70
                    .await
71
            }
72
            Self::Networked(client) => {
73
                client
74
                    .create_database_with_schema(name, schema, only_if_needed)
75
                    .await
76
            }
77
        }
78
    }
79

            
80
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
81
        match self {
82
            Self::Local(server) => server.delete_database(name).await,
83
            Self::Networked(client) => client.delete_database(name).await,
84
        }
85
    }
86

            
87
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
88
        match self {
89
            Self::Local(server) => server.list_databases().await,
90
            Self::Networked(client) => client.list_databases().await,
91
        }
92
    }
93

            
94
    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
95
        match self {
96
            Self::Local(server) => server.list_available_schemas().await,
97
            Self::Networked(client) => client.list_available_schemas().await,
98
        }
99
    }
100

            
101
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
102
        match self {
103
            Self::Local(server) => server.create_user(username).await,
104
            Self::Networked(client) => client.create_user(username).await,
105
        }
106
    }
107

            
108
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
109
        &self,
110
        user: U,
111
    ) -> Result<(), bonsaidb_core::Error> {
112
        match self {
113
            Self::Local(server) => server.delete_user(user).await,
114
            Self::Networked(client) => client.delete_user(user).await,
115
        }
116
    }
117

            
118
    #[cfg(feature = "password-hashing")]
119
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
120
        &self,
121
        user: U,
122
        password: bonsaidb_core::connection::SensitiveString,
123
    ) -> Result<(), bonsaidb_core::Error> {
124
        match self {
125
            Self::Local(server) => server.set_user_password(user, password).await,
126
            Self::Networked(client) => client.set_user_password(user, password).await,
127
        }
128
    }
129

            
130
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
131
    async fn authenticate(
132
        &self,
133
        authentication: bonsaidb_core::connection::Authentication,
134
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
135
        match self {
136
            Self::Local(server) => server.authenticate(authentication).await.map(Self::Local),
137
            Self::Networked(client) => client
138
                .authenticate(authentication)
139
                .await
140
                .map(Self::Networked),
141
        }
142
    }
143

            
144
    async fn assume_identity(
145
        &self,
146
        identity: IdentityReference<'_>,
147
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
148
        match self {
149
            Self::Local(server) => server.assume_identity(identity).await.map(Self::Local),
150
            Self::Networked(client) => client.assume_identity(identity).await.map(Self::Networked),
151
        }
152
    }
153

            
154
    async fn add_permission_group_to_user<
155
        'user,
156
        'group,
157
        U: Nameable<'user, u64> + Send + Sync,
158
        G: Nameable<'group, u64> + Send + Sync,
159
    >(
160
        &self,
161
        user: U,
162
        permission_group: G,
163
    ) -> Result<(), bonsaidb_core::Error> {
164
        match self {
165
            Self::Local(server) => {
166
                server
167
                    .add_permission_group_to_user(user, permission_group)
168
                    .await
169
            }
170
            Self::Networked(client) => {
171
                client
172
                    .add_permission_group_to_user(user, permission_group)
173
                    .await
174
            }
175
        }
176
    }
177

            
178
    async fn remove_permission_group_from_user<
179
        'user,
180
        'group,
181
        U: Nameable<'user, u64> + Send + Sync,
182
        G: Nameable<'group, u64> + Send + Sync,
183
    >(
184
        &self,
185
        user: U,
186
        permission_group: G,
187
    ) -> Result<(), bonsaidb_core::Error> {
188
        match self {
189
            Self::Local(server) => {
190
                server
191
                    .remove_permission_group_from_user(user, permission_group)
192
                    .await
193
            }
194
            Self::Networked(client) => {
195
                client
196
                    .remove_permission_group_from_user(user, permission_group)
197
                    .await
198
            }
199
        }
200
    }
201

            
202
    async fn add_role_to_user<
203
        'user,
204
        'role,
205
        U: Nameable<'user, u64> + Send + Sync,
206
        R: Nameable<'role, u64> + Send + Sync,
207
    >(
208
        &self,
209
        user: U,
210
        role: R,
211
    ) -> Result<(), bonsaidb_core::Error> {
212
        match self {
213
            Self::Local(server) => server.add_role_to_user(user, role).await,
214
            Self::Networked(client) => client.add_role_to_user(user, role).await,
215
        }
216
    }
217

            
218
    async fn remove_role_from_user<
219
        'user,
220
        'role,
221
        U: Nameable<'user, u64> + Send + Sync,
222
        R: Nameable<'role, u64> + Send + Sync,
223
    >(
224
        &self,
225
        user: U,
226
        role: R,
227
    ) -> Result<(), bonsaidb_core::Error> {
228
        match self {
229
            Self::Local(server) => server.remove_role_from_user(user, role).await,
230
            Self::Networked(client) => client.remove_role_from_user(user, role).await,
231
        }
232
    }
233
}
234

            
235
/// A database connection that can be either from a local server or a server
236
/// over a network connection.
237
#[derive_where(Clone, Debug)]
238
pub enum AnyDatabase<B: Backend = NoBackend> {
239
    /// A local database.
240
    Local(ServerDatabase<B>),
241
    /// A networked database accessed with an [`AsyncRemoteDatabase`].
242
    Networked(AsyncRemoteDatabase),
243
}
244

            
245
impl<B: Backend> HasSession for AnyDatabase<B> {
246
    fn session(&self) -> Option<&Session> {
247
        match self {
248
            Self::Local(server) => server.session(),
249
            Self::Networked(client) => client.session(),
250
        }
251
    }
252
}
253

            
254
#[async_trait]
255
impl<B: Backend> AsyncConnection for AnyDatabase<B> {
256
    type Storage = AnyServerConnection<B>;
257

            
258
    fn storage(&self) -> Self::Storage {
259
        match self {
260
            Self::Local(server) => AnyServerConnection::Local(server.storage()),
261
            Self::Networked(client) => AnyServerConnection::Networked(client.storage()),
262
        }
263
    }
264

            
265
    async fn list_executed_transactions(
266
        &self,
267
        starting_id: Option<u64>,
268
        result_limit: Option<u32>,
269
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
270
        match self {
271
            Self::Local(server) => {
272
                server
273
                    .list_executed_transactions(starting_id, result_limit)
274
                    .await
275
            }
276
            Self::Networked(client) => {
277
                client
278
                    .list_executed_transactions(starting_id, result_limit)
279
                    .await
280
            }
281
        }
282
    }
283

            
284
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
285
        match self {
286
            Self::Local(server) => server.last_transaction_id().await,
287
            Self::Networked(client) => client.last_transaction_id().await,
288
        }
289
    }
290

            
291
    async fn compact_collection<C: Collection>(&self) -> Result<(), bonsaidb_core::Error> {
292
        match self {
293
            Self::Local(server) => server.compact_collection::<C>().await,
294
            Self::Networked(client) => client.compact_collection::<C>().await,
295
        }
296
    }
297

            
298
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
299
        match self {
300
            Self::Local(server) => server.compact().await,
301
            Self::Networked(client) => client.compact().await,
302
        }
303
    }
304

            
305
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
306
        match self {
307
            Self::Local(server) => server.compact_key_value_store().await,
308
            Self::Networked(client) => client.compact_key_value_store().await,
309
        }
310
    }
311
}
312

            
313
#[async_trait]
314
impl<B: Backend> AsyncLowLevelConnection for AnyDatabase<B> {
315
8082
    async fn apply_transaction(
316
8082
        &self,
317
8082
        transaction: Transaction,
318
8082
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
319
8082
        match self {
320
4041
            Self::Local(server) => server.apply_transaction(transaction).await,
321
4041
            Self::Networked(client) => client.apply_transaction(transaction).await,
322
        }
323
16164
    }
324

            
325
13948
    async fn get_from_collection(
326
13948
        &self,
327
13948
        id: DocumentId,
328
13948
        collection: &CollectionName,
329
13948
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
330
13948
        match self {
331
6974
            Self::Local(server) => server.get_from_collection(id, collection).await,
332
6974
            Self::Networked(client) => client.get_from_collection(id, collection).await,
333
        }
334
27896
    }
335

            
336
    async fn list_from_collection(
337
        &self,
338
        ids: Range<DocumentId>,
339
        order: Sort,
340
        limit: Option<u32>,
341
        collection: &CollectionName,
342
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
343
        match self {
344
            Self::Local(server) => {
345
                server
346
                    .list_from_collection(ids, order, limit, collection)
347
                    .await
348
            }
349
            Self::Networked(client) => {
350
                client
351
                    .list_from_collection(ids, order, limit, collection)
352
                    .await
353
            }
354
        }
355
    }
356

            
357
    async fn list_headers_from_collection(
358
        &self,
359
        ids: Range<DocumentId>,
360
        order: Sort,
361
        limit: Option<u32>,
362
        collection: &CollectionName,
363
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
364
        match self {
365
            Self::Local(server) => {
366
                server
367
                    .list_headers_from_collection(ids, order, limit, collection)
368
                    .await
369
            }
370
            Self::Networked(client) => {
371
                client
372
                    .list_headers_from_collection(ids, order, limit, collection)
373
                    .await
374
            }
375
        }
376
    }
377

            
378
    async fn count_from_collection(
379
        &self,
380
        ids: Range<DocumentId>,
381
        collection: &CollectionName,
382
    ) -> Result<u64, bonsaidb_core::Error> {
383
        match self {
384
            Self::Local(server) => server.count_from_collection(ids, collection).await,
385
            Self::Networked(client) => client.count_from_collection(ids, collection).await,
386
        }
387
    }
388

            
389
9372
    async fn get_multiple_from_collection(
390
9372
        &self,
391
9372
        ids: &[DocumentId],
392
9372
        collection: &CollectionName,
393
9372
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
394
9372
        match self {
395
4686
            Self::Local(server) => server.get_multiple_from_collection(ids, collection).await,
396
4686
            Self::Networked(client) => client.get_multiple_from_collection(ids, collection).await,
397
        }
398
18744
    }
399

            
400
    async fn compact_collection_by_name(
401
        &self,
402
        collection: CollectionName,
403
    ) -> Result<(), bonsaidb_core::Error> {
404
        match self {
405
            Self::Local(server) => server.compact_collection_by_name(collection).await,
406
            Self::Networked(client) => client.compact_collection_by_name(collection).await,
407
        }
408
    }
409

            
410
9372
    async fn query_by_name(
411
9372
        &self,
412
9372
        view: &ViewName,
413
9372
        key: Option<SerializedQueryKey>,
414
9372
        order: Sort,
415
9372
        limit: Option<u32>,
416
9372
        access_policy: AccessPolicy,
417
9372
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
418
9372
        match self {
419
4686
            Self::Local(server) => {
420
4686
                server
421
4686
                    .query_by_name(view, key, order, limit, access_policy)
422
3909
                    .await
423
            }
424
4686
            Self::Networked(client) => {
425
4686
                client
426
4686
                    .query_by_name(view, key, order, limit, access_policy)
427
4686
                    .await
428
            }
429
        }
430
18744
    }
431

            
432
    async fn query_by_name_with_docs(
433
        &self,
434
        view: &ViewName,
435
        key: Option<SerializedQueryKey>,
436
        order: Sort,
437
        limit: Option<u32>,
438
        access_policy: AccessPolicy,
439
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
440
        match self {
441
            Self::Local(server) => {
442
                server
443
                    .query_by_name_with_docs(view, key, order, limit, access_policy)
444
                    .await
445
            }
446
            Self::Networked(client) => {
447
                client
448
                    .query_by_name_with_docs(view, key, order, limit, access_policy)
449
                    .await
450
            }
451
        }
452
    }
453

            
454
18481
    async fn reduce_by_name(
455
18481
        &self,
456
18481
        view: &ViewName,
457
18481
        key: Option<SerializedQueryKey>,
458
18481
        access_policy: AccessPolicy,
459
18481
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
460
18481
        match self {
461
9240
            Self::Local(server) => server.reduce_by_name(view, key, access_policy).await,
462
9241
            Self::Networked(client) => client.reduce_by_name(view, key, access_policy).await,
463
        }
464
36962
    }
465

            
466
    async fn reduce_grouped_by_name(
467
        &self,
468
        view: &ViewName,
469
        key: Option<SerializedQueryKey>,
470
        access_policy: AccessPolicy,
471
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
472
        match self {
473
            Self::Local(server) => {
474
                server
475
                    .reduce_grouped_by_name(view, key, access_policy)
476
                    .await
477
            }
478
            Self::Networked(client) => {
479
                client
480
                    .reduce_grouped_by_name(view, key, access_policy)
481
                    .await
482
            }
483
        }
484
    }
485

            
486
    async fn delete_docs_by_name(
487
        &self,
488
        view: &ViewName,
489
        key: Option<SerializedQueryKey>,
490
        access_policy: AccessPolicy,
491
    ) -> Result<u64, bonsaidb_core::Error> {
492
        match self {
493
            Self::Local(server) => server.delete_docs_by_name(view, key, access_policy).await,
494
            Self::Networked(client) => client.delete_docs_by_name(view, key, access_policy).await,
495
        }
496
    }
497
}
498

            
499
impl<B: Backend> HasSchema for AnyDatabase<B> {
500
27853
    fn schematic(&self) -> &Schematic {
501
27853
        match self {
502
13926
            Self::Local(server) => server.schematic(),
503
13927
            Self::Networked(client) => client.schematic(),
504
        }
505
27853
    }
506
}