1
use std::collections::HashMap;
2
use std::sync::atomic::Ordering;
3
use std::sync::Arc;
4
use std::time::Duration;
5

            
6
use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
7
use bonsaidb_core::api;
8
use bonsaidb_core::arc_bytes::serde::Bytes;
9
use bonsaidb_core::connection::{
10
    AccessPolicy, Connection, Database, HasSchema, HasSession, IdentityReference,
11
    LowLevelConnection, Range, SerializedQueryKey, Sort, StorageConnection,
12
};
13
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
14
use bonsaidb_core::keyvalue::KeyValue;
15
use bonsaidb_core::networking::{
16
    AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
17
    Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
18
    CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
19
    LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
20
    ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo,
21
    UnsubscribeFrom, CURRENT_PROTOCOL_VERSION,
22
};
23
use bonsaidb_core::pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber};
24
use bonsaidb_core::schema::view::map;
25
use bonsaidb_core::schema::{CollectionName, ViewName};
26
use futures::Future;
27
use tokio::runtime::{Handle, Runtime};
28
use tokio::sync::oneshot;
29
use tokio::task::JoinHandle;
30
use url::Url;
31

            
32
use crate::builder::Blocking;
33
use crate::client::ClientSession;
34
use crate::{ApiError, AsyncClient, AsyncRemoteDatabase, AsyncRemoteSubscriber, Builder, Error};
35

            
36
/// A BonsaiDb client that blocks the current thread when performing requests.
37
#[derive(Debug, Clone)]
38
pub struct BlockingClient(pub(crate) AsyncClient);
39

            
40
impl BlockingClient {
41
    /// Returns a builder for a new client connecting to `url`.
42
72
    pub fn build(url: Url) -> Builder<Blocking> {
43
72
        Builder::new(url)
44
72
    }
45

            
46
    /// Initialize a client connecting to `url`. This client can be shared by
47
    /// cloning it. All requests are done asynchronously over the same
48
    /// connection.
49
    ///
50
    /// If the client has an error connecting, the first request made will
51
    /// present that error. If the client disconnects while processing requests,
52
    /// all requests being processed will exit and return
53
    /// [`Error::Disconnected`](bonsaidb_core::networking::Error::Disconnected).
54
    /// The client will automatically try reconnecting.
55
    ///
56
    /// The goal of this design of this reconnection strategy is to make it
57
    /// easier to build resilliant apps. By allowing existing Client instances
58
    /// to recover and reconnect, each component of the apps built can adopt a
59
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
60
    /// the database is to operation.
61
648
    pub fn new(url: Url) -> Result<Self, Error> {
62
648
        AsyncClient::new_from_parts(
63
648
            url,
64
648
            CURRENT_PROTOCOL_VERSION,
65
648
            HashMap::default(),
66
648
            None,
67
648
            None,
68
648
            #[cfg(not(target_arch = "wasm32"))]
69
648
            None,
70
648
            #[cfg(not(target_arch = "wasm32"))]
71
648
            Handle::try_current().ok(),
72
648
        )
73
648
        .map(Self)
74
648
    }
75

            
76
    /// Sends an api `request`.
77
965
    pub fn send_api_request<Api: api::Api>(
78
965
        &self,
79
965
        request: &Api,
80
965
    ) -> Result<Api::Response, ApiError<Api::Error>> {
81
965
        self.0.send_blocking_api_request(request)
82
965
    }
83

            
84
    /// Sends an api `request` without waiting for a result. The response from
85
    /// the server will be ignored.
86
    pub fn invoke_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
87
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
88
        self.0
89
            .send_request_without_confirmation(Api::name(), request)
90
            .map(|_| ())
91
    }
92

            
93
    /// Returns a reference to an async-compatible version of this client.
94
    #[must_use]
95
    pub fn as_async(&self) -> &AsyncClient {
96
        &self.0
97
    }
98

            
99
    /// Sets this instance's request timeout.
100
    ///
101
    /// Each client has its own timeout. When cloning a client, this timeout
102
    /// setting will be copied to the clone.
103
    pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
104
        self.0.request_timeout = timeout.into();
105
    }
106
}
107

            
108
impl From<AsyncClient> for BlockingClient {
109
    fn from(client: AsyncClient) -> Self {
110
        Self(client)
111
    }
112
}
113

            
114
impl From<BlockingClient> for AsyncClient {
115
    fn from(client: BlockingClient) -> Self {
116
        client.0
117
    }
118
}
119

            
120
impl StorageConnection for BlockingClient {
121
    type Authenticated = Self;
122
    type Database = BlockingRemoteDatabase;
123

            
124
    fn admin(&self) -> Self::Database {
125
        BlockingRemoteDatabase(
126
            self.0
127
                .remote_database::<Admin>(ADMIN_DATABASE_NAME)
128
                .unwrap(),
129
        )
130
    }
131

            
132
76
    fn database<DB: bonsaidb_core::schema::Schema>(
133
76
        &self,
134
76
        name: &str,
135
76
    ) -> Result<Self::Database, bonsaidb_core::Error> {
136
76
        self.0
137
76
            .remote_database::<DB>(name)
138
76
            .map(BlockingRemoteDatabase)
139
76
    }
140

            
141
    fn create_database_with_schema(
142
        &self,
143
        name: &str,
144
        schema: bonsaidb_core::schema::SchemaName,
145
        only_if_needed: bool,
146
    ) -> Result<(), bonsaidb_core::Error> {
147
738
        self.send_api_request(&CreateDatabase {
148
738
            database: Database {
149
738
                name: name.to_string(),
150
738
                schema,
151
738
            },
152
738
            only_if_needed,
153
738
        })?;
154
684
        Ok(())
155
738
    }
156

            
157
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
158
36
        self.send_api_request(&DeleteDatabase {
159
36
            name: name.to_string(),
160
36
        })?;
161
18
        Ok(())
162
36
    }
163

            
164
54
    fn list_databases(
165
54
        &self,
166
54
    ) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
167
54
        Ok(self.send_api_request(&ListDatabases)?)
168
54
    }
169

            
170
18
    fn list_available_schemas(
171
18
        &self,
172
18
    ) -> Result<Vec<bonsaidb_core::schema::SchemaSummary>, bonsaidb_core::Error> {
173
18
        Ok(self.send_api_request(&ListAvailableSchemas)?)
174
18
    }
175

            
176
36
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
177
36
        Ok(self.send_api_request(&CreateUser {
178
36
            username: username.to_string(),
179
36
        })?)
180
36
    }
181

            
182
1
    fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
183
1
        &self,
184
1
        user: U,
185
1
    ) -> Result<(), bonsaidb_core::Error> {
186
1
        Ok(self.send_api_request(&DeleteUser {
187
1
            user: user.name()?.into_owned(),
188
        })?)
189
1
    }
190

            
191
    #[cfg(feature = "password-hashing")]
192
    fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
193
        &self,
194
        user: U,
195
        password: bonsaidb_core::connection::SensitiveString,
196
    ) -> Result<(), bonsaidb_core::Error> {
197
        use bonsaidb_core::networking::SetUserPassword;
198

            
199
        Ok(self.send_api_request(&SetUserPassword {
200
            user: user.name()?.into_owned(),
201
            password,
202
        })?)
203
    }
204

            
205
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
206
72
    fn authenticate(
207
72
        &self,
208
72
        authentication: bonsaidb_core::connection::Authentication,
209
72
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
210
72
        let session =
211
72
            self.send_api_request(&bonsaidb_core::networking::Authenticate { authentication })?;
212
72
        Ok(Self(AsyncClient {
213
72
            data: self.0.data.clone(),
214
72
            session: ClientSession {
215
72
                session: Arc::new(session),
216
72
                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
217
72
            },
218
72
            request_timeout: self.0.request_timeout,
219
72
        }))
220
72
    }
221

            
222
    fn assume_identity(
223
        &self,
224
        identity: IdentityReference<'_>,
225
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
226
        let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
227
        Ok(Self(AsyncClient {
228
            data: self.0.data.clone(),
229
            session: ClientSession {
230
                session: Arc::new(session),
231
                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
232
            },
233
            request_timeout: self.0.request_timeout,
234
        }))
235
    }
236

            
237
2
    fn add_permission_group_to_user<
238
2
        'user,
239
2
        'group,
240
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
241
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
242
2
    >(
243
2
        &self,
244
2
        user: U,
245
2
        permission_group: G,
246
2
    ) -> Result<(), bonsaidb_core::Error> {
247
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
248
2
            user: user.name()?.into_owned(),
249
2
            group: permission_group.name()?.into_owned(),
250
            should_be_member: true,
251
        })?;
252
2
        Ok(())
253
2
    }
254

            
255
2
    fn remove_permission_group_from_user<
256
2
        'user,
257
2
        'group,
258
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
259
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
260
2
    >(
261
2
        &self,
262
2
        user: U,
263
2
        permission_group: G,
264
2
    ) -> Result<(), bonsaidb_core::Error> {
265
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
266
2
            user: user.name()?.into_owned(),
267
2
            group: permission_group.name()?.into_owned(),
268
            should_be_member: false,
269
        })?;
270
2
        Ok(())
271
2
    }
272

            
273
2
    fn add_role_to_user<
274
2
        'user,
275
2
        'role,
276
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
277
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
278
2
    >(
279
2
        &self,
280
2
        user: U,
281
2
        role: R,
282
2
    ) -> Result<(), bonsaidb_core::Error> {
283
2
        self.send_api_request(&AlterUserRoleMembership {
284
2
            user: user.name()?.into_owned(),
285
2
            role: role.name()?.into_owned(),
286
            should_be_member: true,
287
        })?;
288
2
        Ok(())
289
2
    }
290

            
291
2
    fn remove_role_from_user<
292
2
        'user,
293
2
        'role,
294
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
295
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
296
2
    >(
297
2
        &self,
298
2
        user: U,
299
2
        role: R,
300
2
    ) -> Result<(), bonsaidb_core::Error> {
301
2
        self.send_api_request(&AlterUserRoleMembership {
302
2
            user: user.name()?.into_owned(),
303
2
            role: role.name()?.into_owned(),
304
            should_be_member: false,
305
        })?;
306
2
        Ok(())
307
2
    }
308
}
309

            
310
impl HasSession for BlockingClient {
311
72
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
312
72
        self.0.session()
313
72
    }
314
}
315

            
316
/// A remote database that blocks the current thread when performing its
317
/// requests.
318
137
#[derive(Debug, Clone)]
319
pub struct BlockingRemoteDatabase(AsyncRemoteDatabase);
320

            
321
impl Connection for BlockingRemoteDatabase {
322
    type Storage = BlockingClient;
323

            
324
    fn storage(&self) -> Self::Storage {
325
        BlockingClient(self.0.client.clone())
326
    }
327

            
328
324
    fn list_executed_transactions(
329
324
        &self,
330
324
        starting_id: Option<u64>,
331
324
        result_limit: Option<u32>,
332
324
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
333
324
        Ok(self
334
324
            .0
335
324
            .client
336
324
            .send_blocking_api_request(&ListExecutedTransactions {
337
324
                database: self.0.name.to_string(),
338
324
                starting_id,
339
324
                result_limit,
340
324
            })?)
341
324
    }
342

            
343
18
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
344
18
        Ok(self
345
18
            .0
346
18
            .client
347
18
            .send_blocking_api_request(&LastTransactionId {
348
18
                database: self.0.name.to_string(),
349
18
            })?)
350
18
    }
351

            
352
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
353
18
        self.0.send_blocking_api_request(&Compact {
354
18
            database: self.0.name.to_string(),
355
18
        })?;
356
18
        Ok(())
357
18
    }
358

            
359
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
360
18
        self.0.send_blocking_api_request(&CompactKeyValueStore {
361
18
            database: self.0.name.to_string(),
362
18
        })?;
363
18
        Ok(())
364
18
    }
365
}
366

            
367
impl LowLevelConnection for BlockingRemoteDatabase {
368
19242
    fn apply_transaction(
369
19242
        &self,
370
19242
        transaction: bonsaidb_core::transaction::Transaction,
371
19242
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
372
19242
        Ok(self.0.client.send_blocking_api_request(&ApplyTransaction {
373
19242
            database: self.0.name.to_string(),
374
19242
            transaction,
375
19242
        })?)
376
19242
    }
377

            
378
324
    fn get_from_collection(
379
324
        &self,
380
324
        id: bonsaidb_core::document::DocumentId,
381
324
        collection: &CollectionName,
382
324
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
383
324
        Ok(self.0.client.send_blocking_api_request(&Get {
384
324
            database: self.0.name.to_string(),
385
324
            collection: collection.clone(),
386
324
            id,
387
324
        })?)
388
324
    }
389

            
390
144
    fn get_multiple_from_collection(
391
144
        &self,
392
144
        ids: &[bonsaidb_core::document::DocumentId],
393
144
        collection: &CollectionName,
394
144
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
395
144
        Ok(self.0.client.send_blocking_api_request(&GetMultiple {
396
144
            database: self.0.name.to_string(),
397
144
            collection: collection.clone(),
398
144
            ids: ids.to_vec(),
399
144
        })?)
400
144
    }
401

            
402
72
    fn list_from_collection(
403
72
        &self,
404
72
        ids: Range<bonsaidb_core::document::DocumentId>,
405
72
        order: Sort,
406
72
        limit: Option<u32>,
407
72
        collection: &CollectionName,
408
72
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
409
72
        Ok(self.0.client.send_blocking_api_request(&List {
410
72
            database: self.0.name.to_string(),
411
72
            collection: collection.clone(),
412
72
            ids,
413
72
            order,
414
72
            limit,
415
72
        })?)
416
72
    }
417

            
418
18
    fn list_headers_from_collection(
419
18
        &self,
420
18
        ids: Range<DocumentId>,
421
18
        order: Sort,
422
18
        limit: Option<u32>,
423
18
        collection: &CollectionName,
424
18
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
425
18
        Ok(self.0.client.send_blocking_api_request(&ListHeaders(List {
426
18
            database: self.0.name.to_string(),
427
18
            collection: collection.clone(),
428
18
            ids,
429
18
            order,
430
18
            limit,
431
18
        }))?)
432
18
    }
433

            
434
36
    fn count_from_collection(
435
36
        &self,
436
36
        ids: Range<bonsaidb_core::document::DocumentId>,
437
36
        collection: &CollectionName,
438
36
    ) -> Result<u64, bonsaidb_core::Error> {
439
36
        Ok(self.0.client.send_blocking_api_request(&Count {
440
36
            database: self.0.name.to_string(),
441
36
            collection: collection.clone(),
442
36
            ids,
443
36
        })?)
444
36
    }
445

            
446
    fn compact_collection_by_name(
447
        &self,
448
        collection: CollectionName,
449
    ) -> Result<(), bonsaidb_core::Error> {
450
18
        self.0.send_blocking_api_request(&CompactCollection {
451
18
            database: self.0.name.to_string(),
452
18
            name: collection,
453
18
        })?;
454
18
        Ok(())
455
18
    }
456

            
457
702
    fn query_by_name(
458
702
        &self,
459
702
        view: &ViewName,
460
702
        key: Option<SerializedQueryKey>,
461
702
        order: Sort,
462
702
        limit: Option<u32>,
463
702
        access_policy: AccessPolicy,
464
702
    ) -> Result<Vec<map::Serialized>, bonsaidb_core::Error> {
465
702
        Ok(self.0.client.send_blocking_api_request(&Query {
466
702
            database: self.0.name.to_string(),
467
702
            view: view.clone(),
468
702
            key,
469
702
            order,
470
702
            limit,
471
702
            access_policy,
472
702
        })?)
473
702
    }
474

            
475
    fn query_by_name_with_docs(
476
        &self,
477
        view: &bonsaidb_core::schema::ViewName,
478
        key: Option<SerializedQueryKey>,
479
        order: Sort,
480
        limit: Option<u32>,
481
        access_policy: AccessPolicy,
482
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
483
    {
484
        Ok(self
485
            .0
486
            .client
487
            .send_blocking_api_request(&QueryWithDocs(Query {
488
                database: self.0.name.to_string(),
489
                view: view.clone(),
490
                key,
491
                order,
492
                limit,
493
                access_policy,
494
            }))?)
495
    }
496

            
497
126
    fn reduce_by_name(
498
126
        &self,
499
126
        view: &bonsaidb_core::schema::ViewName,
500
126
        key: Option<SerializedQueryKey>,
501
126
        access_policy: AccessPolicy,
502
126
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
503
126
        Ok(self
504
126
            .0
505
126
            .client
506
126
            .send_blocking_api_request(&Reduce {
507
126
                database: self.0.name.to_string(),
508
126
                view: view.clone(),
509
126
                key,
510
126
                access_policy,
511
126
            })?
512
108
            .into_vec())
513
126
    }
514

            
515
36
    fn reduce_grouped_by_name(
516
36
        &self,
517
36
        view: &bonsaidb_core::schema::ViewName,
518
36
        key: Option<SerializedQueryKey>,
519
36
        access_policy: AccessPolicy,
520
36
    ) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
521
36
    {
522
36
        Ok(self
523
36
            .0
524
36
            .client
525
36
            .send_blocking_api_request(&ReduceGrouped(Reduce {
526
36
                database: self.0.name.to_string(),
527
36
                view: view.clone(),
528
36
                key,
529
36
                access_policy,
530
36
            }))?)
531
36
    }
532

            
533
36
    fn delete_docs_by_name(
534
36
        &self,
535
36
        view: &bonsaidb_core::schema::ViewName,
536
36
        key: Option<SerializedQueryKey>,
537
36
        access_policy: AccessPolicy,
538
36
    ) -> Result<u64, bonsaidb_core::Error> {
539
36
        Ok(self.0.client.send_blocking_api_request(&DeleteDocs {
540
36
            database: self.0.name.to_string(),
541
36
            view: view.clone(),
542
36
            key,
543
36
            access_policy,
544
36
        })?)
545
36
    }
546
}
547

            
548
impl HasSession for BlockingRemoteDatabase {
549
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
550
        self.0.session()
551
    }
552
}
553

            
554
impl HasSchema for BlockingRemoteDatabase {
555
900
    fn schematic(&self) -> &bonsaidb_core::schema::Schematic {
556
900
        self.0.schematic()
557
900
    }
558
}
559

            
560
impl PubSub for BlockingRemoteDatabase {
561
    type Subscriber = BlockingRemoteSubscriber;
562

            
563
144
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
564
144
        let subscriber_id = self.0.client.send_blocking_api_request(&CreateSubscriber {
565
144
            database: self.0.name.to_string(),
566
144
        })?;
567

            
568
144
        let (sender, receiver) = flume::unbounded();
569
144
        self.0.client.register_subscriber(subscriber_id, sender);
570
144
        Ok(BlockingRemoteSubscriber(AsyncRemoteSubscriber {
571
144
            client: self.0.client.clone(),
572
144
            database: self.0.name.clone(),
573
144
            id: subscriber_id,
574
144
            receiver: Receiver::new(receiver),
575
144
            tokio: None,
576
144
        }))
577
144
    }
578

            
579
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
580
180
        self.0.client.send_blocking_api_request(&Publish {
581
180
            database: self.0.name.to_string(),
582
180
            topic: Bytes::from(topic),
583
180
            payload: Bytes::from(payload),
584
180
        })?;
585
180
        Ok(())
586
180
    }
587

            
588
1
    fn publish_bytes_to_all(
589
1
        &self,
590
1
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
591
1
        payload: Vec<u8>,
592
1
    ) -> Result<(), bonsaidb_core::Error> {
593
1
        let topics = topics.into_iter().map(Bytes::from).collect();
594
1
        self.0.client.send_blocking_api_request(&PublishToAll {
595
1
            database: self.0.name.to_string(),
596
1
            topics,
597
1
            payload: Bytes::from(payload),
598
1
        })?;
599
1
        Ok(())
600
1
    }
601
}
602

            
603
/// A remote PubSub [`Subscriber`] that blocks the current thread when
604
/// performing requests.
605
#[derive(Debug)]
606
pub struct BlockingRemoteSubscriber(AsyncRemoteSubscriber);
607

            
608
impl Subscriber for BlockingRemoteSubscriber {
609
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
610
234
        self.0.client.send_blocking_api_request(&SubscribeTo {
611
234
            database: self.0.database.to_string(),
612
234
            subscriber_id: self.0.id,
613
234
            topic: Bytes::from(topic),
614
234
        })?;
615
234
        Ok(())
616
234
    }
617

            
618
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
619
18
        self.0.client.send_blocking_api_request(&UnsubscribeFrom {
620
18
            database: self.0.database.to_string(),
621
18
            subscriber_id: self.0.id,
622
18
            topic: Bytes::from(topic),
623
18
        })?;
624
18
        Ok(())
625
18
    }
626

            
627
324
    fn receiver(&self) -> &Receiver {
628
324
        AsyncSubscriber::receiver(&self.0)
629
324
    }
630
}
631

            
632
impl KeyValue for BlockingRemoteDatabase {
633
181782
    fn execute_key_operation(
634
181782
        &self,
635
181782
        op: bonsaidb_core::keyvalue::KeyOperation,
636
181782
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
637
181782
        Ok(self
638
181782
            .0
639
181782
            .client
640
181782
            .send_blocking_api_request(&ExecuteKeyOperation {
641
181782
                database: self.0.name.to_string(),
642
181782

            
643
181782
                op,
644
181782
            })?)
645
181782
    }
646
}
647

            
648
pub enum Tokio {
649
    Runtime(Runtime),
650
    Handle(Handle),
651
}
652

            
653
impl Tokio {
654
3114
    pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
655
3114
        self,
656
3114
        task: F,
657
3114
    ) -> JoinHandle<Result<(), crate::Error>> {
658
3114
        match self {
659
720
            Self::Runtime(tokio) => {
660
720
                // When we have an owned runtime, we must have a thread driving
661
720
                // the runtime. To keep the interface to `Client` simple, we are
662
720
                // going to spawn the task and let the main block_on task simply
663
720
                // wait for the completion event. If the JoinHandle is
664
720
                // cancelled, the sender will be dropped and everything will
665
720
                // clean up.
666
720
                let (completion_sender, completion_receiver) = oneshot::channel();
667
720
                let task = async move {
668
205884
                    task.await?;
669
720
                    let _: Result<_, _> = completion_sender.send(());
670
720
                    Ok(())
671
720
                };
672
720
                let task = tokio.spawn(task);
673
720

            
674
720
                std::thread::spawn(move || {
675
720
                    tokio.block_on(async move {
676
720
                        let _: Result<_, _> = completion_receiver.await;
677
720
                    });
678
720
                });
679
720
                task
680
            }
681
2394
            Self::Handle(tokio) => tokio.spawn(task),
682
        }
683
3114
    }
684
}
685

            
686
3114
pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
687
3114
    task: F,
688
3114
    handle: Option<Handle>,
689
3114
) -> JoinHandle<Result<(), crate::Error>> {
690
    // We need to spawn a runtime or
691
3114
    let tokio = if let Some(handle) = handle {
692
2394
        Tokio::Handle(handle)
693
    } else {
694
720
        Tokio::Runtime(Runtime::new().unwrap())
695
    };
696
3114
    tokio.spawn(task)
697
3114
}