1
use bonsaidb_client::{Client, RemoteDatabase};
2
#[cfg(feature = "password-hashing")]
3
use bonsaidb_core::connection::Authentication;
4
use bonsaidb_core::{
5
    arc_bytes::serde::Bytes,
6
    async_trait::async_trait,
7
    connection::{
8
        self, AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
9
        HasSession, IdentityReference, QueryKey, Range, Session, Sort,
10
    },
11
    document::{DocumentId, Header, OwnedDocument},
12
    schema::{
13
        self, view::map::MappedSerializedValue, Collection, CollectionName, Nameable, Schema,
14
        SchemaName, Schematic, ViewName,
15
    },
16
    transaction::{Executed, OperationResult, Transaction},
17
};
18
use bonsaidb_server::{Backend, CustomServer, NoBackend, ServerDatabase};
19

            
20
/// A local server or a server over a network connection.
21
pub enum AnyServerConnection<B: Backend> {
22
    /// A local server.
23
    Local(CustomServer<B>),
24
    /// A server accessed with a [`Client`].
25
    Networked(Client),
26
}
27

            
28
impl<B: Backend> HasSession for AnyServerConnection<B> {
29
    fn session(&self) -> Option<&Session> {
30
        match self {
31
            Self::Local(server) => server.session(),
32
            Self::Networked(client) => client.session(),
33
        }
34
    }
35
}
36

            
37
#[async_trait]
38
impl<B: Backend> AsyncStorageConnection for AnyServerConnection<B> {
39
    type Database = AnyDatabase<B>;
40
    type Authenticated = Self;
41

            
42
    async fn admin(&self) -> Self::Database {
43
        match self {
44
            Self::Local(server) => AnyDatabase::Local(server.admin().await),
45
            Self::Networked(client) => AnyDatabase::Networked(client.admin().await),
46
        }
47
    }
48

            
49
3
    async fn database<DB: Schema>(
50
3
        &self,
51
3
        name: &str,
52
3
    ) -> Result<Self::Database, bonsaidb_core::Error> {
53
3
        match self {
54
1
            Self::Local(server) => server.database::<DB>(name).await.map(AnyDatabase::Local),
55
2
            Self::Networked(client) => client
56
2
                .database::<DB>(name)
57
                .await
58
2
                .map(AnyDatabase::Networked),
59
        }
60
6
    }
61

            
62
    async fn create_database_with_schema(
63
        &self,
64
        name: &str,
65
        schema: SchemaName,
66
        only_if_needed: bool,
67
    ) -> Result<(), bonsaidb_core::Error> {
68
        match self {
69
            Self::Local(server) => {
70
                server
71
                    .create_database_with_schema(name, schema, only_if_needed)
72
                    .await
73
            }
74
            Self::Networked(client) => {
75
                client
76
                    .create_database_with_schema(name, schema, only_if_needed)
77
                    .await
78
            }
79
        }
80
    }
81

            
82
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
83
        match self {
84
            Self::Local(server) => server.delete_database(name).await,
85
            Self::Networked(client) => client.delete_database(name).await,
86
        }
87
    }
88

            
89
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
90
        match self {
91
            Self::Local(server) => server.list_databases().await,
92
            Self::Networked(client) => client.list_databases().await,
93
        }
94
    }
95

            
96
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
97
        match self {
98
            Self::Local(server) => server.list_available_schemas().await,
99
            Self::Networked(client) => client.list_available_schemas().await,
100
        }
101
    }
102

            
103
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
104
        match self {
105
            Self::Local(server) => server.create_user(username).await,
106
            Self::Networked(client) => client.create_user(username).await,
107
        }
108
    }
109

            
110
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
111
        &self,
112
        user: U,
113
    ) -> Result<(), bonsaidb_core::Error> {
114
        match self {
115
            Self::Local(server) => server.delete_user(user).await,
116
            Self::Networked(client) => client.delete_user(user).await,
117
        }
118
    }
119

            
120
    #[cfg(feature = "password-hashing")]
121
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
122
        &self,
123
        user: U,
124
        password: bonsaidb_core::connection::SensitiveString,
125
    ) -> Result<(), bonsaidb_core::Error> {
126
        match self {
127
            Self::Local(server) => server.set_user_password(user, password).await,
128
            Self::Networked(client) => client.set_user_password(user, password).await,
129
        }
130
    }
131

            
132
    #[cfg(feature = "password-hashing")]
133
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
134
        &self,
135
        user: U,
136
        authentication: Authentication,
137
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
138
        match self {
139
            Self::Local(server) => server
140
                .authenticate(user, authentication)
141
                .await
142
                .map(Self::Local),
143
            Self::Networked(client) => client
144
                .authenticate(user, authentication)
145
                .await
146
                .map(Self::Networked),
147
        }
148
    }
149

            
150
    async fn assume_identity(
151
        &self,
152
        identity: IdentityReference<'_>,
153
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
154
        match self {
155
            Self::Local(server) => server.assume_identity(identity).await.map(Self::Local),
156
            Self::Networked(client) => client.assume_identity(identity).await.map(Self::Networked),
157
        }
158
    }
159

            
160
    async fn add_permission_group_to_user<
161
        'user,
162
        'group,
163
        U: Nameable<'user, u64> + Send + Sync,
164
        G: Nameable<'group, u64> + Send + Sync,
165
    >(
166
        &self,
167
        user: U,
168
        permission_group: G,
169
    ) -> Result<(), bonsaidb_core::Error> {
170
        match self {
171
            Self::Local(server) => {
172
                server
173
                    .add_permission_group_to_user(user, permission_group)
174
                    .await
175
            }
176
            Self::Networked(client) => {
177
                client
178
                    .add_permission_group_to_user(user, permission_group)
179
                    .await
180
            }
181
        }
182
    }
183

            
184
    async fn remove_permission_group_from_user<
185
        'user,
186
        'group,
187
        U: Nameable<'user, u64> + Send + Sync,
188
        G: Nameable<'group, u64> + Send + Sync,
189
    >(
190
        &self,
191
        user: U,
192
        permission_group: G,
193
    ) -> Result<(), bonsaidb_core::Error> {
194
        match self {
195
            Self::Local(server) => {
196
                server
197
                    .remove_permission_group_from_user(user, permission_group)
198
                    .await
199
            }
200
            Self::Networked(client) => {
201
                client
202
                    .remove_permission_group_from_user(user, permission_group)
203
                    .await
204
            }
205
        }
206
    }
207

            
208
    async fn add_role_to_user<
209
        'user,
210
        'role,
211
        U: Nameable<'user, u64> + Send + Sync,
212
        R: Nameable<'role, u64> + Send + Sync,
213
    >(
214
        &self,
215
        user: U,
216
        role: R,
217
    ) -> Result<(), bonsaidb_core::Error> {
218
        match self {
219
            Self::Local(server) => server.add_role_to_user(user, role).await,
220
            Self::Networked(client) => client.add_role_to_user(user, role).await,
221
        }
222
    }
223

            
224
    async fn remove_role_from_user<
225
        'user,
226
        'role,
227
        U: Nameable<'user, u64> + Send + Sync,
228
        R: Nameable<'role, u64> + Send + Sync,
229
    >(
230
        &self,
231
        user: U,
232
        role: R,
233
    ) -> Result<(), bonsaidb_core::Error> {
234
        match self {
235
            Self::Local(server) => server.remove_role_from_user(user, role).await,
236
            Self::Networked(client) => client.remove_role_from_user(user, role).await,
237
        }
238
    }
239
}
240

            
241
/// A database connection that can be either from a local server or a server
242
/// over a network connection.
243
pub enum AnyDatabase<B: Backend = NoBackend> {
244
    /// A local database.
245
    Local(ServerDatabase<B>),
246
    /// A networked database accessed with a [`Client`].
247
    Networked(RemoteDatabase),
248
}
249

            
250
impl<B: Backend> HasSession for AnyDatabase<B> {
251
    fn session(&self) -> Option<&Session> {
252
        match self {
253
            Self::Local(server) => server.session(),
254
            Self::Networked(client) => client.session(),
255
        }
256
    }
257
}
258

            
259
#[async_trait]
260
impl<B: Backend> AsyncConnection for AnyDatabase<B> {
261
    type Storage = AnyServerConnection<B>;
262

            
263
    fn storage(&self) -> Self::Storage {
264
        match self {
265
            Self::Local(server) => AnyServerConnection::Local(server.storage()),
266
            Self::Networked(client) => AnyServerConnection::Networked(client.storage()),
267
        }
268
    }
269

            
270
    async fn list_executed_transactions(
271
        &self,
272
        starting_id: Option<u64>,
273
        result_limit: Option<u32>,
274
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
275
        match self {
276
            Self::Local(server) => {
277
                server
278
                    .list_executed_transactions(starting_id, result_limit)
279
                    .await
280
            }
281
            Self::Networked(client) => {
282
                client
283
                    .list_executed_transactions(starting_id, result_limit)
284
                    .await
285
            }
286
        }
287
    }
288

            
289
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
290
        match self {
291
            Self::Local(server) => server.last_transaction_id().await,
292
            Self::Networked(client) => client.last_transaction_id().await,
293
        }
294
    }
295

            
296
    async fn compact_collection<C: Collection>(&self) -> Result<(), bonsaidb_core::Error> {
297
        match self {
298
            Self::Local(server) => server.compact_collection::<C>().await,
299
            Self::Networked(client) => client.compact_collection::<C>().await,
300
        }
301
    }
302

            
303
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
304
        match self {
305
            Self::Local(server) => server.compact().await,
306
            Self::Networked(client) => client.compact().await,
307
        }
308
    }
309

            
310
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
311
        match self {
312
            Self::Local(server) => server.compact_key_value_store().await,
313
            Self::Networked(client) => client.compact_key_value_store().await,
314
        }
315
    }
316
}
317

            
318
#[async_trait]
319
impl<B: Backend> AsyncLowLevelConnection for AnyDatabase<B> {
320
26821
    fn schematic(&self) -> &Schematic {
321
26821
        match self {
322
13410
            Self::Local(server) => server.schematic(),
323
13411
            Self::Networked(client) => client.schematic(),
324
        }
325
26821
    }
326

            
327
7590
    async fn apply_transaction(
328
7590
        &self,
329
7590
        transaction: Transaction,
330
7590
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
331
7590
        match self {
332
3795
            Self::Local(server) => server.apply_transaction(transaction).await,
333
3795
            Self::Networked(client) => client.apply_transaction(transaction).await,
334
        }
335
15180
    }
336

            
337
13720
    async fn get_from_collection(
338
13720
        &self,
339
13720
        id: DocumentId,
340
13720
        collection: &CollectionName,
341
13720
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
342
13720
        match self {
343
6860
            Self::Local(server) => server.get_from_collection(id, collection).await,
344
6860
            Self::Networked(client) => client.get_from_collection(id, collection).await,
345
        }
346
27440
    }
347

            
348
    async fn list_from_collection(
349
        &self,
350
        ids: Range<DocumentId>,
351
        order: Sort,
352
        limit: Option<u32>,
353
        collection: &CollectionName,
354
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
355
        match self {
356
            Self::Local(server) => {
357
                server
358
                    .list_from_collection(ids, order, limit, collection)
359
                    .await
360
            }
361
            Self::Networked(client) => {
362
                client
363
                    .list_from_collection(ids, order, limit, collection)
364
                    .await
365
            }
366
        }
367
    }
368

            
369
    async fn list_headers_from_collection(
370
        &self,
371
        ids: Range<DocumentId>,
372
        order: Sort,
373
        limit: Option<u32>,
374
        collection: &CollectionName,
375
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
376
        match self {
377
            Self::Local(server) => {
378
                server
379
                    .list_headers_from_collection(ids, order, limit, collection)
380
                    .await
381
            }
382
            Self::Networked(client) => {
383
                client
384
                    .list_headers_from_collection(ids, order, limit, collection)
385
                    .await
386
            }
387
        }
388
    }
389

            
390
    async fn count_from_collection(
391
        &self,
392
        ids: Range<DocumentId>,
393
        collection: &CollectionName,
394
    ) -> Result<u64, bonsaidb_core::Error> {
395
        match self {
396
            Self::Local(server) => server.count_from_collection(ids, collection).await,
397
            Self::Networked(client) => client.count_from_collection(ids, collection).await,
398
        }
399
    }
400

            
401
8868
    async fn get_multiple_from_collection(
402
8868
        &self,
403
8868
        ids: &[DocumentId],
404
8868
        collection: &CollectionName,
405
8868
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
406
8868
        match self {
407
4434
            Self::Local(server) => server.get_multiple_from_collection(ids, collection).await,
408
4434
            Self::Networked(client) => client.get_multiple_from_collection(ids, collection).await,
409
        }
410
17736
    }
411

            
412
    async fn compact_collection_by_name(
413
        &self,
414
        collection: CollectionName,
415
    ) -> Result<(), bonsaidb_core::Error> {
416
        match self {
417
            Self::Local(server) => server.compact_collection_by_name(collection).await,
418
            Self::Networked(client) => client.compact_collection_by_name(collection).await,
419
        }
420
    }
421

            
422
8868
    async fn query_by_name(
423
8868
        &self,
424
8868
        view: &ViewName,
425
8868
        key: Option<QueryKey<Bytes>>,
426
8868
        order: Sort,
427
8868
        limit: Option<u32>,
428
8868
        access_policy: AccessPolicy,
429
8868
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
430
8868
        match self {
431
4434
            Self::Local(server) => {
432
4434
                server
433
4434
                    .query_by_name(view, key, order, limit, access_policy)
434
3834
                    .await
435
            }
436
4434
            Self::Networked(client) => {
437
4434
                client
438
4434
                    .query_by_name(view, key, order, limit, access_policy)
439
4434
                    .await
440
            }
441
        }
442
17736
    }
443

            
444
    async fn query_by_name_with_docs(
445
        &self,
446
        view: &ViewName,
447
        key: Option<QueryKey<Bytes>>,
448
        order: Sort,
449
        limit: Option<u32>,
450
        access_policy: AccessPolicy,
451
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
452
        match self {
453
            Self::Local(server) => {
454
                server
455
                    .query_by_name_with_docs(view, key, order, limit, access_policy)
456
                    .await
457
            }
458
            Self::Networked(client) => {
459
                client
460
                    .query_by_name_with_docs(view, key, order, limit, access_policy)
461
                    .await
462
            }
463
        }
464
    }
465

            
466
17953
    async fn reduce_by_name(
467
17953
        &self,
468
17953
        view: &ViewName,
469
17953
        key: Option<QueryKey<Bytes>>,
470
17953
        access_policy: AccessPolicy,
471
17953
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
472
17953
        match self {
473
8976
            Self::Local(server) => server.reduce_by_name(view, key, access_policy).await,
474
8977
            Self::Networked(client) => client.reduce_by_name(view, key, access_policy).await,
475
        }
476
35906
    }
477

            
478
    async fn reduce_grouped_by_name(
479
        &self,
480
        view: &ViewName,
481
        key: Option<QueryKey<Bytes>>,
482
        access_policy: AccessPolicy,
483
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
484
        match self {
485
            Self::Local(server) => {
486
                server
487
                    .reduce_grouped_by_name(view, key, access_policy)
488
                    .await
489
            }
490
            Self::Networked(client) => {
491
                client
492
                    .reduce_grouped_by_name(view, key, access_policy)
493
                    .await
494
            }
495
        }
496
    }
497

            
498
    async fn delete_docs_by_name(
499
        &self,
500
        view: &ViewName,
501
        key: Option<QueryKey<Bytes>>,
502
        access_policy: AccessPolicy,
503
    ) -> Result<u64, bonsaidb_core::Error> {
504
        match self {
505
            Self::Local(server) => server.delete_docs_by_name(view, key, access_policy).await,
506
            Self::Networked(client) => client.delete_docs_by_name(view, key, access_policy).await,
507
        }
508
    }
509
}