1
use std::sync::Arc;
2

            
3
use bonsaidb_core::{
4
    admin::{Admin, ADMIN_DATABASE_NAME},
5
    arc_bytes::serde::Bytes,
6
    connection::{
7
        Connection, Database, IdentityReference, LowLevelConnection, Range, Sort, StorageConnection,
8
    },
9
    document::{DocumentId, Header, OwnedDocument},
10
    keyvalue::KeyValue,
11
    networking::{
12
        AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction,
13
        AssumeIdentity, Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase,
14
        CreateSubscriber, CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation,
15
        Get, GetMultiple, LastTransactionId, List, ListAvailableSchemas, ListDatabases,
16
        ListExecutedTransactions, ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce,
17
        ReduceGrouped, SubscribeTo, UnsubscribeFrom,
18
    },
19
    pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber},
20
    schema::{CollectionName, Schematic},
21
};
22
use futures::Future;
23
use tokio::{
24
    runtime::{Handle, Runtime},
25
    sync::oneshot,
26
    task::JoinHandle,
27
};
28

            
29
use crate::{Client, RemoteDatabase, RemoteSubscriber};
30

            
31
impl StorageConnection for Client {
32
    type Database = RemoteDatabase;
33
    type Authenticated = Self;
34

            
35
    fn admin(&self) -> Self::Database {
36
        self.database::<Admin>(ADMIN_DATABASE_NAME).unwrap()
37
    }
38

            
39
65
    fn database<DB: bonsaidb_core::schema::Schema>(
40
65
        &self,
41
65
        name: &str,
42
65
    ) -> Result<Self::Database, bonsaidb_core::Error> {
43
65
        self.database::<DB>(name)
44
65
    }
45

            
46
    fn create_database_with_schema(
47
        &self,
48
        name: &str,
49
        schema: bonsaidb_core::schema::SchemaName,
50
        only_if_needed: bool,
51
    ) -> Result<(), bonsaidb_core::Error> {
52
684
        self.send_api_request(&CreateDatabase {
53
684
            database: Database {
54
684
                name: name.to_string(),
55
684
                schema,
56
684
            },
57
684
            only_if_needed,
58
684
        })?;
59
627
        Ok(())
60
684
    }
61

            
62
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
63
38
        self.send_api_request(&DeleteDatabase {
64
38
            name: name.to_string(),
65
38
        })?;
66
19
        Ok(())
67
38
    }
68

            
69
19
    fn list_databases(
70
19
        &self,
71
19
    ) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
72
19
        Ok(self.send_api_request(&ListDatabases)?)
73
19
    }
74

            
75
19
    fn list_available_schemas(
76
19
        &self,
77
19
    ) -> Result<Vec<bonsaidb_core::schema::SchemaName>, bonsaidb_core::Error> {
78
19
        Ok(self.send_api_request(&ListAvailableSchemas)?)
79
19
    }
80

            
81
19
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
82
19
        Ok(self.send_api_request(&CreateUser {
83
19
            username: username.to_string(),
84
19
        })?)
85
19
    }
86

            
87
1
    fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
88
1
        &self,
89
1
        user: U,
90
1
    ) -> Result<(), bonsaidb_core::Error> {
91
1
        Ok(self.send_api_request(&DeleteUser {
92
1
            user: user.name()?.into_owned(),
93
        })?)
94
1
    }
95

            
96
    #[cfg(feature = "password-hashing")]
97
    fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
98
        &self,
99
        user: U,
100
        password: bonsaidb_core::connection::SensitiveString,
101
    ) -> Result<(), bonsaidb_core::Error> {
102
        use bonsaidb_core::networking::SetUserPassword;
103

            
104
        Ok(self.send_api_request(&SetUserPassword {
105
            user: user.name()?.into_owned(),
106
            password,
107
        })?)
108
    }
109

            
110
    #[cfg(feature = "password-hashing")]
111
    fn authenticate<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
112
        &self,
113
        user: U,
114
        authentication: bonsaidb_core::connection::Authentication,
115
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
116
        let session = self.send_api_request(&bonsaidb_core::networking::Authenticate {
117
            user: user.name()?.into_owned(),
118
            authentication,
119
        })?;
120
        Ok(Self {
121
            data: self.data.clone(),
122
            session: Arc::new(session),
123
        })
124
    }
125

            
126
    fn assume_identity(
127
        &self,
128
        identity: IdentityReference<'_>,
129
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
130
        let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
131
        Ok(Self {
132
            data: self.data.clone(),
133
            session: Arc::new(session),
134
        })
135
    }
136

            
137
2
    fn add_permission_group_to_user<
138
2
        'user,
139
2
        'group,
140
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
141
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
142
2
    >(
143
2
        &self,
144
2
        user: U,
145
2
        permission_group: G,
146
2
    ) -> Result<(), bonsaidb_core::Error> {
147
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
148
2
            user: user.name()?.into_owned(),
149
2
            group: permission_group.name()?.into_owned(),
150
            should_be_member: true,
151
        })?;
152
2
        Ok(())
153
2
    }
154

            
155
2
    fn remove_permission_group_from_user<
156
2
        'user,
157
2
        'group,
158
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
159
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
160
2
    >(
161
2
        &self,
162
2
        user: U,
163
2
        permission_group: G,
164
2
    ) -> Result<(), bonsaidb_core::Error> {
165
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
166
2
            user: user.name()?.into_owned(),
167
2
            group: permission_group.name()?.into_owned(),
168
            should_be_member: false,
169
        })?;
170
2
        Ok(())
171
2
    }
172

            
173
2
    fn add_role_to_user<
174
2
        'user,
175
2
        'role,
176
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
177
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
178
2
    >(
179
2
        &self,
180
2
        user: U,
181
2
        role: R,
182
2
    ) -> Result<(), bonsaidb_core::Error> {
183
2
        self.send_api_request(&AlterUserRoleMembership {
184
2
            user: user.name()?.into_owned(),
185
2
            role: role.name()?.into_owned(),
186
            should_be_member: true,
187
        })?;
188
2
        Ok(())
189
2
    }
190

            
191
2
    fn remove_role_from_user<
192
2
        'user,
193
2
        'role,
194
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
195
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
196
2
    >(
197
2
        &self,
198
2
        user: U,
199
2
        role: R,
200
2
    ) -> Result<(), bonsaidb_core::Error> {
201
2
        self.send_api_request(&AlterUserRoleMembership {
202
2
            user: user.name()?.into_owned(),
203
2
            role: role.name()?.into_owned(),
204
            should_be_member: false,
205
        })?;
206
2
        Ok(())
207
2
    }
208
}
209

            
210
impl Connection for RemoteDatabase {
211
    type Storage = Client;
212

            
213
    fn storage(&self) -> Self::Storage {
214
        self.client.clone()
215
    }
216

            
217
342
    fn list_executed_transactions(
218
342
        &self,
219
342
        starting_id: Option<u64>,
220
342
        result_limit: Option<u32>,
221
342
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
222
342
        Ok(self.client.send_api_request(&ListExecutedTransactions {
223
342
            database: self.name.to_string(),
224
342
            starting_id,
225
342
            result_limit,
226
342
        })?)
227
342
    }
228

            
229
19
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
230
19
        Ok(self.client.send_api_request(&LastTransactionId {
231
19
            database: self.name.to_string(),
232
19
        })?)
233
19
    }
234

            
235
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
236
19
        self.send_api_request(&Compact {
237
19
            database: self.name.to_string(),
238
19
        })?;
239
19
        Ok(())
240
19
    }
241

            
242
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
243
19
        self.send_api_request(&CompactKeyValueStore {
244
19
            database: self.name.to_string(),
245
19
        })?;
246
19
        Ok(())
247
19
    }
248
}
249

            
250
impl LowLevelConnection for RemoteDatabase {
251
779
    fn schematic(&self) -> &Schematic {
252
779
        &self.schema
253
779
    }
254

            
255
19950
    fn apply_transaction(
256
19950
        &self,
257
19950
        transaction: bonsaidb_core::transaction::Transaction,
258
19950
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
259
19950
        Ok(self.client.send_api_request(&ApplyTransaction {
260
19950
            database: self.name.to_string(),
261
19950
            transaction,
262
19950
        })?)
263
19950
    }
264

            
265
266
    fn get_from_collection(
266
266
        &self,
267
266
        id: bonsaidb_core::document::DocumentId,
268
266
        collection: &CollectionName,
269
266
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
270
266
        Ok(self.client.send_api_request(&Get {
271
266
            database: self.name.to_string(),
272
266
            collection: collection.clone(),
273
266
            id,
274
266
        })?)
275
266
    }
276

            
277
152
    fn get_multiple_from_collection(
278
152
        &self,
279
152
        ids: &[bonsaidb_core::document::DocumentId],
280
152
        collection: &CollectionName,
281
152
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
282
152
        Ok(self.client.send_api_request(&GetMultiple {
283
152
            database: self.name.to_string(),
284
152
            collection: collection.clone(),
285
152
            ids: ids.to_vec(),
286
152
        })?)
287
152
    }
288

            
289
76
    fn list_from_collection(
290
76
        &self,
291
76
        ids: Range<bonsaidb_core::document::DocumentId>,
292
76
        order: Sort,
293
76
        limit: Option<u32>,
294
76
        collection: &CollectionName,
295
76
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
296
76
        Ok(self.client.send_api_request(&List {
297
76
            database: self.name.to_string(),
298
76
            collection: collection.clone(),
299
76
            ids,
300
76
            order,
301
76
            limit,
302
76
        })?)
303
76
    }
304

            
305
19
    fn list_headers_from_collection(
306
19
        &self,
307
19
        ids: Range<DocumentId>,
308
19
        order: Sort,
309
19
        limit: Option<u32>,
310
19
        collection: &CollectionName,
311
19
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
312
19
        Ok(self.client.send_api_request(&ListHeaders(List {
313
19
            database: self.name.to_string(),
314
19
            collection: collection.clone(),
315
19
            ids,
316
19
            order,
317
19
            limit,
318
19
        }))?)
319
19
    }
320

            
321
38
    fn count_from_collection(
322
38
        &self,
323
38
        ids: Range<bonsaidb_core::document::DocumentId>,
324
38
        collection: &CollectionName,
325
38
    ) -> Result<u64, bonsaidb_core::Error> {
326
38
        Ok(self.client.send_api_request(&Count {
327
38
            database: self.name.to_string(),
328
38
            collection: collection.clone(),
329
38
            ids,
330
38
        })?)
331
38
    }
332

            
333
    fn compact_collection_by_name(
334
        &self,
335
        collection: CollectionName,
336
    ) -> Result<(), bonsaidb_core::Error> {
337
19
        self.send_api_request(&CompactCollection {
338
19
            database: self.name.to_string(),
339
19
            name: collection,
340
19
        })?;
341
19
        Ok(())
342
19
    }
343

            
344
608
    fn query_by_name(
345
608
        &self,
346
608
        view: &bonsaidb_core::schema::ViewName,
347
608
        key: Option<bonsaidb_core::connection::QueryKey<bonsaidb_core::arc_bytes::serde::Bytes>>,
348
608
        order: Sort,
349
608
        limit: Option<u32>,
350
608
        access_policy: bonsaidb_core::connection::AccessPolicy,
351
608
    ) -> Result<Vec<bonsaidb_core::schema::view::map::Serialized>, bonsaidb_core::Error> {
352
608
        Ok(self.client.send_api_request(&Query {
353
608
            database: self.name.to_string(),
354
608
            view: view.clone(),
355
608
            key,
356
608
            order,
357
608
            limit,
358
608
            access_policy,
359
608
        })?)
360
608
    }
361

            
362
    fn query_by_name_with_docs(
363
        &self,
364
        view: &bonsaidb_core::schema::ViewName,
365
        key: Option<bonsaidb_core::connection::QueryKey<bonsaidb_core::arc_bytes::serde::Bytes>>,
366
        order: Sort,
367
        limit: Option<u32>,
368
        access_policy: bonsaidb_core::connection::AccessPolicy,
369
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
370
    {
371
        Ok(self.client.send_api_request(&QueryWithDocs(Query {
372
            database: self.name.to_string(),
373
            view: view.clone(),
374
            key,
375
            order,
376
            limit,
377
            access_policy,
378
        }))?)
379
    }
380

            
381
95
    fn reduce_by_name(
382
95
        &self,
383
95
        view: &bonsaidb_core::schema::ViewName,
384
95
        key: Option<bonsaidb_core::connection::QueryKey<bonsaidb_core::arc_bytes::serde::Bytes>>,
385
95
        access_policy: bonsaidb_core::connection::AccessPolicy,
386
95
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
387
95
        Ok(self
388
95
            .client
389
95
            .send_api_request(&Reduce {
390
95
                database: self.name.to_string(),
391
95
                view: view.clone(),
392
95
                key,
393
95
                access_policy,
394
95
            })?
395
76
            .into_vec())
396
95
    }
397

            
398
38
    fn reduce_grouped_by_name(
399
38
        &self,
400
38
        view: &bonsaidb_core::schema::ViewName,
401
38
        key: Option<bonsaidb_core::connection::QueryKey<bonsaidb_core::arc_bytes::serde::Bytes>>,
402
38
        access_policy: bonsaidb_core::connection::AccessPolicy,
403
38
    ) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
404
38
    {
405
38
        Ok(self.client.send_api_request(&ReduceGrouped(Reduce {
406
38
            database: self.name.to_string(),
407
38
            view: view.clone(),
408
38
            key,
409
38
            access_policy,
410
38
        }))?)
411
38
    }
412

            
413
38
    fn delete_docs_by_name(
414
38
        &self,
415
38
        view: &bonsaidb_core::schema::ViewName,
416
38
        key: Option<bonsaidb_core::connection::QueryKey<bonsaidb_core::arc_bytes::serde::Bytes>>,
417
38
        access_policy: bonsaidb_core::connection::AccessPolicy,
418
38
    ) -> Result<u64, bonsaidb_core::Error> {
419
38
        Ok(self.client.send_api_request(&DeleteDocs {
420
38
            database: self.name.to_string(),
421
38
            view: view.clone(),
422
38
            key,
423
38
            access_policy,
424
38
        })?)
425
38
    }
426
}
427

            
428
impl PubSub for RemoteDatabase {
429
    type Subscriber = RemoteSubscriber;
430

            
431
152
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
432
152
        let subscriber_id = self.client.send_api_request(&CreateSubscriber {
433
152
            database: self.name.to_string(),
434
152
        })?;
435

            
436
152
        let (sender, receiver) = flume::unbounded();
437
152
        self.client.register_subscriber(subscriber_id, sender);
438
152
        Ok(RemoteSubscriber {
439
152
            client: self.client.clone(),
440
152
            database: self.name.clone(),
441
152
            id: subscriber_id,
442
152
            receiver: Receiver::new(receiver),
443
152
            tokio: None,
444
152
        })
445
152
    }
446

            
447
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
448
190
        self.client.send_api_request(&Publish {
449
190
            database: self.name.to_string(),
450
190
            topic: Bytes::from(topic),
451
190
            payload: Bytes::from(payload),
452
190
        })?;
453
190
        Ok(())
454
190
    }
455

            
456
1
    fn publish_bytes_to_all(
457
1
        &self,
458
1
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
459
1
        payload: Vec<u8>,
460
1
    ) -> Result<(), bonsaidb_core::Error> {
461
1
        let topics = topics.into_iter().map(Bytes::from).collect();
462
1
        self.client.send_api_request(&PublishToAll {
463
1
            database: self.name.to_string(),
464
1
            topics,
465
1
            payload: Bytes::from(payload),
466
1
        })?;
467
1
        Ok(())
468
1
    }
469
}
470

            
471
impl Subscriber for RemoteSubscriber {
472
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
473
247
        self.client.send_api_request(&SubscribeTo {
474
247
            database: self.database.to_string(),
475
247
            subscriber_id: self.id,
476
247
            topic: Bytes::from(topic),
477
247
        })?;
478
247
        Ok(())
479
247
    }
480

            
481
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
482
19
        self.client.send_api_request(&UnsubscribeFrom {
483
19
            database: self.database.to_string(),
484
19
            subscriber_id: self.id,
485
19
            topic: Bytes::from(topic),
486
19
        })?;
487
19
        Ok(())
488
19
    }
489

            
490
342
    fn receiver(&self) -> &Receiver {
491
342
        AsyncSubscriber::receiver(self)
492
342
    }
493
}
494

            
495
impl KeyValue for RemoteDatabase {
496
191881
    fn execute_key_operation(
497
191881
        &self,
498
191881
        op: bonsaidb_core::keyvalue::KeyOperation,
499
191881
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
500
191881
        Ok(self.client.send_api_request(&ExecuteKeyOperation {
501
191881
            database: self.name.to_string(),
502
191881

            
503
191881
            op,
504
191881
        })?)
505
191881
    }
506
}
507

            
508
pub enum Tokio {
509
    Runtime(Runtime),
510
    Handle(Handle),
511
}
512

            
513
impl Tokio {
514
2831
    pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
515
2831
        self,
516
2831
        task: F,
517
2831
    ) -> JoinHandle<Result<(), crate::Error>> {
518
2831
        match self {
519
589
            Self::Runtime(tokio) => {
520
589
                // When we have an owned runtime, we must have a thread driving
521
589
                // the runtime. To keep the interface to `Client` simple, we are
522
589
                // going to spawn the task and let the main block_on task simply
523
589
                // wait for the completion event. If the JoinHandle is
524
589
                // cancelled, the sender will be dropped and everything will
525
589
                // clean up.
526
589
                let (completion_sender, completion_receiver) = oneshot::channel();
527
589
                let task = async move {
528
339625
                    task.await?;
529
19
                    let _ = completion_sender.send(());
530
19
                    Ok(())
531
19
                };
532
589
                let task = tokio.spawn(task);
533
589

            
534
589
                std::thread::spawn(move || {
535
589
                    tokio.block_on(async move {
536
589
                        let _ = completion_receiver.await;
537
589
                    });
538
589
                });
539
589
                task
540
            }
541
2242
            Self::Handle(tokio) => tokio.spawn(task),
542
        }
543
2831
    }
544
}
545

            
546
2831
pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
547
2831
    task: F,
548
2831
    handle: Option<Handle>,
549
2831
) -> JoinHandle<Result<(), crate::Error>> {
550
    // We need to spawn a runtime or
551
2831
    let tokio = if let Some(handle) = handle {
552
2242
        Tokio::Handle(handle)
553
    } else {
554
589
        Tokio::Runtime(Runtime::new().unwrap())
555
    };
556
2831
    tokio.spawn(task)
557
2831
}