1
#[cfg(feature = "test-util")]
2
use std::sync::atomic::AtomicBool;
3
use std::{
4
    any::TypeId,
5
    collections::HashMap,
6
    fmt::Debug,
7
    ops::Deref,
8
    sync::{
9
        atomic::{AtomicU32, Ordering},
10
        Arc,
11
    },
12
};
13

            
14
use async_trait::async_trait;
15
#[cfg(feature = "password-hashing")]
16
use bonsaidb_core::connection::Authentication;
17
use bonsaidb_core::{
18
    admin::{Admin, ADMIN_DATABASE_NAME},
19
    api::{self, Api},
20
    arc_bytes::{serde::Bytes, OwnedBytes},
21
    connection::{AsyncStorageConnection, Database, HasSession, IdentityReference, Session},
22
    networking::{
23
        AlterUserPermissionGroupMembership, AlterUserRoleMembership, AssumeIdentity,
24
        CreateDatabase, CreateUser, DeleteDatabase, DeleteUser, ListAvailableSchemas,
25
        ListDatabases, LogOutSession, MessageReceived, Payload, UnregisterSubscriber,
26
        CURRENT_PROTOCOL_VERSION,
27
    },
28
    permissions::Permissions,
29
    schema::{ApiName, Nameable, Schema, SchemaName, Schematic},
30
};
31
use bonsaidb_utils::fast_async_lock;
32
use flume::Sender;
33
use futures::{future::BoxFuture, Future, FutureExt};
34
use parking_lot::Mutex;
35
#[cfg(not(target_arch = "wasm32"))]
36
use tokio::{runtime::Handle, task::JoinHandle};
37
use url::Url;
38

            
39
pub use self::remote_database::{RemoteDatabase, RemoteSubscriber};
40
use crate::{error::Error, ApiError, Builder};
41

            
42
#[cfg(not(target_arch = "wasm32"))]
43
mod quic_worker;
44
mod remote_database;
45
#[cfg(not(target_arch = "wasm32"))]
46
mod sync;
47
#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
48
mod tungstenite_worker;
49
#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
50
mod wasm_websocket_worker;
51

            
52
6860
#[derive(Debug, Clone, Default)]
53
pub struct SubscriberMap(Arc<Mutex<HashMap<u64, flume::Sender<Message>>>>);
54

            
55
impl SubscriberMap {
56
3100
    pub fn clear(&self) {
57
3100
        let mut data = self.lock();
58
3100
        data.clear();
59
3100
    }
60
}
61

            
62
impl Deref for SubscriberMap {
63
    type Target = Mutex<HashMap<u64, flume::Sender<Message>>>;
64

            
65
4680
    fn deref(&self) -> &Self::Target {
66
4680
        &self.0
67
4680
    }
68
}
69

            
70
use bonsaidb_core::circulate::Message;
71

            
72
/// WebSocket error type on non-`wasm32` targets: an alias for the
/// `tokio_tungstenite` error type.
#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
pub type WebSocketError = tokio_tungstenite::tungstenite::Error;

/// WebSocket error type on `wasm32` targets: an alias for the error type
/// defined by the wasm WebSocket worker module.
#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
pub type WebSocketError = wasm_websocket_worker::WebSocketError;
77

            
78
/// Client for connecting to a BonsaiDb server.
79
///
80
///
81
///
82
/// ## Connecting via QUIC
83
///
84
/// The URL scheme to connect via QUIC is `bonsaidb`. If no port is specified,
85
/// port 5645 is assumed.
86
///
87
/// ### With a valid TLS certificate
88
///
89
/// ```rust
90
/// # use bonsaidb_client::{Client, fabruic::Certificate, url::Url};
91
/// # async fn test_fn() -> anyhow::Result<()> {
92
/// let client = Client::build(Url::parse("bonsaidb://my-server.com")?).finish()?;
93
/// # Ok(())
94
/// # }
95
/// ```
96
///
97
/// ### With a Self-Signed Pinned Certificate
98
///
99
/// When using `install_self_signed_certificate()`, clients will need the
100
/// contents of the `pinned-certificate.der` file within the database. It can be
101
/// specified when building the client:
102
///
103
/// ```rust
104
/// # use bonsaidb_client::{Client, fabruic::Certificate, url::Url};
105
/// # async fn test_fn() -> anyhow::Result<()> {
106
/// let certificate =
107
///     Certificate::from_der(std::fs::read("mydb.bonsaidb/pinned-certificate.der")?)?;
108
/// let client = Client::build(Url::parse("bonsaidb://localhost")?)
109
///     .with_certificate(certificate)
110
///     .finish()?;
111
/// # Ok(())
112
/// # }
113
/// ```
114
///
115
/// ## Connecting via WebSockets
116
///
117
/// WebSockets are built atop the HTTP protocol. There are two URL schemes for
118
/// WebSockets:
119
///
120
/// - `ws`: Insecure WebSockets. Port 80 is assumed if no port is specified.
121
/// - `wss`: Secure WebSockets. Port 443 is assumed if no port is specified.
122
///
123
/// ### Without TLS
124
///
125
/// ```rust
126
/// # use bonsaidb_client::{Client, fabruic::Certificate, url::Url};
127
/// # async fn test_fn() -> anyhow::Result<()> {
128
/// let client = Client::build(Url::parse("ws://localhost")?).finish()?;
129
/// # Ok(())
130
/// # }
131
/// ```
132
///
133
/// ### With TLS
134
///
135
/// ```rust
136
/// # use bonsaidb_client::{Client, fabruic::Certificate, url::Url};
137
/// # async fn test_fn() -> anyhow::Result<()> {
138
/// let client = Client::build(Url::parse("wss://my-server.com")?).finish()?;
139
/// # Ok(())
140
/// # }
141
/// ```
142
///
143
/// ## Using an `Api`
144
///
145
/// Our user guide has a [section on creating and
146
/// using](https://dev.bonsaidb.io/release/guide/about/access-models/custom-api-server.html)
147
/// an [`Api`](api::Api).
148
///
149
/// ```rust
150
/// # use bonsaidb_client::{Client, fabruic::Certificate, url::Url};
151
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_client::core`.
152
/// use bonsaidb_core::{
153
///     api::{Api, Infallible},
154
///     schema::{ApiName, Qualified},
155
/// };
156
/// use serde::{Deserialize, Serialize};
157
///
158
/// #[derive(Serialize, Deserialize, Debug)]
159
/// pub struct Ping;
160
///
161
/// #[derive(Serialize, Deserialize, Clone, Debug)]
162
/// pub struct Pong;
163
///
164
/// impl Api for Ping {
165
///     type Response = Pong;
166
///     type Error = Infallible;
167
///
168
///     fn name() -> ApiName {
169
///         ApiName::private("ping")
170
///     }
171
/// }
172
///
173
/// # async fn test_fn() -> anyhow::Result<()> {
174
/// let client = Client::build(Url::parse("bonsaidb://localhost")?).finish()?;
175
/// let Pong = client.send_api_request_async(&Ping).await?;
176
/// # Ok(())
177
/// # }
178
/// ```
179
///
180
/// ### Receiving out-of-band messages from the server
181
///
182
/// If the server sends a message that isn't in response to a request, the
183
/// client will invoke its [api callback](Builder::with_api_callback):
184
///
185
/// ```rust
186
/// # use bonsaidb_client::{Client, ApiCallback, fabruic::Certificate, url::Url};
187
/// # // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_client::core`.
188
/// # use bonsaidb_core::{api::{Api, Infallible}, schema::{ApiName, Qualified}};
189
/// # use serde::{Serialize, Deserialize};
190
/// # #[derive(Serialize, Deserialize, Debug)]
191
/// # pub struct Ping;
192
/// # #[derive(Serialize, Deserialize, Clone, Debug)]
193
/// # pub struct Pong;
194
/// # impl Api for Ping {
195
/// #     type Response = Pong;
196
/// #     type Error = Infallible;
197
/// #
198
/// #     fn name() -> ApiName {
199
/// #         ApiName::private("ping")
200
/// #     }
201
/// # }
202
/// # async fn test_fn() -> anyhow::Result<()> {
203
/// let client = Client::build(Url::parse("bonsaidb://localhost")?)
204
///     .with_api_callback(ApiCallback::<Ping>::new(|result: Pong| async move {
205
///         println!("Received out-of-band Pong");
206
///     }))
207
///     .finish()?;
208
/// # Ok(())
209
/// # }
210
/// ```
211
2514
#[derive(Debug, Clone)]
212
pub struct Client {
213
    pub(crate) data: Arc<Data>,
214
    session: Arc<Session>,
215
}
216

            
217
impl Drop for Client {
218
37040
    fn drop(&mut self) {
219
37040
        if Arc::strong_count(&self.session) == 1 {
220
3100
            if let Some(session_id) = self.session.id {
221
140
                // Final reference to an authenticated session
222
140
                drop(self.invoke_api_request(&LogOutSession(session_id)));
223
2960
            }
224
33940
        }
225
37040
    }
226
}
227

            
228
impl PartialEq for Client {
229
    fn eq(&self, other: &Self) -> bool {
230
        Arc::ptr_eq(&self.data, &other.data)
231
    }
232
}
233

            
234
#[derive(Debug)]
235
pub struct Data {
236
    request_sender: Sender<PendingRequest>,
237
    #[cfg(not(target_arch = "wasm32"))]
238
    _worker: CancellableHandle<Result<(), Error>>,
239
    effective_permissions: Mutex<Option<Permissions>>,
240
    schemas: Mutex<HashMap<TypeId, Arc<Schematic>>>,
241
    request_id: AtomicU32,
242
    subscribers: SubscriberMap,
243
    #[cfg(feature = "test-util")]
244
    background_task_running: Arc<AtomicBool>,
245
}
246

            
247
impl Client {
248
    /// Returns a builder for a new client connecting to `url`.
249
1720
    pub fn build(url: Url) -> Builder {
250
1720
        Builder::new(url)
251
1720
    }
252
}
253

            
254
impl Client {
255
    /// Initialize a client connecting to `url`. This client can be shared by
256
    /// cloning it. All requests are done asynchronously over the same
257
    /// connection.
258
    ///
259
    /// If the client has an error connecting, the first request made will
260
    /// present that error. If the client disconnects while processing requests,
261
    /// all requests being processed will exit and return
262
    /// [`Error::Disconnected`]. The client will automatically try reconnecting.
263
    ///
264
    /// The goal of this design of this reconnection strategy is to make it
265
    /// easier to build resilliant apps. By allowing existing Client instances
266
    /// to recover and reconnect, each component of the apps built can adopt a
267
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
268
    /// the database is to operation.
269
1260
    pub fn new(url: Url) -> Result<Self, Error> {
270
1260
        Self::new_from_parts(
271
1260
            url,
272
1260
            CURRENT_PROTOCOL_VERSION,
273
1260
            HashMap::default(),
274
1260
            #[cfg(not(target_arch = "wasm32"))]
275
1260
            None,
276
1260
            #[cfg(not(target_arch = "wasm32"))]
277
1260
            Handle::try_current().ok(),
278
1260
        )
279
1260
    }
280

            
281
    /// Initialize a client connecting to `url` with `certificate` being used to
282
    /// validate and encrypt the connection. This client can be shared by
283
    /// cloning it. All requests are done asynchronously over the same
284
    /// connection.
285
    ///
286
    /// If the client has an error connecting, the first request made will
287
    /// present that error. If the client disconnects while processing requests,
288
    /// all requests being processed will exit and return
289
    /// [`Error::Disconnected`]. The client will automatically try reconnecting.
290
    ///
291
    /// The goal of this design of this reconnection strategy is to make it
292
    /// easier to build resilliant apps. By allowing existing Client instances
293
    /// to recover and reconnect, each component of the apps built can adopt a
294
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
295
    /// the database is to operation.
296
2980
    pub(crate) fn new_from_parts(
297
2980
        url: Url,
298
2980
        protocol_version: &'static str,
299
2980
        mut custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
300
2980
        #[cfg(not(target_arch = "wasm32"))] certificate: Option<fabruic::Certificate>,
301
2980
        #[cfg(not(target_arch = "wasm32"))] tokio: Option<Handle>,
302
2980
    ) -> Result<Self, Error> {
303
2980
        let subscribers = SubscriberMap::default();
304
2980
        let callback_subscribers = subscribers.clone();
305
2980
        custom_apis.insert(
306
2980
            MessageReceived::name(),
307
2980
            Some(Arc::new(ApiCallback::<MessageReceived>::new(
308
2980
                move |message: MessageReceived| {
309
900
                    let callback_subscribers = callback_subscribers.clone();
310
900
                    async move {
311
900
                        let mut subscribers = callback_subscribers.lock();
312
900
                        if let Some(sender) = subscribers.get(&message.subscriber_id) {
313
900
                            if sender
314
900
                                .send(bonsaidb_core::circulate::Message {
315
900
                                    topic: OwnedBytes::from(message.topic.into_vec()),
316
900
                                    payload: OwnedBytes::from(message.payload.into_vec()),
317
900
                                })
318
900
                                .is_err()
319
                            {
320
                                subscribers.remove(&message.subscriber_id);
321
900
                            }
322
                        }
323
900
                    }
324
2980
                },
325
2980
            ))),
326
2980
        );
327
2980
        match url.scheme() {
328
2980
            #[cfg(not(target_arch = "wasm32"))]
329
2980
            "bonsaidb" => Ok(Self::new_bonsai_client(
330
1240
                url,
331
1240
                protocol_version,
332
1240
                certificate,
333
1240
                custom_apis,
334
1240
                tokio,
335
1240
                subscribers,
336
1240
            )),
337
            #[cfg(feature = "websockets")]
338
1740
            "wss" | "ws" => Ok(Self::new_websocket_client(
339
1740
                url,
340
1740
                protocol_version,
341
1740
                custom_apis,
342
1740
                #[cfg(not(target_arch = "wasm32"))]
343
1740
                tokio,
344
1740
                subscribers,
345
1740
            )),
346
            other => {
347
                return Err(Error::InvalidUrl(format!("unsupported scheme {}", other)));
348
            }
349
        }
350
2980
    }
351

            
352
    #[cfg(not(target_arch = "wasm32"))]
353
1240
    fn new_bonsai_client(
354
1240
        url: Url,
355
1240
        protocol_version: &'static str,
356
1240
        certificate: Option<fabruic::Certificate>,
357
1240
        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
358
1240
        tokio: Option<Handle>,
359
1240
        subscribers: SubscriberMap,
360
1240
    ) -> Self {
361
1240
        let (request_sender, request_receiver) = flume::unbounded();
362
1240

            
363
1240
        let worker = sync::spawn_client(
364
1240
            quic_worker::reconnecting_client_loop(
365
1240
                url,
366
1240
                protocol_version,
367
1240
                certificate,
368
1240
                request_receiver,
369
1240
                Arc::new(custom_apis),
370
1240
                subscribers.clone(),
371
1240
            ),
372
1240
            tokio,
373
1240
        );
374
1240

            
375
1240
        #[cfg(feature = "test-util")]
376
1240
        let background_task_running = Arc::new(AtomicBool::new(true));
377
1240

            
378
1240
        Self {
379
1240
            data: Arc::new(Data {
380
1240
                request_sender,
381
1240
                _worker: CancellableHandle {
382
1240
                    worker,
383
1240
                    #[cfg(feature = "test-util")]
384
1240
                    background_task_running: background_task_running.clone(),
385
1240
                },
386
1240
                schemas: Mutex::default(),
387
1240
                request_id: AtomicU32::default(),
388
1240
                effective_permissions: Mutex::default(),
389
1240
                subscribers,
390
1240
                #[cfg(feature = "test-util")]
391
1240
                background_task_running,
392
1240
            }),
393
1240
            session: Arc::new(Session::default()),
394
1240
        }
395
1240
    }
396

            
397
    /// Creates a client that talks to the server over WebSockets on
    /// non-wasm targets.
    ///
    /// Spawns `tungstenite_worker::reconnecting_client_loop` via
    /// `sync::spawn_client` and stores the resulting handle so the worker is
    /// aborted when the shared [`Data`] is dropped. The returned client starts
    /// with a default [`Session`].
    #[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
    fn new_websocket_client(
        url: Url,
        protocol_version: &'static str,
        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
        tokio: Option<Handle>,
        subscribers: SubscriberMap,
    ) -> Self {
        let (request_sender, request_receiver) = flume::unbounded();

        let connection_loop = tungstenite_worker::reconnecting_client_loop(
            url,
            protocol_version,
            request_receiver,
            Arc::new(custom_apis),
            subscribers.clone(),
        );
        let worker = sync::spawn_client(connection_loop, tokio);

        #[cfg(feature = "test-util")]
        let background_task_running = Arc::new(AtomicBool::new(true));

        // This function is only compiled for non-wasm targets, so `_worker`
        // needs no additional `cfg` gate here.
        let data = Data {
            request_sender,
            _worker: CancellableHandle {
                worker,
                #[cfg(feature = "test-util")]
                background_task_running: background_task_running.clone(),
            },
            schemas: Mutex::default(),
            request_id: AtomicU32::default(),
            effective_permissions: Mutex::default(),
            subscribers,
            #[cfg(feature = "test-util")]
            background_task_running,
        };

        Self {
            data: Arc::new(data),
            session: Arc::new(Session::default()),
        }
    }
440

            
441
    /// Creates a client that talks to the server over WebSockets on `wasm32`
    /// targets.
    ///
    /// The worker is started with `wasm_websocket_worker::spawn_client`; no
    /// worker handle is stored because [`Data`] has no `_worker` field on
    /// `wasm32`. The returned client starts with a default [`Session`].
    #[cfg(all(feature = "websockets", target_arch = "wasm32"))]
    fn new_websocket_client(
        url: Url,
        protocol_version: &'static str,
        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
        subscribers: SubscriberMap,
    ) -> Self {
        let (request_sender, request_receiver) = flume::unbounded();

        wasm_websocket_worker::spawn_client(
            Arc::new(url),
            protocol_version,
            request_receiver,
            Arc::new(custom_apis),
            subscribers.clone(),
        );

        #[cfg(feature = "test-util")]
        let background_task_running = Arc::new(AtomicBool::new(true));

        // The original carried a `#[cfg(not(target_arch = "wasm32"))]`
        // `worker: CancellableHandle { .. }` initializer here. It could never
        // be compiled inside this wasm32-only function and referred to a field
        // and a local that do not exist, so it has been removed.
        Self {
            data: Arc::new(Data {
                request_sender,
                schemas: Mutex::default(),
                request_id: AtomicU32::default(),
                effective_permissions: Mutex::default(),
                subscribers,
                #[cfg(feature = "test-util")]
                background_task_running,
            }),
            session: Arc::new(Session::default()),
        }
    }
480

            
481
1448300
    fn send_request_without_confirmation(
482
1448300
        &self,
483
1448300
        name: ApiName,
484
1448300
        bytes: Bytes,
485
1448300
    ) -> Result<flume::Receiver<Result<Bytes, Error>>, Error> {
486
1448300
        let (result_sender, result_receiver) = flume::bounded(1);
487
1448300
        let id = self.data.request_id.fetch_add(1, Ordering::SeqCst);
488
1448300
        self.data.request_sender.send(PendingRequest {
489
1448300
            request: Payload {
490
1448300
                session_id: self.session.id,
491
1448300
                id: Some(id),
492
1448300
                name,
493
1448300
                value: Ok(bytes),
494
1448300
            },
495
1448300
            responder: result_sender,
496
1448300
        })?;
497

            
498
1448300
        Ok(result_receiver)
499
1448300
    }
500

            
501
1221580
    /// Sends a raw request and asynchronously waits for the raw response.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be queued, if the response
    /// channel fails, or if the response itself is an error.
    async fn send_request_async(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
        let pending = self.send_request_without_confirmation(name, bytes)?;
        let outcome = pending.recv_async().await?;
        outcome
    }
506

            
507
226580
    /// Sends a raw request and blocks the current thread until the raw
    /// response arrives.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be queued, if the response
    /// channel fails, or if the response itself is an error.
    fn send_request(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
        let pending = self.send_request_without_confirmation(name, bytes)?;
        let outcome = pending.recv()?;
        outcome
    }
512

            
513
    /// Sends an api `request`.
514
1220857
    pub async fn send_api_request_async<Api: api::Api>(
515
1220857
        &self,
516
1220857
        request: &Api,
517
1220857
    ) -> Result<Api::Response, ApiError<Api::Error>> {
518
1220877
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
519
1578386
        let response = self.send_request_async(Api::name(), request).await?;
520
1210093
        let response =
521
1210073
            pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
522
1210093
        response.map_err(ApiError::Api)
523
1220877
    }
524

            
525
    /// Sends an api `request` without waiting for a result. The response from
526
    /// the server will be ignored.
527
140
    pub fn invoke_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
528
140
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
529
140
        self.send_request_without_confirmation(Api::name(), request)
530
140
            .map(|_| ())
531
140
    }
532

            
533
    /// Sends an api `request`.
534
226390
    pub fn send_api_request<Api: api::Api>(
535
226390
        &self,
536
226390
        request: &Api,
537
226390
    ) -> Result<Api::Response, ApiError<Api::Error>> {
538
226390
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
539
226390
        let response = self.send_request(Api::name(), request)?;
540

            
541
226070
        let response =
542
226070
            pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
543
226070
        response.map_err(ApiError::Api)
544
226390
    }
545

            
546
    /// Returns the current effective permissions for the client. Returns None
547
    /// if unauthenticated.
548
    pub async fn effective_permissions(&self) -> Option<Permissions> {
549
        let effective_permissions = self.data.effective_permissions.lock();
550
        effective_permissions.clone()
551
    }
552

            
553
    /// Returns a handle to the shared flag stored in [`Data`] as
    /// `background_task_running`.
    #[cfg(feature = "test-util")]
    #[doc(hidden)]
    #[must_use]
    pub fn background_task_running(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.data.background_task_running)
    }
559

            
560
480
    pub(crate) fn register_subscriber(&self, id: u64, sender: flume::Sender<Message>) {
561
480
        let mut subscribers = self.data.subscribers.lock();
562
480
        subscribers.insert(id, sender);
563
480
    }
564

            
565
40
    pub(crate) async fn unregister_subscriber_async(&self, database: String, id: u64) {
566
40
        drop(
567
40
            self.send_api_request_async(&UnregisterSubscriber {
568
40
                database,
569
40
                subscriber_id: id,
570
40
            })
571
40
            .await,
572
        );
573
40
        let mut subscribers = self.data.subscribers.lock();
574
40
        subscribers.remove(&id);
575
40
    }
576

            
577
    #[cfg(not(target_arch = "wasm32"))]
578
160
    pub(crate) fn unregister_subscriber(&self, database: String, id: u64) {
579
160
        drop(self.send_api_request(&UnregisterSubscriber {
580
160
            database,
581
160
            subscriber_id: id,
582
160
        }));
583
160
        let mut subscribers = self.data.subscribers.lock();
584
160
        subscribers.remove(&id);
585
160
    }
586

            
587
1308
    fn database<DB: bonsaidb_core::schema::Schema>(
588
1308
        &self,
589
1308
        name: &str,
590
1308
    ) -> Result<RemoteDatabase, bonsaidb_core::Error> {
591
1308
        let mut schemas = self.data.schemas.lock();
592
1308
        let type_id = TypeId::of::<DB>();
593
1308
        let schematic = if let Some(schematic) = schemas.get(&type_id) {
594
1103
            schematic.clone()
595
        } else {
596
205
            let schematic = Arc::new(DB::schematic()?);
597
205
            schemas.insert(type_id, schematic.clone());
598
205
            schematic
599
        };
600
1308
        Ok(RemoteDatabase::new(
601
1308
            self.clone(),
602
1308
            name.to_string(),
603
1308
            schematic,
604
1308
        ))
605
1308
    }
606
}
607

            
608
impl HasSession for Client {
609
    fn session(&self) -> Option<&Session> {
610
        Some(&self.session)
611
    }
612
}
613

            
614
#[async_trait]
615
impl AsyncStorageConnection for Client {
616
    type Database = RemoteDatabase;
617
    type Authenticated = Self;
618

            
619
    async fn admin(&self) -> Self::Database {
620
        self.database::<Admin>(ADMIN_DATABASE_NAME).unwrap()
621
    }
622

            
623
11440
    async fn create_database_with_schema(
624
11440
        &self,
625
11440
        name: &str,
626
11440
        schema: SchemaName,
627
11440
        only_if_needed: bool,
628
11440
    ) -> Result<(), bonsaidb_core::Error> {
629
11440
        self.send_api_request_async(&CreateDatabase {
630
11440
            database: Database {
631
11440
                name: name.to_string(),
632
11440
                schema,
633
11440
            },
634
11440
            only_if_needed,
635
40620
        })
636
40620
        .await?;
637
11320
        Ok(())
638
22880
    }
639

            
640
1186
    async fn database<DB: Schema>(
641
1186
        &self,
642
1186
        name: &str,
643
1186
    ) -> Result<Self::Database, bonsaidb_core::Error> {
644
1186
        self.database::<DB>(name)
645
1186
    }
646

            
647
10080
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
648
10080
        self.send_api_request_async(&DeleteDatabase {
649
10080
            name: name.to_string(),
650
37120
        })
651
37120
        .await?;
652
10040
        Ok(())
653
20160
    }
654

            
655
40
    async fn list_databases(&self) -> Result<Vec<Database>, bonsaidb_core::Error> {
656
40
        Ok(self.send_api_request_async(&ListDatabases).await?)
657
80
    }
658

            
659
40
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
660
40
        Ok(self.send_api_request_async(&ListAvailableSchemas).await?)
661
80
    }
662

            
663
80
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
664
80
        Ok(self
665
80
            .send_api_request_async(&CreateUser {
666
80
                username: username.to_string(),
667
80
            })
668
80
            .await?)
669
160
    }
670

            
671
2
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
672
2
        &self,
673
2
        user: U,
674
2
    ) -> Result<(), bonsaidb_core::Error> {
675
        Ok(self
676
            .send_api_request_async(&DeleteUser {
677
2
                user: user.name()?.into_owned(),
678
2
            })
679
2
            .await?)
680
4
    }
681

            
682
    #[cfg(feature = "password-hashing")]
683
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
684
        &self,
685
        user: U,
686
        password: bonsaidb_core::connection::SensitiveString,
687
    ) -> Result<(), bonsaidb_core::Error> {
688
        Ok(self
689
            .send_api_request_async(&bonsaidb_core::networking::SetUserPassword {
690
                user: user.name()?.into_owned(),
691
                password,
692
            })
693
            .await?)
694
    }
695

            
696
    #[cfg(feature = "password-hashing")]
697
5
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
698
5
        &self,
699
5
        user: U,
700
5
        authentication: Authentication,
701
5
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
702
5
        let session = self
703
            .send_api_request_async(&bonsaidb_core::networking::Authenticate {
704
5
                user: user.name()?.into_owned(),
705
5
                authentication,
706
11
            })
707
11
            .await?;
708
5
        Ok(Self {
709
5
            data: self.data.clone(),
710
5
            session: Arc::new(session),
711
5
        })
712
10
    }
713

            
714
40
    async fn assume_identity(
715
40
        &self,
716
40
        identity: IdentityReference<'_>,
717
40
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
718
40
        let session = self
719
40
            .send_api_request_async(&AssumeIdentity(identity.into_owned()))
720
40
            .await?;
721
40
        Ok(Self {
722
40
            data: self.data.clone(),
723
40
            session: Arc::new(session),
724
40
        })
725
80
    }
726

            
727
4
    async fn add_permission_group_to_user<
728
4
        'user,
729
4
        'group,
730
4
        U: Nameable<'user, u64> + Send + Sync,
731
4
        G: Nameable<'group, u64> + Send + Sync,
732
4
    >(
733
4
        &self,
734
4
        user: U,
735
4
        permission_group: G,
736
4
    ) -> Result<(), bonsaidb_core::Error> {
737
        self.send_api_request_async(&AlterUserPermissionGroupMembership {
738
4
            user: user.name()?.into_owned(),
739
4
            group: permission_group.name()?.into_owned(),
740
            should_be_member: true,
741
4
        })
742
4
        .await?;
743
4
        Ok(())
744
8
    }
745

            
746
4
    async fn remove_permission_group_from_user<
747
4
        'user,
748
4
        'group,
749
4
        U: Nameable<'user, u64> + Send + Sync,
750
4
        G: Nameable<'group, u64> + Send + Sync,
751
4
    >(
752
4
        &self,
753
4
        user: U,
754
4
        permission_group: G,
755
4
    ) -> Result<(), bonsaidb_core::Error> {
756
        self.send_api_request_async(&AlterUserPermissionGroupMembership {
757
4
            user: user.name()?.into_owned(),
758
4
            group: permission_group.name()?.into_owned(),
759
            should_be_member: false,
760
4
        })
761
4
        .await?;
762
4
        Ok(())
763
8
    }
764

            
765
4
    async fn add_role_to_user<
766
4
        'user,
767
4
        'group,
768
4
        U: Nameable<'user, u64> + Send + Sync,
769
4
        G: Nameable<'group, u64> + Send + Sync,
770
4
    >(
771
4
        &self,
772
4
        user: U,
773
4
        role: G,
774
4
    ) -> Result<(), bonsaidb_core::Error> {
775
        self.send_api_request_async(&AlterUserRoleMembership {
776
4
            user: user.name()?.into_owned(),
777
4
            role: role.name()?.into_owned(),
778
            should_be_member: true,
779
4
        })
780
4
        .await?;
781
4
        Ok(())
782
8
    }
783

            
784
4
    async fn remove_role_from_user<
785
4
        'user,
786
4
        'group,
787
4
        U: Nameable<'user, u64> + Send + Sync,
788
4
        G: Nameable<'group, u64> + Send + Sync,
789
4
    >(
790
4
        &self,
791
4
        user: U,
792
4
        role: G,
793
4
    ) -> Result<(), bonsaidb_core::Error> {
794
        self.send_api_request_async(&AlterUserRoleMembership {
795
4
            user: user.name()?.into_owned(),
796
4
            role: role.name()?.into_owned(),
797
            should_be_member: false,
798
4
        })
799
4
        .await?;
800
4
        Ok(())
801
8
    }
802
}
803

            
804
type OutstandingRequestMap = HashMap<u32, PendingRequest>;
805
type OutstandingRequestMapHandle = Arc<async_lock::Mutex<OutstandingRequestMap>>;
806
type PendingRequestResponder = Sender<Result<Bytes, Error>>;
807

            
808
#[derive(Debug)]
809
pub struct PendingRequest {
810
    request: Payload,
811
    responder: PendingRequestResponder,
812
}
813

            
814
#[cfg(not(target_arch = "wasm32"))]
815
#[derive(Debug)]
816
struct CancellableHandle<T> {
817
    worker: JoinHandle<T>,
818
    #[cfg(feature = "test-util")]
819
    background_task_running: Arc<AtomicBool>,
820
}
821

            
822
#[cfg(not(target_arch = "wasm32"))]
823
impl<T> Drop for CancellableHandle<T> {
824
2960
    fn drop(&mut self) {
825
2960
        self.worker.abort();
826
2960
        #[cfg(feature = "test-util")]
827
2960
        self.background_task_running.store(false, Ordering::Release);
828
2960
    }
829
}
830

            
831
1449020
async fn process_response_payload(
832
1449020
    payload: Payload,
833
1449020
    outstanding_requests: &OutstandingRequestMapHandle,
834
1449020
    custom_apis: &HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
835
1449020
) {
836
1449020
    if let Some(payload_id) = payload.id {
837
1448120
        if let Some(outstanding_request) = {
838
1448120
            let mut outstanding_requests = fast_async_lock!(outstanding_requests);
839
1448120
            outstanding_requests.remove(&payload_id)
840
1448120
        } {
841
1448120
            drop(
842
1448120
                outstanding_request
843
1448120
                    .responder
844
1448120
                    .send(payload.value.map_err(Error::from)),
845
1448120
            );
846
1448120
        }
847
900
    } else if let (Some(custom_api_callback), Ok(value)) = (
848
900
        custom_apis.get(&payload.name).and_then(Option::as_ref),
849
900
        payload.value,
850
900
    ) {
851
900
        custom_api_callback.response_received(value).await;
852
    } else {
853
        log::warn!("unexpected api response received ({})", payload.name);
854
    }
855
1449020
}
856

            
857
trait ApiWrapper<Response>: Send + Sync {
858
    fn invoke(&self, response: Response) -> BoxFuture<'static, ()>;
859
}
860

            
861
/// A callback that is invoked when an [`Api::Response`](Api::Response)
862
/// value is received out-of-band (not in reply to a request).
863
pub struct ApiCallback<Api: api::Api> {
864
    generator: Box<dyn ApiWrapper<Api::Response>>,
865
}
866

            
867
/// The trait bounds required for the function wrapped in a [`ApiCallback`].
///
/// This trait is implemented automatically for every
/// `Fn(Request) -> F + Send + Sync + 'static`; it exists only to give that
/// set of bounds a name.
pub trait ApiCallbackFn<Request, F>: Fn(Request) -> F + Send + Sync + 'static {}

impl<Func, Request, Fut> ApiCallbackFn<Request, Fut> for Func where
    Func: Fn(Request) -> Fut + Send + Sync + 'static
{
}
872

            
873
struct ApiFutureBoxer<Response: Send + Sync, F: Future<Output = ()> + Send + Sync>(
874
    Box<dyn ApiCallbackFn<Response, F>>,
875
);
876

            
877
impl<Response: Send + Sync, F: Future<Output = ()> + Send + Sync + 'static> ApiWrapper<Response>
878
    for ApiFutureBoxer<Response, F>
879
{
880
900
    fn invoke(&self, response: Response) -> BoxFuture<'static, ()> {
881
900
        (&self.0)(response).boxed()
882
900
    }
883
}
884

            
885
impl<Api: api::Api> ApiCallback<Api> {
886
    /// Returns a new instance wrapping the provided function.
887
2980
    pub fn new<
888
2980
        F: ApiCallbackFn<Api::Response, Fut>,
889
2980
        Fut: Future<Output = ()> + Send + Sync + 'static,
890
2980
    >(
891
2980
        callback: F,
892
2980
    ) -> Self {
893
2980
        Self {
894
2980
            generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(callback))),
895
2980
        }
896
2980
    }
897
    /// Returns a new instance wrapping the provided function, passing a clone
898
    /// of `context` as the second parameter. This is just a convenience wrapper
899
    /// around `new()` that produces more readable code when needing to access
900
    /// external information inside of the callback.
901
    pub fn new_with_context<
902
        Context: Send + Sync + Clone + 'static,
903
        F: Fn(Api::Response, Context) -> Fut + Send + Sync + 'static,
904
        Fut: Future<Output = ()> + Send + Sync + 'static,
905
    >(
906
        context: Context,
907
        callback: F,
908
    ) -> Self {
909
        Self {
910
            generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(
911
                move |request| {
912
                    let context = context.clone();
913
                    callback(request, context)
914
                },
915
            ))),
916
        }
917
    }
918
}
919

            
920
#[async_trait]
921
pub trait AnyApiCallback: Send + Sync + 'static {
922
    /// An out-of-band `response` was received. This happens when the server
923
    /// sends a response that isn't in response to a request.
924
    async fn response_received(&self, response: Bytes);
925
}
926

            
927
#[async_trait]
928
impl<Api: api::Api> AnyApiCallback for ApiCallback<Api> {
929
900
    async fn response_received(&self, response: Bytes) {
930
900
        match pot::from_slice::<Result<Api::Response, Api::Error>>(&response) {
931
900
            Ok(response) => self.generator.invoke(response.unwrap()).await,
932
            Err(err) => {
933
                log::error!("error deserializing api: {err}");
934
            }
935
        }
936
900
    }
937
}