1
use std::collections::HashMap;
2
use std::sync::atomic::Ordering;
3
use std::sync::Arc;
4
use std::time::Duration;
5

            
6
use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
7
use bonsaidb_core::api;
8
use bonsaidb_core::arc_bytes::serde::Bytes;
9
use bonsaidb_core::connection::{
10
    AccessPolicy, Connection, Database, HasSchema, HasSession, IdentityReference,
11
    LowLevelConnection, Range, SerializedQueryKey, Sort, StorageConnection,
12
};
13
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
14
use bonsaidb_core::keyvalue::KeyValue;
15
use bonsaidb_core::networking::{
16
    AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
17
    Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
18
    CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
19
    LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
20
    ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo,
21
    UnsubscribeFrom, CURRENT_PROTOCOL_VERSION,
22
};
23
use bonsaidb_core::pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber};
24
use bonsaidb_core::schema::view::map;
25
use bonsaidb_core::schema::{CollectionName, ViewName};
26
use futures::Future;
27
use tokio::runtime::{Handle, Runtime};
28
use tokio::sync::oneshot;
29
use tokio::task::JoinHandle;
30
use url::Url;
31

            
32
use crate::builder::Blocking;
33
use crate::client::ClientSession;
34
use crate::{ApiError, AsyncClient, AsyncRemoteDatabase, AsyncRemoteSubscriber, Builder, Error};
35

            
36
/// A BonsaiDb client that blocks the current thread when performing requests.
37
#[derive(Debug, Clone)]
38
pub struct BlockingClient(pub(crate) AsyncClient);
39

            
40
impl BlockingClient {
41
    /// Returns a builder for a new client connecting to `url`.
42
68
    pub fn build(url: Url) -> Builder<Blocking> {
43
68
        Builder::new(url)
44
68
    }
45

            
46
    /// Initialize a client connecting to `url`. This client can be shared by
47
    /// cloning it. All requests are done asynchronously over the same
48
    /// connection.
49
    ///
50
    /// If the client has an error connecting, the first request made will
51
    /// present that error. If the client disconnects while processing requests,
52
    /// all requests being processed will exit and return
53
    /// [`Error::Disconnected`](bonsaidb_core::networking::Error::Disconnected).
54
    /// The client will automatically try reconnecting.
55
    ///
56
    /// The goal of this design of this reconnection strategy is to make it
57
    /// easier to build resilliant apps. By allowing existing Client instances
58
    /// to recover and reconnect, each component of the apps built can adopt a
59
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
60
    /// the database is to operation.
61
612
    pub fn new(url: Url) -> Result<Self, Error> {
62
612
        AsyncClient::new_from_parts(
63
612
            url,
64
612
            CURRENT_PROTOCOL_VERSION,
65
612
            HashMap::default(),
66
612
            None,
67
612
            None,
68
612
            #[cfg(not(target_arch = "wasm32"))]
69
612
            None,
70
612
            #[cfg(not(target_arch = "wasm32"))]
71
612
            Handle::try_current().ok(),
72
612
        )
73
612
        .map(Self)
74
612
    }
75

            
76
    /// Sends an api `request`.
77
912
    pub fn send_api_request<Api: api::Api>(
78
912
        &self,
79
912
        request: &Api,
80
912
    ) -> Result<Api::Response, ApiError<Api::Error>> {
81
912
        self.0.send_blocking_api_request(request)
82
912
    }
83

            
84
    /// Sends an api `request` without waiting for a result. The response from
85
    /// the server will be ignored.
86
    pub fn invoke_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
87
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
88
        self.0
89
            .send_request_without_confirmation(Api::name(), request)
90
            .map(|_| ())
91
    }
92

            
93
    /// Returns a reference to an async-compatible version of this client.
94
    #[must_use]
95
    pub fn as_async(&self) -> &AsyncClient {
96
        &self.0
97
    }
98

            
99
    /// Sets this instance's request timeout.
100
    ///
101
    /// Each client has its own timeout. When cloning a client, this timeout
102
    /// setting will be copied to the clone.
103
    pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
104
        self.0.request_timeout = timeout.into();
105
    }
106
}
107

            
108
impl From<AsyncClient> for BlockingClient {
109
    fn from(client: AsyncClient) -> Self {
110
        Self(client)
111
    }
112
}
113

            
114
impl From<BlockingClient> for AsyncClient {
115
    fn from(client: BlockingClient) -> Self {
116
        client.0
117
    }
118
}
119

            
120
impl StorageConnection for BlockingClient {
121
    type Authenticated = Self;
122
    type Database = BlockingRemoteDatabase;
123

            
124
    fn admin(&self) -> Self::Database {
125
        BlockingRemoteDatabase(
126
            self.0
127
                .remote_database::<Admin>(ADMIN_DATABASE_NAME)
128
                .unwrap(),
129
        )
130
    }
131

            
132
76
    fn database<DB: bonsaidb_core::schema::Schema>(
133
76
        &self,
134
76
        name: &str,
135
76
    ) -> Result<Self::Database, bonsaidb_core::Error> {
136
76
        self.0
137
76
            .remote_database::<DB>(name)
138
76
            .map(BlockingRemoteDatabase)
139
76
    }
140

            
141
    fn create_database_with_schema(
142
        &self,
143
        name: &str,
144
        schema: bonsaidb_core::schema::SchemaName,
145
        only_if_needed: bool,
146
    ) -> Result<(), bonsaidb_core::Error> {
147
697
        self.send_api_request(&CreateDatabase {
148
697
            database: Database {
149
697
                name: name.to_string(),
150
697
                schema,
151
697
            },
152
697
            only_if_needed,
153
697
        })?;
154
646
        Ok(())
155
697
    }
156

            
157
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
158
34
        self.send_api_request(&DeleteDatabase {
159
34
            name: name.to_string(),
160
34
        })?;
161
17
        Ok(())
162
34
    }
163

            
164
51
    fn list_databases(
165
51
        &self,
166
51
    ) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
167
51
        Ok(self.send_api_request(&ListDatabases)?)
168
51
    }
169

            
170
17
    fn list_available_schemas(
171
17
        &self,
172
17
    ) -> Result<Vec<bonsaidb_core::schema::SchemaSummary>, bonsaidb_core::Error> {
173
17
        Ok(self.send_api_request(&ListAvailableSchemas)?)
174
17
    }
175

            
176
34
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
177
34
        Ok(self.send_api_request(&CreateUser {
178
34
            username: username.to_string(),
179
34
        })?)
180
34
    }
181

            
182
1
    fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
183
1
        &self,
184
1
        user: U,
185
1
    ) -> Result<(), bonsaidb_core::Error> {
186
1
        Ok(self.send_api_request(&DeleteUser {
187
1
            user: user.name()?.into_owned(),
188
        })?)
189
1
    }
190

            
191
    #[cfg(feature = "password-hashing")]
192
    fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
193
        &self,
194
        user: U,
195
        password: bonsaidb_core::connection::SensitiveString,
196
    ) -> Result<(), bonsaidb_core::Error> {
197
        use bonsaidb_core::networking::SetUserPassword;
198

            
199
        Ok(self.send_api_request(&SetUserPassword {
200
            user: user.name()?.into_owned(),
201
            password,
202
        })?)
203
    }
204

            
205
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
206
68
    fn authenticate(
207
68
        &self,
208
68
        authentication: bonsaidb_core::connection::Authentication,
209
68
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
210
68
        let session =
211
68
            self.send_api_request(&bonsaidb_core::networking::Authenticate { authentication })?;
212
68
        Ok(Self(AsyncClient {
213
68
            data: self.0.data.clone(),
214
68
            session: ClientSession {
215
68
                session: Arc::new(session),
216
68
                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
217
68
            },
218
68
            request_timeout: self.0.request_timeout,
219
68
        }))
220
68
    }
221

            
222
    fn assume_identity(
223
        &self,
224
        identity: IdentityReference<'_>,
225
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
226
        let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
227
        Ok(Self(AsyncClient {
228
            data: self.0.data.clone(),
229
            session: ClientSession {
230
                session: Arc::new(session),
231
                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
232
            },
233
            request_timeout: self.0.request_timeout,
234
        }))
235
    }
236

            
237
2
    fn add_permission_group_to_user<
238
2
        'user,
239
2
        'group,
240
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
241
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
242
2
    >(
243
2
        &self,
244
2
        user: U,
245
2
        permission_group: G,
246
2
    ) -> Result<(), bonsaidb_core::Error> {
247
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
248
2
            user: user.name()?.into_owned(),
249
2
            group: permission_group.name()?.into_owned(),
250
            should_be_member: true,
251
        })?;
252
2
        Ok(())
253
2
    }
254

            
255
2
    fn remove_permission_group_from_user<
256
2
        'user,
257
2
        'group,
258
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
259
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
260
2
    >(
261
2
        &self,
262
2
        user: U,
263
2
        permission_group: G,
264
2
    ) -> Result<(), bonsaidb_core::Error> {
265
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
266
2
            user: user.name()?.into_owned(),
267
2
            group: permission_group.name()?.into_owned(),
268
            should_be_member: false,
269
        })?;
270
2
        Ok(())
271
2
    }
272

            
273
2
    fn add_role_to_user<
274
2
        'user,
275
2
        'role,
276
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
277
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
278
2
    >(
279
2
        &self,
280
2
        user: U,
281
2
        role: R,
282
2
    ) -> Result<(), bonsaidb_core::Error> {
283
2
        self.send_api_request(&AlterUserRoleMembership {
284
2
            user: user.name()?.into_owned(),
285
2
            role: role.name()?.into_owned(),
286
            should_be_member: true,
287
        })?;
288
2
        Ok(())
289
2
    }
290

            
291
2
    fn remove_role_from_user<
292
2
        'user,
293
2
        'role,
294
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
295
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
296
2
    >(
297
2
        &self,
298
2
        user: U,
299
2
        role: R,
300
2
    ) -> Result<(), bonsaidb_core::Error> {
301
2
        self.send_api_request(&AlterUserRoleMembership {
302
2
            user: user.name()?.into_owned(),
303
2
            role: role.name()?.into_owned(),
304
            should_be_member: false,
305
        })?;
306
2
        Ok(())
307
2
    }
308
}
309

            
310
impl HasSession for BlockingClient {
311
68
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
312
68
        self.0.session()
313
68
    }
314
}
315

            
316
/// A remote database that blocks the current thread when performing its
317
/// requests.
318
137
#[derive(Debug, Clone)]
319
pub struct BlockingRemoteDatabase(AsyncRemoteDatabase);
320

            
321
impl Connection for BlockingRemoteDatabase {
322
    type Storage = BlockingClient;
323

            
324
    fn storage(&self) -> Self::Storage {
325
        BlockingClient(self.0.client.clone())
326
    }
327

            
328
306
    fn list_executed_transactions(
329
306
        &self,
330
306
        starting_id: Option<u64>,
331
306
        result_limit: Option<u32>,
332
306
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
333
306
        Ok(self
334
306
            .0
335
306
            .client
336
306
            .send_blocking_api_request(&ListExecutedTransactions {
337
306
                database: self.0.name.to_string(),
338
306
                starting_id,
339
306
                result_limit,
340
306
            })?)
341
306
    }
342

            
343
17
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
344
17
        Ok(self
345
17
            .0
346
17
            .client
347
17
            .send_blocking_api_request(&LastTransactionId {
348
17
                database: self.0.name.to_string(),
349
17
            })?)
350
17
    }
351

            
352
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
353
17
        self.0.send_blocking_api_request(&Compact {
354
17
            database: self.0.name.to_string(),
355
17
        })?;
356
17
        Ok(())
357
17
    }
358

            
359
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
360
17
        self.0.send_blocking_api_request(&CompactKeyValueStore {
361
17
            database: self.0.name.to_string(),
362
17
        })?;
363
17
        Ok(())
364
17
    }
365
}
366

            
367
impl LowLevelConnection for BlockingRemoteDatabase {
368
18173
    fn apply_transaction(
369
18173
        &self,
370
18173
        transaction: bonsaidb_core::transaction::Transaction,
371
18173
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
372
18173
        Ok(self.0.client.send_blocking_api_request(&ApplyTransaction {
373
18173
            database: self.0.name.to_string(),
374
18173
            transaction,
375
18173
        })?)
376
18173
    }
377

            
378
306
    fn get_from_collection(
379
306
        &self,
380
306
        id: bonsaidb_core::document::DocumentId,
381
306
        collection: &CollectionName,
382
306
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
383
306
        Ok(self.0.client.send_blocking_api_request(&Get {
384
306
            database: self.0.name.to_string(),
385
306
            collection: collection.clone(),
386
306
            id,
387
306
        })?)
388
306
    }
389

            
390
136
    fn get_multiple_from_collection(
391
136
        &self,
392
136
        ids: &[bonsaidb_core::document::DocumentId],
393
136
        collection: &CollectionName,
394
136
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
395
136
        Ok(self.0.client.send_blocking_api_request(&GetMultiple {
396
136
            database: self.0.name.to_string(),
397
136
            collection: collection.clone(),
398
136
            ids: ids.to_vec(),
399
136
        })?)
400
136
    }
401

            
402
68
    fn list_from_collection(
403
68
        &self,
404
68
        ids: Range<bonsaidb_core::document::DocumentId>,
405
68
        order: Sort,
406
68
        limit: Option<u32>,
407
68
        collection: &CollectionName,
408
68
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
409
68
        Ok(self.0.client.send_blocking_api_request(&List {
410
68
            database: self.0.name.to_string(),
411
68
            collection: collection.clone(),
412
68
            ids,
413
68
            order,
414
68
            limit,
415
68
        })?)
416
68
    }
417

            
418
17
    fn list_headers_from_collection(
419
17
        &self,
420
17
        ids: Range<DocumentId>,
421
17
        order: Sort,
422
17
        limit: Option<u32>,
423
17
        collection: &CollectionName,
424
17
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
425
17
        Ok(self.0.client.send_blocking_api_request(&ListHeaders(List {
426
17
            database: self.0.name.to_string(),
427
17
            collection: collection.clone(),
428
17
            ids,
429
17
            order,
430
17
            limit,
431
17
        }))?)
432
17
    }
433

            
434
34
    fn count_from_collection(
435
34
        &self,
436
34
        ids: Range<bonsaidb_core::document::DocumentId>,
437
34
        collection: &CollectionName,
438
34
    ) -> Result<u64, bonsaidb_core::Error> {
439
34
        Ok(self.0.client.send_blocking_api_request(&Count {
440
34
            database: self.0.name.to_string(),
441
34
            collection: collection.clone(),
442
34
            ids,
443
34
        })?)
444
34
    }
445

            
446
    fn compact_collection_by_name(
447
        &self,
448
        collection: CollectionName,
449
    ) -> Result<(), bonsaidb_core::Error> {
450
17
        self.0.send_blocking_api_request(&CompactCollection {
451
17
            database: self.0.name.to_string(),
452
17
            name: collection,
453
17
        })?;
454
17
        Ok(())
455
17
    }
456

            
457
663
    fn query_by_name(
458
663
        &self,
459
663
        view: &ViewName,
460
663
        key: Option<SerializedQueryKey>,
461
663
        order: Sort,
462
663
        limit: Option<u32>,
463
663
        access_policy: AccessPolicy,
464
663
    ) -> Result<Vec<map::Serialized>, bonsaidb_core::Error> {
465
663
        Ok(self.0.client.send_blocking_api_request(&Query {
466
663
            database: self.0.name.to_string(),
467
663
            view: view.clone(),
468
663
            key,
469
663
            order,
470
663
            limit,
471
663
            access_policy,
472
663
        })?)
473
663
    }
474

            
475
    fn query_by_name_with_docs(
476
        &self,
477
        view: &bonsaidb_core::schema::ViewName,
478
        key: Option<SerializedQueryKey>,
479
        order: Sort,
480
        limit: Option<u32>,
481
        access_policy: AccessPolicy,
482
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
483
    {
484
        Ok(self
485
            .0
486
            .client
487
            .send_blocking_api_request(&QueryWithDocs(Query {
488
                database: self.0.name.to_string(),
489
                view: view.clone(),
490
                key,
491
                order,
492
                limit,
493
                access_policy,
494
            }))?)
495
    }
496

            
497
119
    fn reduce_by_name(
498
119
        &self,
499
119
        view: &bonsaidb_core::schema::ViewName,
500
119
        key: Option<SerializedQueryKey>,
501
119
        access_policy: AccessPolicy,
502
119
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
503
119
        Ok(self
504
119
            .0
505
119
            .client
506
119
            .send_blocking_api_request(&Reduce {
507
119
                database: self.0.name.to_string(),
508
119
                view: view.clone(),
509
119
                key,
510
119
                access_policy,
511
119
            })?
512
102
            .into_vec())
513
119
    }
514

            
515
34
    fn reduce_grouped_by_name(
516
34
        &self,
517
34
        view: &bonsaidb_core::schema::ViewName,
518
34
        key: Option<SerializedQueryKey>,
519
34
        access_policy: AccessPolicy,
520
34
    ) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
521
34
    {
522
34
        Ok(self
523
34
            .0
524
34
            .client
525
34
            .send_blocking_api_request(&ReduceGrouped(Reduce {
526
34
                database: self.0.name.to_string(),
527
34
                view: view.clone(),
528
34
                key,
529
34
                access_policy,
530
34
            }))?)
531
34
    }
532

            
533
34
    fn delete_docs_by_name(
534
34
        &self,
535
34
        view: &bonsaidb_core::schema::ViewName,
536
34
        key: Option<SerializedQueryKey>,
537
34
        access_policy: AccessPolicy,
538
34
    ) -> Result<u64, bonsaidb_core::Error> {
539
34
        Ok(self.0.client.send_blocking_api_request(&DeleteDocs {
540
34
            database: self.0.name.to_string(),
541
34
            view: view.clone(),
542
34
            key,
543
34
            access_policy,
544
34
        })?)
545
34
    }
546
}
547

            
548
impl HasSession for BlockingRemoteDatabase {
549
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
550
        self.0.session()
551
    }
552
}
553

            
554
impl HasSchema for BlockingRemoteDatabase {
555
850
    fn schematic(&self) -> &bonsaidb_core::schema::Schematic {
556
850
        self.0.schematic()
557
850
    }
558
}
559

            
560
impl PubSub for BlockingRemoteDatabase {
561
    type Subscriber = BlockingRemoteSubscriber;
562

            
563
136
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
564
136
        let subscriber_id = self.0.client.send_blocking_api_request(&CreateSubscriber {
565
136
            database: self.0.name.to_string(),
566
136
        })?;
567

            
568
136
        let (sender, receiver) = flume::unbounded();
569
136
        self.0.client.register_subscriber(subscriber_id, sender);
570
136
        Ok(BlockingRemoteSubscriber(AsyncRemoteSubscriber {
571
136
            client: self.0.client.clone(),
572
136
            database: self.0.name.clone(),
573
136
            id: subscriber_id,
574
136
            receiver: Receiver::new(receiver),
575
136
            tokio: None,
576
136
        }))
577
136
    }
578

            
579
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
580
170
        self.0.client.send_blocking_api_request(&Publish {
581
170
            database: self.0.name.to_string(),
582
170
            topic: Bytes::from(topic),
583
170
            payload: Bytes::from(payload),
584
170
        })?;
585
170
        Ok(())
586
170
    }
587

            
588
1
    fn publish_bytes_to_all(
589
1
        &self,
590
1
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
591
1
        payload: Vec<u8>,
592
1
    ) -> Result<(), bonsaidb_core::Error> {
593
1
        let topics = topics.into_iter().map(Bytes::from).collect();
594
1
        self.0.client.send_blocking_api_request(&PublishToAll {
595
1
            database: self.0.name.to_string(),
596
1
            topics,
597
1
            payload: Bytes::from(payload),
598
1
        })?;
599
1
        Ok(())
600
1
    }
601
}
602

            
603
/// A remote PubSub [`Subscriber`] that blocks the current thread when
604
/// performing requests.
605
#[derive(Debug)]
606
pub struct BlockingRemoteSubscriber(AsyncRemoteSubscriber);
607

            
608
impl Subscriber for BlockingRemoteSubscriber {
609
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
610
221
        self.0.client.send_blocking_api_request(&SubscribeTo {
611
221
            database: self.0.database.to_string(),
612
221
            subscriber_id: self.0.id,
613
221
            topic: Bytes::from(topic),
614
221
        })?;
615
221
        Ok(())
616
221
    }
617

            
618
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
619
17
        self.0.client.send_blocking_api_request(&UnsubscribeFrom {
620
17
            database: self.0.database.to_string(),
621
17
            subscriber_id: self.0.id,
622
17
            topic: Bytes::from(topic),
623
17
        })?;
624
17
        Ok(())
625
17
    }
626

            
627
306
    fn receiver(&self) -> &Receiver {
628
306
        AsyncSubscriber::receiver(&self.0)
629
306
    }
630
}
631

            
632
impl KeyValue for BlockingRemoteDatabase {
633
171683
    fn execute_key_operation(
634
171683
        &self,
635
171683
        op: bonsaidb_core::keyvalue::KeyOperation,
636
171683
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
637
171683
        Ok(self
638
171683
            .0
639
171683
            .client
640
171683
            .send_blocking_api_request(&ExecuteKeyOperation {
641
171683
                database: self.0.name.to_string(),
642
171683

            
643
171683
                op,
644
171683
            })?)
645
171683
    }
646
}
647

            
648
pub enum Tokio {
649
    Runtime(Runtime),
650
    Handle(Handle),
651
}
652

            
653
impl Tokio {
654
2941
    pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
655
2941
        self,
656
2941
        task: F,
657
2941
    ) -> JoinHandle<Result<(), crate::Error>> {
658
2941
        match self {
659
680
            Self::Runtime(tokio) => {
660
680
                // When we have an owned runtime, we must have a thread driving
661
680
                // the runtime. To keep the interface to `Client` simple, we are
662
680
                // going to spawn the task and let the main block_on task simply
663
680
                // wait for the completion event. If the JoinHandle is
664
680
                // cancelled, the sender will be dropped and everything will
665
680
                // clean up.
666
680
                let (completion_sender, completion_receiver) = oneshot::channel();
667
680
                let task = async move {
668
176902
                    task.await?;
669
680
                    let _: Result<_, _> = completion_sender.send(());
670
680
                    Ok(())
671
680
                };
672
680
                let task = tokio.spawn(task);
673
680

            
674
680
                std::thread::spawn(move || {
675
680
                    tokio.block_on(async move {
676
680
                        let _: Result<_, _> = completion_receiver.await;
677
680
                    });
678
680
                });
679
680
                task
680
            }
681
2261
            Self::Handle(tokio) => tokio.spawn(task),
682
        }
683
2941
    }
684
}
685

            
686
2941
pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
687
2941
    task: F,
688
2941
    handle: Option<Handle>,
689
2941
) -> JoinHandle<Result<(), crate::Error>> {
690
    // We need to spawn a runtime or
691
2941
    let tokio = if let Some(handle) = handle {
692
2261
        Tokio::Handle(handle)
693
    } else {
694
680
        Tokio::Runtime(Runtime::new().unwrap())
695
    };
696
2941
    tokio.spawn(task)
697
2941
}