1
use std::collections::HashMap;
2
use std::sync::atomic::Ordering;
3
use std::sync::Arc;
4
use std::time::Duration;
5

            
6
use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
7
use bonsaidb_core::api;
8
use bonsaidb_core::arc_bytes::serde::Bytes;
9
use bonsaidb_core::connection::{
10
    AccessPolicy, Connection, Database, HasSchema, HasSession, IdentityReference,
11
    LowLevelConnection, Range, SerializedQueryKey, Sort, StorageConnection,
12
};
13
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
14
use bonsaidb_core::keyvalue::KeyValue;
15
use bonsaidb_core::networking::{
16
    AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
17
    Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
18
    CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
19
    LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
20
    ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo,
21
    UnsubscribeFrom, CURRENT_PROTOCOL_VERSION,
22
};
23
use bonsaidb_core::pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber};
24
use bonsaidb_core::schema::view::map;
25
use bonsaidb_core::schema::{CollectionName, ViewName};
26
use futures::Future;
27
use tokio::runtime::{Handle, Runtime};
28
use tokio::sync::oneshot;
29
use tokio::task::JoinHandle;
30
use url::Url;
31

            
32
use crate::builder::Blocking;
33
use crate::client::ClientSession;
34
use crate::{ApiError, AsyncClient, AsyncRemoteDatabase, AsyncRemoteSubscriber, Builder, Error};
35

            
36
/// A BonsaiDb client that blocks the current thread when performing requests.
37
#[derive(Debug, Clone)]
38
pub struct BlockingClient(pub(crate) AsyncClient);
39

            
40
impl BlockingClient {
41
    /// Returns a builder for a new client connecting to `url`.
42
120
    pub fn build(url: Url) -> Builder<Blocking> {
43
120
        Builder::new(url)
44
120
    }
45

            
46
    /// Initialize a client connecting to `url`. This client can be shared by
47
    /// cloning it. All requests are done asynchronously over the same
48
    /// connection.
49
    ///
50
    /// If the client has an error connecting, the first request made will
51
    /// present that error. If the client disconnects while processing requests,
52
    /// all requests being processed will exit and return
53
    /// [`Error::Disconnected`](bonsaidb_core::networking::Error::Disconnected).
54
    /// The client will automatically try reconnecting.
55
    ///
56
    /// The goal of this design of this reconnection strategy is to make it
57
    /// easier to build resilliant apps. By allowing existing Client instances
58
    /// to recover and reconnect, each component of the apps built can adopt a
59
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
60
    /// the database is to operation.
61
1080
    pub fn new(url: Url) -> Result<Self, Error> {
62
1080
        AsyncClient::new_from_parts(
63
1080
            url,
64
1080
            CURRENT_PROTOCOL_VERSION,
65
1080
            HashMap::default(),
66
1080
            None,
67
1080
            None,
68
1080
            #[cfg(not(target_arch = "wasm32"))]
69
1080
            None,
70
1080
            #[cfg(not(target_arch = "wasm32"))]
71
1080
            Handle::try_current().ok(),
72
1080
        )
73
1080
        .map(Self)
74
1080
    }
75

            
76
    /// Sends an api `request`.
77
1601
    pub fn send_api_request<Api: api::Api>(
78
1601
        &self,
79
1601
        request: &Api,
80
1601
    ) -> Result<Api::Response, ApiError<Api::Error>> {
81
1601
        self.0.send_blocking_api_request(request)
82
1601
    }
83

            
84
    /// Sends an api `request` without waiting for a result. The response from
85
    /// the server will be ignored.
86
    pub fn invoke_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
87
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
88
        self.0
89
            .send_request_without_confirmation(Api::name(), request)
90
            .map(|_| ())
91
    }
92

            
93
    /// Returns a reference to an async-compatible version of this client.
94
    #[must_use]
95
    pub fn as_async(&self) -> &AsyncClient {
96
        &self.0
97
    }
98

            
99
    /// Sets this instance's request timeout.
100
    ///
101
    /// Each client has its own timeout. When cloning a client, this timeout
102
    /// setting will be copied to the clone.
103
    pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
104
        self.0.request_timeout = timeout.into();
105
    }
106
}
107

            
108
impl From<AsyncClient> for BlockingClient {
109
    fn from(client: AsyncClient) -> Self {
110
        Self(client)
111
    }
112
}
113

            
114
impl From<BlockingClient> for AsyncClient {
115
    fn from(client: BlockingClient) -> Self {
116
        client.0
117
    }
118
}
119

            
120
impl StorageConnection for BlockingClient {
121
    type Authenticated = Self;
122
    type Database = BlockingRemoteDatabase;
123

            
124
    fn admin(&self) -> Self::Database {
125
        BlockingRemoteDatabase(
126
            self.0
127
                .remote_database::<Admin>(ADMIN_DATABASE_NAME)
128
                .unwrap(),
129
        )
130
    }
131

            
132
76
    fn database<DB: bonsaidb_core::schema::Schema>(
133
76
        &self,
134
76
        name: &str,
135
76
    ) -> Result<Self::Database, bonsaidb_core::Error> {
136
76
        self.0
137
76
            .remote_database::<DB>(name)
138
76
            .map(BlockingRemoteDatabase)
139
76
    }
140

            
141
1230
    fn create_database_with_schema(
142
1230
        &self,
143
1230
        name: &str,
144
1230
        schema: bonsaidb_core::schema::SchemaName,
145
1230
        only_if_needed: bool,
146
1230
    ) -> Result<(), bonsaidb_core::Error> {
147
1230
        self.send_api_request(&CreateDatabase {
148
1230
            database: Database {
149
1230
                name: name.to_string(),
150
1230
                schema,
151
1230
            },
152
1230
            only_if_needed,
153
1230
        })?;
154
1140
        Ok(())
155
1230
    }
156

            
157
60
    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
158
60
        self.send_api_request(&DeleteDatabase {
159
60
            name: name.to_string(),
160
60
        })?;
161
30
        Ok(())
162
60
    }
163

            
164
90
    fn list_databases(
165
90
        &self,
166
90
    ) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
167
90
        Ok(self.send_api_request(&ListDatabases)?)
168
90
    }
169

            
170
30
    fn list_available_schemas(
171
30
        &self,
172
30
    ) -> Result<Vec<bonsaidb_core::schema::SchemaSummary>, bonsaidb_core::Error> {
173
30
        Ok(self.send_api_request(&ListAvailableSchemas)?)
174
30
    }
175

            
176
60
    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
177
60
        Ok(self.send_api_request(&CreateUser {
178
60
            username: username.to_string(),
179
60
        })?)
180
60
    }
181

            
182
1
    fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
183
1
        &self,
184
1
        user: U,
185
1
    ) -> Result<(), bonsaidb_core::Error> {
186
1
        Ok(self.send_api_request(&DeleteUser {
187
1
            user: user.name()?.into_owned(),
188
        })?)
189
1
    }
190

            
191
    #[cfg(feature = "password-hashing")]
192
    fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
193
        &self,
194
        user: U,
195
        password: bonsaidb_core::connection::SensitiveString,
196
    ) -> Result<(), bonsaidb_core::Error> {
197
        use bonsaidb_core::networking::SetUserPassword;
198

            
199
        Ok(self.send_api_request(&SetUserPassword {
200
            user: user.name()?.into_owned(),
201
            password,
202
        })?)
203
    }
204

            
205
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
206
120
    fn authenticate(
207
120
        &self,
208
120
        authentication: bonsaidb_core::connection::Authentication,
209
120
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
210
120
        let session =
211
120
            self.send_api_request(&bonsaidb_core::networking::Authenticate { authentication })?;
212
120
        Ok(Self(AsyncClient {
213
120
            data: self.0.data.clone(),
214
120
            session: ClientSession {
215
120
                session: Arc::new(session),
216
120
                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
217
120
            },
218
120
            request_timeout: self.0.request_timeout,
219
120
        }))
220
120
    }
221

            
222
    fn assume_identity(
223
        &self,
224
        identity: IdentityReference<'_>,
225
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
226
        let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
227
        Ok(Self(AsyncClient {
228
            data: self.0.data.clone(),
229
            session: ClientSession {
230
                session: Arc::new(session),
231
                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
232
            },
233
            request_timeout: self.0.request_timeout,
234
        }))
235
    }
236

            
237
2
    fn add_permission_group_to_user<
238
2
        'user,
239
2
        'group,
240
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
241
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
242
2
    >(
243
2
        &self,
244
2
        user: U,
245
2
        permission_group: G,
246
2
    ) -> Result<(), bonsaidb_core::Error> {
247
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
248
2
            user: user.name()?.into_owned(),
249
2
            group: permission_group.name()?.into_owned(),
250
            should_be_member: true,
251
        })?;
252
2
        Ok(())
253
2
    }
254

            
255
2
    fn remove_permission_group_from_user<
256
2
        'user,
257
2
        'group,
258
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
259
2
        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
260
2
    >(
261
2
        &self,
262
2
        user: U,
263
2
        permission_group: G,
264
2
    ) -> Result<(), bonsaidb_core::Error> {
265
2
        self.send_api_request(&AlterUserPermissionGroupMembership {
266
2
            user: user.name()?.into_owned(),
267
2
            group: permission_group.name()?.into_owned(),
268
            should_be_member: false,
269
        })?;
270
2
        Ok(())
271
2
    }
272

            
273
2
    fn add_role_to_user<
274
2
        'user,
275
2
        'role,
276
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
277
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
278
2
    >(
279
2
        &self,
280
2
        user: U,
281
2
        role: R,
282
2
    ) -> Result<(), bonsaidb_core::Error> {
283
2
        self.send_api_request(&AlterUserRoleMembership {
284
2
            user: user.name()?.into_owned(),
285
2
            role: role.name()?.into_owned(),
286
            should_be_member: true,
287
        })?;
288
2
        Ok(())
289
2
    }
290

            
291
2
    fn remove_role_from_user<
292
2
        'user,
293
2
        'role,
294
2
        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
295
2
        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
296
2
    >(
297
2
        &self,
298
2
        user: U,
299
2
        role: R,
300
2
    ) -> Result<(), bonsaidb_core::Error> {
301
2
        self.send_api_request(&AlterUserRoleMembership {
302
2
            user: user.name()?.into_owned(),
303
2
            role: role.name()?.into_owned(),
304
            should_be_member: false,
305
        })?;
306
2
        Ok(())
307
2
    }
308
}
309

            
310
impl HasSession for BlockingClient {
311
120
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
312
120
        self.0.session()
313
120
    }
314
}
315

            
316
/// A remote database that blocks the current thread when performing its
317
/// requests.
318
139
#[derive(Debug, Clone)]
319
pub struct BlockingRemoteDatabase(AsyncRemoteDatabase);
320

            
321
impl Connection for BlockingRemoteDatabase {
322
    type Storage = BlockingClient;
323

            
324
    fn storage(&self) -> Self::Storage {
325
        BlockingClient(self.0.client.clone())
326
    }
327

            
328
540
    fn list_executed_transactions(
329
540
        &self,
330
540
        starting_id: Option<u64>,
331
540
        result_limit: Option<u32>,
332
540
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
333
540
        Ok(self
334
540
            .0
335
540
            .client
336
540
            .send_blocking_api_request(&ListExecutedTransactions {
337
540
                database: self.0.name.to_string(),
338
540
                starting_id,
339
540
                result_limit,
340
540
            })?)
341
540
    }
342

            
343
30
    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
344
30
        Ok(self
345
30
            .0
346
30
            .client
347
30
            .send_blocking_api_request(&LastTransactionId {
348
30
                database: self.0.name.to_string(),
349
30
            })?)
350
30
    }
351

            
352
30
    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
353
30
        self.0.send_blocking_api_request(&Compact {
354
30
            database: self.0.name.to_string(),
355
30
        })?;
356
30
        Ok(())
357
30
    }
358

            
359
30
    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
360
30
        self.0.send_blocking_api_request(&CompactKeyValueStore {
361
30
            database: self.0.name.to_string(),
362
30
        })?;
363
30
        Ok(())
364
30
    }
365
}
366

            
367
impl LowLevelConnection for BlockingRemoteDatabase {
368
32070
    fn apply_transaction(
369
32070
        &self,
370
32070
        transaction: bonsaidb_core::transaction::Transaction,
371
32070
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
372
32070
        Ok(self.0.client.send_blocking_api_request(&ApplyTransaction {
373
32070
            database: self.0.name.to_string(),
374
32070
            transaction,
375
32070
        })?)
376
32070
    }
377

            
378
540
    fn get_from_collection(
379
540
        &self,
380
540
        id: bonsaidb_core::document::DocumentId,
381
540
        collection: &CollectionName,
382
540
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
383
540
        Ok(self.0.client.send_blocking_api_request(&Get {
384
540
            database: self.0.name.to_string(),
385
540
            collection: collection.clone(),
386
540
            id,
387
540
        })?)
388
540
    }
389

            
390
240
    fn get_multiple_from_collection(
391
240
        &self,
392
240
        ids: &[bonsaidb_core::document::DocumentId],
393
240
        collection: &CollectionName,
394
240
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
395
240
        Ok(self.0.client.send_blocking_api_request(&GetMultiple {
396
240
            database: self.0.name.to_string(),
397
240
            collection: collection.clone(),
398
240
            ids: ids.to_vec(),
399
240
        })?)
400
240
    }
401

            
402
120
    fn list_from_collection(
403
120
        &self,
404
120
        ids: Range<bonsaidb_core::document::DocumentId>,
405
120
        order: Sort,
406
120
        limit: Option<u32>,
407
120
        collection: &CollectionName,
408
120
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
409
120
        Ok(self.0.client.send_blocking_api_request(&List {
410
120
            database: self.0.name.to_string(),
411
120
            collection: collection.clone(),
412
120
            ids,
413
120
            order,
414
120
            limit,
415
120
        })?)
416
120
    }
417

            
418
30
    fn list_headers_from_collection(
419
30
        &self,
420
30
        ids: Range<DocumentId>,
421
30
        order: Sort,
422
30
        limit: Option<u32>,
423
30
        collection: &CollectionName,
424
30
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
425
30
        Ok(self.0.client.send_blocking_api_request(&ListHeaders(List {
426
30
            database: self.0.name.to_string(),
427
30
            collection: collection.clone(),
428
30
            ids,
429
30
            order,
430
30
            limit,
431
30
        }))?)
432
30
    }
433

            
434
60
    fn count_from_collection(
435
60
        &self,
436
60
        ids: Range<bonsaidb_core::document::DocumentId>,
437
60
        collection: &CollectionName,
438
60
    ) -> Result<u64, bonsaidb_core::Error> {
439
60
        Ok(self.0.client.send_blocking_api_request(&Count {
440
60
            database: self.0.name.to_string(),
441
60
            collection: collection.clone(),
442
60
            ids,
443
60
        })?)
444
60
    }
445

            
446
30
    fn compact_collection_by_name(
447
30
        &self,
448
30
        collection: CollectionName,
449
30
    ) -> Result<(), bonsaidb_core::Error> {
450
30
        self.0.send_blocking_api_request(&CompactCollection {
451
30
            database: self.0.name.to_string(),
452
30
            name: collection,
453
30
        })?;
454
30
        Ok(())
455
30
    }
456

            
457
1170
    fn query_by_name(
458
1170
        &self,
459
1170
        view: &ViewName,
460
1170
        key: Option<SerializedQueryKey>,
461
1170
        order: Sort,
462
1170
        limit: Option<u32>,
463
1170
        access_policy: AccessPolicy,
464
1170
    ) -> Result<Vec<map::Serialized>, bonsaidb_core::Error> {
465
1170
        Ok(self.0.client.send_blocking_api_request(&Query {
466
1170
            database: self.0.name.to_string(),
467
1170
            view: view.clone(),
468
1170
            key,
469
1170
            order,
470
1170
            limit,
471
1170
            access_policy,
472
1170
        })?)
473
1170
    }
474

            
475
    fn query_by_name_with_docs(
476
        &self,
477
        view: &bonsaidb_core::schema::ViewName,
478
        key: Option<SerializedQueryKey>,
479
        order: Sort,
480
        limit: Option<u32>,
481
        access_policy: AccessPolicy,
482
    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
483
    {
484
        Ok(self
485
            .0
486
            .client
487
            .send_blocking_api_request(&QueryWithDocs(Query {
488
                database: self.0.name.to_string(),
489
                view: view.clone(),
490
                key,
491
                order,
492
                limit,
493
                access_policy,
494
            }))?)
495
    }
496

            
497
210
    fn reduce_by_name(
498
210
        &self,
499
210
        view: &bonsaidb_core::schema::ViewName,
500
210
        key: Option<SerializedQueryKey>,
501
210
        access_policy: AccessPolicy,
502
210
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
503
210
        Ok(self
504
210
            .0
505
210
            .client
506
210
            .send_blocking_api_request(&Reduce {
507
210
                database: self.0.name.to_string(),
508
210
                view: view.clone(),
509
210
                key,
510
210
                access_policy,
511
210
            })?
512
180
            .into_vec())
513
210
    }
514

            
515
60
    fn reduce_grouped_by_name(
516
60
        &self,
517
60
        view: &bonsaidb_core::schema::ViewName,
518
60
        key: Option<SerializedQueryKey>,
519
60
        access_policy: AccessPolicy,
520
60
    ) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
521
60
    {
522
60
        Ok(self
523
60
            .0
524
60
            .client
525
60
            .send_blocking_api_request(&ReduceGrouped(Reduce {
526
60
                database: self.0.name.to_string(),
527
60
                view: view.clone(),
528
60
                key,
529
60
                access_policy,
530
60
            }))?)
531
60
    }
532

            
533
60
    fn delete_docs_by_name(
534
60
        &self,
535
60
        view: &bonsaidb_core::schema::ViewName,
536
60
        key: Option<SerializedQueryKey>,
537
60
        access_policy: AccessPolicy,
538
60
    ) -> Result<u64, bonsaidb_core::Error> {
539
60
        Ok(self.0.client.send_blocking_api_request(&DeleteDocs {
540
60
            database: self.0.name.to_string(),
541
60
            view: view.clone(),
542
60
            key,
543
60
            access_policy,
544
60
        })?)
545
60
    }
546
}
547

            
548
impl HasSession for BlockingRemoteDatabase {
549
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
550
        self.0.session()
551
    }
552
}
553

            
554
impl HasSchema for BlockingRemoteDatabase {
555
1500
    fn schematic(&self) -> &bonsaidb_core::schema::Schematic {
556
1500
        self.0.schematic()
557
1500
    }
558
}
559

            
560
impl PubSub for BlockingRemoteDatabase {
561
    type Subscriber = BlockingRemoteSubscriber;
562

            
563
240
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
564
240
        let subscriber_id = self.0.client.send_blocking_api_request(&CreateSubscriber {
565
240
            database: self.0.name.to_string(),
566
240
        })?;
567

            
568
240
        let (sender, receiver) = flume::unbounded();
569
240
        self.0.client.register_subscriber(subscriber_id, sender);
570
240
        Ok(BlockingRemoteSubscriber(AsyncRemoteSubscriber {
571
240
            client: self.0.client.clone(),
572
240
            database: self.0.name.clone(),
573
240
            id: subscriber_id,
574
240
            receiver: Receiver::new(receiver),
575
240
            tokio: None,
576
240
        }))
577
240
    }
578

            
579
300
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
580
300
        self.0.client.send_blocking_api_request(&Publish {
581
300
            database: self.0.name.to_string(),
582
300
            topic: Bytes::from(topic),
583
300
            payload: Bytes::from(payload),
584
300
        })?;
585
300
        Ok(())
586
300
    }
587

            
588
1
    fn publish_bytes_to_all(
589
1
        &self,
590
1
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
591
1
        payload: Vec<u8>,
592
1
    ) -> Result<(), bonsaidb_core::Error> {
593
1
        let topics = topics.into_iter().map(Bytes::from).collect();
594
1
        self.0.client.send_blocking_api_request(&PublishToAll {
595
1
            database: self.0.name.to_string(),
596
1
            topics,
597
1
            payload: Bytes::from(payload),
598
1
        })?;
599
1
        Ok(())
600
1
    }
601
}
602

            
603
/// A remote PubSub [`Subscriber`] that blocks the current thread when
604
/// performing requests.
605
#[derive(Debug)]
606
pub struct BlockingRemoteSubscriber(AsyncRemoteSubscriber);
607

            
608
impl Subscriber for BlockingRemoteSubscriber {
609
390
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
610
390
        self.0.client.send_blocking_api_request(&SubscribeTo {
611
390
            database: self.0.database.to_string(),
612
390
            subscriber_id: self.0.id,
613
390
            topic: Bytes::from(topic),
614
390
        })?;
615
390
        Ok(())
616
390
    }
617

            
618
30
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
619
30
        self.0.client.send_blocking_api_request(&UnsubscribeFrom {
620
30
            database: self.0.database.to_string(),
621
30
            subscriber_id: self.0.id,
622
30
            topic: Bytes::from(topic),
623
30
        })?;
624
30
        Ok(())
625
30
    }
626

            
627
540
    fn receiver(&self) -> &Receiver {
628
540
        AsyncSubscriber::receiver(&self.0)
629
540
    }
630
}
631

            
632
impl KeyValue for BlockingRemoteDatabase {
633
302970
    fn execute_key_operation(
634
302970
        &self,
635
302970
        op: bonsaidb_core::keyvalue::KeyOperation,
636
302970
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
637
302970
        Ok(self
638
302970
            .0
639
302970
            .client
640
302970
            .send_blocking_api_request(&ExecuteKeyOperation {
641
302970
                database: self.0.name.to_string(),
642
302970

            
643
302970
                op,
644
302970
            })?)
645
302970
    }
646
}
647

            
648
pub enum Tokio {
649
    Runtime(Runtime),
650
    Handle(Handle),
651
}
652

            
653
impl Tokio {
654
5550
    pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
655
5550
        self,
656
5550
        task: F,
657
5550
    ) -> JoinHandle<Result<(), crate::Error>> {
658
5550
        match self {
659
1200
            Self::Runtime(tokio) => {
660
1200
                // When we have an owned runtime, we must have a thread driving
661
1200
                // the runtime. To keep the interface to `Client` simple, we are
662
1200
                // going to spawn the task and let the main block_on task simply
663
1200
                // wait for the completion event. If the JoinHandle is
664
1200
                // cancelled, the sender will be dropped and everything will
665
1200
                // clean up.
666
1200
                let (completion_sender, completion_receiver) = oneshot::channel();
667
1200
                let task = async move {
668
251970
                    task.await?;
669
1200
                    let _: Result<_, _> = completion_sender.send(());
670
1200
                    Ok(())
671
1200
                };
672
1200
                let task = tokio.spawn(task);
673
1200

            
674
1200
                std::thread::spawn(move || {
675
1200
                    tokio.block_on(async move {
676
1200
                        let _: Result<_, _> = completion_receiver.await;
677
1200
                    });
678
1200
                });
679
1200
                task
680
            }
681
4350
            Self::Handle(tokio) => tokio.spawn(task),
682
        }
683
5550
    }
684
}
685

            
686
5550
pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
687
5550
    task: F,
688
5550
    handle: Option<Handle>,
689
5550
) -> JoinHandle<Result<(), crate::Error>> {
690
    // We need to spawn a runtime or
691
5550
    let tokio = if let Some(handle) = handle {
692
4350
        Tokio::Handle(handle)
693
    } else {
694
1200
        Tokio::Runtime(Runtime::new().unwrap())
695
    };
696
5550
    tokio.spawn(task)
697
5550
}