1
use std::any::TypeId;
2
use std::collections::HashMap;
3
use std::fmt::Debug;
4
use std::ops::Deref;
5
use std::sync::atomic::{AtomicU32, Ordering};
6
use std::sync::Arc;
7
use std::time::Duration;
8

            
9
use async_trait::async_trait;
10
use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
11
use bonsaidb_core::api::{self, Api, ApiName};
12
use bonsaidb_core::arc_bytes::serde::Bytes;
13
use bonsaidb_core::arc_bytes::OwnedBytes;
14
use bonsaidb_core::connection::{
15
    AsyncStorageConnection, Database, HasSession, IdentityReference, Session,
16
};
17
use bonsaidb_core::networking::{
18
    AlterUserPermissionGroupMembership, AlterUserRoleMembership, AssumeIdentity, CreateDatabase,
19
    CreateUser, DeleteDatabase, DeleteUser, ListAvailableSchemas, ListDatabases, LogOutSession,
20
    MessageReceived, Payload, UnregisterSubscriber, CURRENT_PROTOCOL_VERSION,
21
};
22
use bonsaidb_core::permissions::Permissions;
23
use bonsaidb_core::schema::{Nameable, Schema, SchemaName, SchemaSummary, Schematic};
24
use bonsaidb_utils::fast_async_lock;
25
use flume::Sender;
26
use futures::future::BoxFuture;
27
use futures::{Future, FutureExt};
28
use parking_lot::Mutex;
29
#[cfg(not(target_arch = "wasm32"))]
30
use tokio::runtime::Handle;
31
use url::Url;
32

            
33
pub use self::remote_database::{AsyncRemoteDatabase, AsyncRemoteSubscriber};
34
#[cfg(not(target_arch = "wasm32"))]
35
pub use self::sync::{BlockingClient, BlockingRemoteDatabase, BlockingRemoteSubscriber};
36
use crate::builder::Async;
37
use crate::error::Error;
38
use crate::{ApiError, Builder};
39

            
40
#[cfg(not(target_arch = "wasm32"))]
41
mod quic_worker;
42
mod remote_database;
43
#[cfg(not(target_arch = "wasm32"))]
44
mod sync;
45
#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
46
mod tungstenite_worker;
47
#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
48
mod wasm_websocket_worker;
49

            
50
12450
#[derive(Debug, Clone, Default)]
51
pub struct SubscriberMap(Arc<Mutex<HashMap<u64, flume::Sender<Message>>>>);
52

            
53
impl SubscriberMap {
54
7530
    pub fn clear(&self) {
55
7530
        let mut data = self.lock();
56
7530
        data.clear();
57
7530
    }
58
}
59

            
60
impl Deref for SubscriberMap {
61
    type Target = Mutex<HashMap<u64, flume::Sender<Message>>>;
62

            
63
9900
    fn deref(&self) -> &Self::Target {
64
9900
        &self.0
65
9900
    }
66
}
67

            
68
use bonsaidb_core::circulate::Message;
69

            
70
/// The error type produced by the WebSocket transport on non-`wasm32`
/// targets (an alias for the `tokio-tungstenite` error).
#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
pub type WebSocketError = tokio_tungstenite::tungstenite::Error;

/// The error type produced by the WebSocket transport on `wasm32` targets
/// (an alias for the error defined in `wasm_websocket_worker`).
#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
pub type WebSocketError = wasm_websocket_worker::WebSocketError;
75

            
76
/// Client for connecting to a BonsaiDb server.
77
///
78
/// ## How this type automatically reconnects
79
///
80
/// This type is designed to automatically reconnect if the underlying network
81
/// connection has been lost. When a disconnect happens, the error that caused
82
/// the disconnection will be returned to at least one requestor. If multiple
83
/// pending requests are outstanding, all remaining pending requests will have
84
/// an [`Error::Disconnected`](bonsaidb_core::networking::Error::Disconnected)
85
/// returned. This allows the application to detect when a networking issue has
86
/// arisen.
87
///
88
/// If the disconnect happens while the client is completely idle, the next
89
/// request will report the disconnection error. The subsequent request will
90
/// cause the client to begin reconnecting again.
91
///
92
/// When unauthenticated, this reconnection behavior is mostly transparent --
93
/// disconnection errors can be shown to the user, and service will be restored
94
/// automatically. However, when dealing with authentication, the client does
95
/// not store credentials to be able to send them again when reconnecting. This
96
/// means that the existing client handles will lose their authentication when
97
/// the network connection is broken. The current authentication status can be
98
/// checked using [`HasSession::session()`].
99
///
100
/// ## Connecting via QUIC
101
///
102
/// The URL scheme to connect via QUIC is `bonsaidb`. If no port is specified,
103
/// port 5645 is assumed.
104
///
105
/// ### With a valid TLS certificate
106
///
107
/// ```rust
108
/// # use bonsaidb_client::{AsyncClient, fabruic::Certificate, url::Url};
109
/// # async fn test_fn() -> anyhow::Result<()> {
110
/// let client = AsyncClient::build(Url::parse("bonsaidb://my-server.com")?).build()?;
111
/// # Ok(())
112
/// # }
113
/// ```
114
///
115
/// ### With a Self-Signed Pinned Certificate
116
///
117
/// When using `install_self_signed_certificate()`, clients will need the
118
/// contents of the `pinned-certificate.der` file within the database. It can be
119
/// specified when building the client:
120
///
121
/// ```rust
122
/// # use bonsaidb_client::{AsyncClient, fabruic::Certificate, url::Url};
123
/// # async fn test_fn() -> anyhow::Result<()> {
124
/// let certificate =
125
///     Certificate::from_der(std::fs::read("mydb.bonsaidb/pinned-certificate.der")?)?;
126
/// let client = AsyncClient::build(Url::parse("bonsaidb://localhost")?)
127
///     .with_certificate(certificate)
128
///     .build()?;
129
/// # Ok(())
130
/// # }
131
/// ```
132
///
133
/// ## Connecting via WebSockets
134
///
135
/// WebSockets are built atop the HTTP protocol. There are two URL schemes for
136
/// WebSockets:
137
///
138
/// - `ws`: Insecure WebSockets. Port 80 is assumed if no port is specified.
139
/// - `wss`: Secure WebSockets. Port 443 is assumed if no port is specified.
140
///
141
/// ### Without TLS
142
///
143
/// ```rust
144
/// # use bonsaidb_client::{AsyncClient, fabruic::Certificate, url::Url};
145
/// # async fn test_fn() -> anyhow::Result<()> {
146
/// let client = AsyncClient::build(Url::parse("ws://localhost")?).build()?;
147
/// # Ok(())
148
/// # }
149
/// ```
150
///
151
/// ### With TLS
152
///
153
/// ```rust
154
/// # use bonsaidb_client::{AsyncClient, fabruic::Certificate, url::Url};
155
/// # async fn test_fn() -> anyhow::Result<()> {
156
/// let client = AsyncClient::build(Url::parse("wss://my-server.com")?).build()?;
157
/// # Ok(())
158
/// # }
159
/// ```
160
///
161
/// ## Using an `Api`
162
///
163
/// Our user guide has a [section on creating and
164
/// using](https://dev.bonsaidb.io/release/guide/about/access-models/custom-api-server.html)
165
/// an [`Api`](api::Api).
166
///
167
/// ```rust
168
/// # use bonsaidb_client::{AsyncClient, fabruic::Certificate, url::Url};
169
/// // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_client::core`.
170
/// use bonsaidb_core::api::{Api, ApiName, Infallible};
171
/// use bonsaidb_core::schema::Qualified;
172
/// use serde::{Deserialize, Serialize};
173
///
174
/// #[derive(Serialize, Deserialize, Debug)]
175
/// pub struct Ping;
176
///
177
/// #[derive(Serialize, Deserialize, Clone, Debug)]
178
/// pub struct Pong;
179
///
180
/// impl Api for Ping {
181
///     type Error = Infallible;
182
///     type Response = Pong;
183
///
184
///     fn name() -> ApiName {
185
///         ApiName::private("ping")
186
///     }
187
/// }
188
///
189
/// # async fn test_fn() -> anyhow::Result<()> {
190
/// let client = AsyncClient::build(Url::parse("bonsaidb://localhost")?).build()?;
191
/// let Pong = client.send_api_request(&Ping).await?;
192
/// # Ok(())
193
/// # }
194
/// ```
195
///
196
/// ### Receiving out-of-band messages from the server
197
///
198
/// If the server sends a message that isn't in response to a request, the
199
/// client will invoke its [api callback](Builder::with_api_callback):
200
///
201
/// ```rust
202
/// # use bonsaidb_client::{AsyncClient, ApiCallback, fabruic::Certificate, url::Url};
203
/// # // `bonsaidb_core` is re-exported to `bonsaidb::core` or `bonsaidb_client::core`.
204
/// # use bonsaidb_core::{api::{Api, Infallible, ApiName}, schema::{Qualified}};
205
/// # use serde::{Serialize, Deserialize};
206
/// # #[derive(Serialize, Deserialize, Debug)]
207
/// # pub struct Ping;
208
/// # #[derive(Serialize, Deserialize, Clone, Debug)]
209
/// # pub struct Pong;
210
/// # impl Api for Ping {
211
/// #     type Response = Pong;
212
/// #     type Error = Infallible;
213
/// #
214
/// #     fn name() -> ApiName {
215
/// #         ApiName::private("ping")
216
/// #     }
217
/// # }
218
/// # async fn test_fn() -> anyhow::Result<()> {
219
/// let client = AsyncClient::build(Url::parse("bonsaidb://localhost")?)
220
///     .with_api_callback(ApiCallback::<Ping>::new(|result: Pong| async move {
221
///         println!("Received out-of-band Pong");
222
///     }))
223
///     .build()?;
224
/// # Ok(())
225
/// # }
226
/// ```
227
3089
#[derive(Debug, Clone)]
228
pub struct AsyncClient {
229
    pub(crate) data: Arc<Data>,
230
    session: ClientSession,
231
    request_timeout: Duration,
232
}
233

            
234
impl Drop for AsyncClient {
235
58800
    fn drop(&mut self) {
236
58800
        if self.session_is_current() && Arc::strong_count(&self.session.session) == 1 {
237
6090
            if let Some(session_id) = self.session.session.id {
238
570
                // Final reference to an authenticated session
239
570
                drop(self.invoke_blocking_api_request(&LogOutSession(session_id)));
240
5520
            }
241
52710
        }
242
58800
    }
243
}
244

            
245
impl PartialEq for AsyncClient {
246
    fn eq(&self, other: &Self) -> bool {
247
        Arc::ptr_eq(&self.data, &other.data)
248
    }
249
}
250

            
251
#[derive(Debug)]
252
pub struct Data {
253
    request_sender: Sender<PendingRequest>,
254
    effective_permissions: Mutex<Option<Permissions>>,
255
    schemas: Mutex<HashMap<TypeId, Arc<Schematic>>>,
256
    connection_counter: Arc<AtomicU32>,
257
    request_id: AtomicU32,
258
    subscribers: SubscriberMap,
259
}
260

            
261
impl AsyncClient {
262
    /// Returns a builder for a new client connecting to `url`.
263
3270
    pub fn build(url: Url) -> Builder<Async> {
264
3270
        Builder::new(url)
265
3270
    }
266

            
267
    /// Initialize a client connecting to `url`. This client can be shared by
268
    /// cloning it. All requests are done asynchronously over the same
269
    /// connection.
270
    ///
271
    /// If the client has an error connecting, the first request made will
272
    /// present that error. If the client disconnects while processing requests,
273
    /// all requests being processed will exit and return
274
    /// [`Error::Disconnected`](bonsaidb_core::networking::Error::Disconnected).
275
    /// The client will automatically try reconnecting.
276
    ///
277
    /// The goal of this design of this reconnection strategy is to make it
278
    /// easier to build resilliant apps. By allowing existing Client instances
279
    /// to recover and reconnect, each component of the apps built can adopt a
280
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
281
    /// the database is to operation.
282
1080
    pub fn new(url: Url) -> Result<Self, Error> {
283
1080
        Self::new_from_parts(
284
1080
            url,
285
1080
            CURRENT_PROTOCOL_VERSION,
286
1080
            HashMap::default(),
287
1080
            None,
288
1080
            None,
289
1080
            #[cfg(not(target_arch = "wasm32"))]
290
1080
            None,
291
1080
            #[cfg(not(target_arch = "wasm32"))]
292
1080
            Handle::try_current().ok(),
293
1080
        )
294
1080
    }
295

            
296
    /// Initialize a client connecting to `url` with `certificate` being used to
297
    /// validate and encrypt the connection. This client can be shared by
298
    /// cloning it. All requests are done asynchronously over the same
299
    /// connection.
300
    ///
301
    /// If the client has an error connecting, the first request made will
302
    /// present that error. If the client disconnects while processing requests,
303
    /// all requests being processed will exit and return
304
    /// [`Error::Disconnected`]. The client will automatically try reconnecting.
305
    ///
306
    /// The goal of this design of this reconnection strategy is to make it
307
    /// easier to build resilliant apps. By allowing existing Client instances
308
    /// to recover and reconnect, each component of the apps built can adopt a
309
    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
310
    /// the database is to operation.
311
5550
    pub(crate) fn new_from_parts(
312
5550
        url: Url,
313
5550
        protocol_version: &'static str,
314
5550
        mut custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
315
5550
        connect_timeout: Option<Duration>,
316
5550
        request_timeout: Option<Duration>,
317
5550
        #[cfg(not(target_arch = "wasm32"))] certificate: Option<fabruic::Certificate>,
318
5550
        #[cfg(not(target_arch = "wasm32"))] tokio: Option<Handle>,
319
5550
    ) -> Result<Self, Error> {
320
5550
        let subscribers = SubscriberMap::default();
321
5550
        let callback_subscribers = subscribers.clone();
322
5550
        custom_apis.insert(
323
5550
            MessageReceived::name(),
324
5550
            Some(Arc::new(ApiCallback::<MessageReceived>::new(
325
5550
                move |message: MessageReceived| {
326
1350
                    let callback_subscribers = callback_subscribers.clone();
327
1350
                    async move {
328
1350
                        let mut subscribers = callback_subscribers.lock();
329
1350
                        if let Some(sender) = subscribers.get(&message.subscriber_id) {
330
1350
                            if sender
331
1350
                                .send(bonsaidb_core::circulate::Message {
332
1350
                                    topic: OwnedBytes::from(message.topic.into_vec()),
333
1350
                                    payload: OwnedBytes::from(message.payload.into_vec()),
334
1350
                                })
335
1350
                                .is_err()
336
                            {
337
                                subscribers.remove(&message.subscriber_id);
338
1350
                            }
339
                        }
340
1350
                    }
341
5550
                },
342
5550
            ))),
343
5550
        );
344
5550
        // Default timeouts to 1 minute.
345
5550
        let connection = ConnectionInfo {
346
5550
            url,
347
5550
            subscribers,
348
5550
            connect_timeout: connect_timeout.unwrap_or(Duration::from_secs(60)),
349
5550
            request_timeout: request_timeout.unwrap_or(Duration::from_secs(60)),
350
5550
        };
351
5550
        match connection.url.scheme() {
352
5550
            #[cfg(not(target_arch = "wasm32"))]
353
5550
            "bonsaidb" => Ok(Self::new_bonsai_client(
354
2340
                connection,
355
2340
                protocol_version,
356
2340
                certificate,
357
2340
                custom_apis,
358
2340
                tokio,
359
2340
            )),
360
            #[cfg(feature = "websockets")]
361
3210
            "wss" | "ws" => Ok(Self::new_websocket_client(
362
3210
                connection,
363
3210
                protocol_version,
364
3210
                custom_apis,
365
3210
                #[cfg(not(target_arch = "wasm32"))]
366
3210
                tokio,
367
3210
            )),
368
            other => Err(Error::InvalidUrl(format!("unsupported scheme {other}"))),
369
        }
370
5550
    }
371

            
372
    #[cfg(not(target_arch = "wasm32"))]
373
2340
    fn new_bonsai_client(
374
2340
        server: ConnectionInfo,
375
2340
        protocol_version: &'static str,
376
2340
        certificate: Option<fabruic::Certificate>,
377
2340
        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
378
2340
        tokio: Option<Handle>,
379
2340
    ) -> Self {
380
2340
        let (request_sender, request_receiver) = flume::unbounded();
381
2340
        let connection_counter = Arc::new(AtomicU32::default());
382
2340
        let request_timeout = server.request_timeout;
383
2340
        let subscribers = server.subscribers.clone();
384
2340

            
385
2340
        sync::spawn_client(
386
2340
            quic_worker::reconnecting_client_loop(
387
2340
                server,
388
2340
                protocol_version,
389
2340
                certificate,
390
2340
                request_receiver,
391
2340
                Arc::new(custom_apis),
392
2340
                connection_counter.clone(),
393
2340
            ),
394
2340
            tokio,
395
2340
        );
396
2340

            
397
2340
        Self {
398
2340
            data: Arc::new(Data {
399
2340
                request_sender,
400
2340
                schemas: Mutex::default(),
401
2340
                connection_counter,
402
2340
                request_id: AtomicU32::default(),
403
2340
                effective_permissions: Mutex::default(),
404
2340
                subscribers,
405
2340
            }),
406
2340
            session: ClientSession::default(),
407
2340
            request_timeout,
408
2340
        }
409
2340
    }
410

            
411
    /// Creates a client for the `ws`/`wss` URL schemes on non-`wasm32` targets.
    ///
    /// Spawns `tungstenite_worker::reconnecting_client_loop` through
    /// `sync::spawn_client` and returns a handle that passes its requests to
    /// that worker over an unbounded channel.
    #[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
    fn new_websocket_client(
        server: ConnectionInfo,
        protocol_version: &'static str,
        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
        tokio: Option<Handle>,
    ) -> Self {
        let (request_sender, request_receiver) = flume::unbounded();
        let connection_counter = Arc::new(AtomicU32::default());
        // Copy out what the handle needs before `server` moves into the worker.
        let request_timeout = server.request_timeout;
        let subscribers = server.subscribers.clone();

        let worker = tungstenite_worker::reconnecting_client_loop(
            server,
            protocol_version,
            request_receiver,
            Arc::new(custom_apis),
            Arc::clone(&connection_counter),
        );
        sync::spawn_client(worker, tokio);

        let data = Data {
            request_sender,
            schemas: Mutex::default(),
            request_id: AtomicU32::default(),
            connection_counter,
            effective_permissions: Mutex::default(),
            subscribers,
        };
        Self {
            data: Arc::new(data),
            session: ClientSession::default(),
            request_timeout,
        }
    }
447

            
448
    /// Creates a client for the `ws`/`wss` URL schemes on `wasm32` targets.
    ///
    /// Starts the worker via `wasm_websocket_worker::spawn_client` and returns
    /// a handle that passes its requests to that worker over an unbounded
    /// channel.
    #[cfg(all(feature = "websockets", target_arch = "wasm32"))]
    fn new_websocket_client(
        server: ConnectionInfo,
        protocol_version: &'static str,
        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
    ) -> Self {
        let (request_sender, request_receiver) = flume::unbounded();
        let connection_counter = Arc::new(AtomicU32::default());

        wasm_websocket_worker::spawn_client(
            Arc::new(server.url),
            protocol_version,
            request_receiver,
            Arc::new(custom_apis),
            server.subscribers.clone(),
            connection_counter.clone(),
            None,
            server.connect_timeout,
        );

        // `Data` has no `background_task_running` field and `AtomicBool` is
        // not imported, so the former `test-util`-gated initialisation of that
        // field could not compile and has been removed.
        Self {
            data: Arc::new(Data {
                request_sender,
                schemas: Mutex::default(),
                request_id: AtomicU32::default(),
                connection_counter,
                effective_permissions: Mutex::default(),
                subscribers: server.subscribers,
            }),
            session: ClientSession::default(),
            request_timeout: server.request_timeout,
        }
    }
486

            
487
2162190
    fn send_request_without_confirmation(
488
2162190
        &self,
489
2162190
        name: ApiName,
490
2162190
        bytes: Bytes,
491
2162190
    ) -> Result<flume::Receiver<Result<Bytes, Error>>, Error> {
492
2162190
        let (result_sender, result_receiver) = flume::bounded(1);
493
2162190
        let id = self.data.request_id.fetch_add(1, Ordering::SeqCst);
494
2162190
        self.data.request_sender.send(PendingRequest {
495
2162190
            request: Payload {
496
2162190
                session_id: self.session.session.id,
497
2162190
                id: Some(id),
498
2162190
                name,
499
2162190
                value: Ok(bytes),
500
2162190
            },
501
2162190
            responder: result_sender,
502
2162190
        })?;
503

            
504
2162190
        Ok(result_receiver)
505
2162190
    }
506

            
507
1820280
    async fn send_request_async(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
508
1820280
        let result_receiver = self.send_request_without_confirmation(name, bytes)?;
509

            
510
        #[cfg(target_arch = "wasm32")]
511
        let result = {
512
            use wasm_bindgen::JsCast;
513
            let (timeout_sender, mut timeout_receiver) = futures::channel::oneshot::channel();
514
            // Install the timeout.
515
            {
516
                if let Some(window) = web_sys::window() {
517
                    let timeout = wasm_bindgen::closure::Closure::once_into_js(move || {
518
                        let _result = timeout_sender.send(());
519
                    });
520
                    let _: Result<_, _> = window
521
                        .set_timeout_with_callback_and_timeout_and_arguments_0(
522
                            timeout.as_ref().unchecked_ref(),
523
                            self.request_timeout
524
                                .as_millis()
525
                                .try_into()
526
                                .unwrap_or(i32::MAX),
527
                        );
528
                }
529
            }
530
            futures::select! {
531
                result = result_receiver.recv_async() => Ok(result),
532
                _ = timeout_receiver => Err(Error::Network(bonsaidb_core::networking::Error::RequestTimeout)),
533
            }
534
        };
535
        #[cfg(not(target_arch = "wasm32"))]
536
2941140
        let result = tokio::time::timeout(self.request_timeout, result_receiver.recv_async()).await;
537

            
538
1820280
        match result {
539
1820220
            Ok(response) => response?,
540
60
            Err(_) => Err(Error::request_timeout()),
541
        }
542
1820280
    }
543

            
544
    #[cfg(not(target_arch = "wasm32"))]
545
341340
    fn send_request(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
546
341340
        let result_receiver = self.send_request_without_confirmation(name, bytes)?;
547

            
548
341340
        result_receiver.recv_timeout(self.request_timeout)?
549
341340
    }
550

            
551
    /// Sends an api `request`.
552
1819294
    pub async fn send_api_request<Api: api::Api>(
553
1819294
        &self,
554
1819294
        request: &Api,
555
1819294
    ) -> Result<Api::Response, ApiError<Api::Error>> {
556
1819294
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
557
2940067
        let response = self.send_request_async(Api::name(), request).await?;
558
1802878
        let response =
559
1802878
            pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
560
1802878
        response.map_err(ApiError::Api)
561
1819294
    }
562

            
563
    #[cfg(not(target_arch = "wasm32"))]
564
340992
    fn send_blocking_api_request<Api: api::Api>(
565
340992
        &self,
566
340992
        request: &Api,
567
340992
    ) -> Result<Api::Response, ApiError<Api::Error>> {
568
340992
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
569
340992
        let response = self.send_request(Api::name(), request)?;
570

            
571
340390
        let response =
572
340390
            pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
573
340390
        response.map_err(ApiError::Api)
574
340992
    }
575

            
576
570
    fn invoke_blocking_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
577
570
        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
578
570
        self.send_request_without_confirmation(Api::name(), request)
579
570
            .map(|_| ())
580
570
    }
581

            
582
    /// Returns the current effective permissions for the client. Returns None
583
    /// if unauthenticated.
584
    #[must_use]
585
    pub fn effective_permissions(&self) -> Option<Permissions> {
586
        let effective_permissions = self.data.effective_permissions.lock();
587
        effective_permissions.clone()
588
    }
589

            
590
720
    pub(crate) fn register_subscriber(&self, id: u64, sender: flume::Sender<Message>) {
591
720
        let mut subscribers = self.data.subscribers.lock();
592
720
        subscribers.insert(id, sender);
593
720
    }
594

            
595
60
    pub(crate) async fn unregister_subscriber_async(&self, database: String, id: u64) {
596
60
        drop(
597
60
            self.send_api_request(&UnregisterSubscriber {
598
60
                database,
599
60
                subscriber_id: id,
600
60
            })
601
60
            .await,
602
        );
603
60
        let mut subscribers = self.data.subscribers.lock();
604
60
        subscribers.remove(&id);
605
60
    }
606

            
607
    #[cfg(not(target_arch = "wasm32"))]
608
240
    pub(crate) fn unregister_subscriber(&self, database: String, id: u64) {
609
240
        drop(self.send_blocking_api_request(&UnregisterSubscriber {
610
240
            database,
611
240
            subscriber_id: id,
612
240
        }));
613
240
        let mut subscribers = self.data.subscribers.lock();
614
240
        subscribers.remove(&id);
615
240
    }
616

            
617
1468
    fn remote_database<DB: bonsaidb_core::schema::Schema>(
618
1468
        &self,
619
1468
        name: &str,
620
1468
    ) -> Result<AsyncRemoteDatabase, bonsaidb_core::Error> {
621
1468
        let mut schemas = self.data.schemas.lock();
622
1468
        let type_id = TypeId::of::<DB>();
623
1468
        let schematic = if let Some(schematic) = schemas.get(&type_id) {
624
1116
            schematic.clone()
625
        } else {
626
352
            let schematic = Arc::new(DB::schematic()?);
627
352
            schemas.insert(type_id, schematic.clone());
628
352
            schematic
629
        };
630
1468
        Ok(AsyncRemoteDatabase::new(
631
1468
            self.clone(),
632
1468
            name.to_string(),
633
1468
            schematic,
634
1468
        ))
635
1468
    }
636

            
637
59280
    fn session_is_current(&self) -> bool {
638
59280
        self.session.session.id.is_none()
639
1170
            || self.data.connection_counter.load(Ordering::SeqCst) == self.session.connection_id
640
59280
    }
641

            
642
    /// Sets this instance's request timeout.
643
    ///
644
    /// Each client has its own timeout. When cloning a client, this timeout
645
    /// setting will be copied to the clone.
646
    pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
647
        self.request_timeout = timeout.into();
648
    }
649
}
650

            
651
impl HasSession for AsyncClient {
652
480
    fn session(&self) -> Option<&Session> {
653
480
        self.session_is_current().then_some(&self.session.session)
654
480
    }
655
}
656

            
657
#[async_trait]
658
impl AsyncStorageConnection for AsyncClient {
659
    type Authenticated = Self;
660
    type Database = AsyncRemoteDatabase;
661

            
662
    async fn admin(&self) -> Self::Database {
663
        self.remote_database::<Admin>(ADMIN_DATABASE_NAME).unwrap()
664
    }
665

            
666
17460
    async fn create_database_with_schema(
667
17460
        &self,
668
17460
        name: &str,
669
17460
        schema: SchemaName,
670
17460
        only_if_needed: bool,
671
17460
    ) -> Result<(), bonsaidb_core::Error> {
672
17460
        self.send_api_request(&CreateDatabase {
673
17460
            database: Database {
674
17460
                name: name.to_string(),
675
17460
                schema,
676
17460
            },
677
17460
            only_if_needed,
678
17460
        })
679
92880
        .await?;
680
17280
        Ok(())
681
52380
    }
682

            
683
1218
    async fn database<DB: Schema>(
684
1218
        &self,
685
1218
        name: &str,
686
1218
    ) -> Result<Self::Database, bonsaidb_core::Error> {
687
1218
        self.remote_database::<DB>(name)
688
2436
    }
689

            
690
15120
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
691
15120
        self.send_api_request(&DeleteDatabase {
692
15120
            name: name.to_string(),
693
15120
        })
694
85380
        .await?;
695
15060
        Ok(())
696
45360
    }
697

            
698
120
    async fn list_databases(&self) -> Result<Vec<Database>, bonsaidb_core::Error> {
699
120
        Ok(self.send_api_request(&ListDatabases).await?)
700
360
    }
701

            
702
60
    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
703
60
        Ok(self.send_api_request(&ListAvailableSchemas).await?)
704
180
    }
705

            
706
210
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
707
210
        Ok(self
708
210
            .send_api_request(&CreateUser {
709
210
                username: username.to_string(),
710
210
            })
711
210
            .await?)
712
630
    }
713

            
714
2
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
715
2
        &self,
716
2
        user: U,
717
2
    ) -> Result<(), bonsaidb_core::Error> {
718
2
        Ok(self
719
2
            .send_api_request(&DeleteUser {
720
2
                user: user.name()?.into_owned(),
721
            })
722
2
            .await?)
723
6
    }
724

            
725
    #[cfg(feature = "password-hashing")]
726
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
727
        &self,
728
        user: U,
729
        password: bonsaidb_core::connection::SensitiveString,
730
    ) -> Result<(), bonsaidb_core::Error> {
731
        Ok(self
732
            .send_api_request(&bonsaidb_core::networking::SetUserPassword {
733
                user: user.name()?.into_owned(),
734
                password,
735
            })
736
            .await?)
737
    }
738

            
739
    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
740
450
    async fn authenticate(
741
450
        &self,
742
450
        authentication: bonsaidb_core::connection::Authentication,
743
450
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
744
450
        let session = self
745
450
            .send_api_request(&bonsaidb_core::networking::Authenticate { authentication })
746
630
            .await?;
747
450
        Ok(Self {
748
450
            data: self.data.clone(),
749
450
            session: ClientSession {
750
450
                session: Arc::new(session),
751
450
                connection_id: self.data.connection_counter.load(Ordering::SeqCst),
752
450
            },
753
450
            request_timeout: self.request_timeout,
754
450
        })
755
1350
    }
756

            
757
60
    async fn assume_identity(
758
60
        &self,
759
60
        identity: IdentityReference<'_>,
760
60
    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
761
60
        let session = self
762
60
            .send_api_request(&AssumeIdentity(identity.into_owned()))
763
60
            .await?;
764
60
        Ok(Self {
765
60
            data: self.data.clone(),
766
60
            session: ClientSession {
767
60
                session: Arc::new(session),
768
60
                connection_id: self.data.connection_counter.load(Ordering::SeqCst),
769
60
            },
770
60
            request_timeout: self.request_timeout,
771
60
        })
772
180
    }
773

            
774
4
    async fn add_permission_group_to_user<
775
4
        'user,
776
4
        'group,
777
4
        U: Nameable<'user, u64> + Send + Sync,
778
4
        G: Nameable<'group, u64> + Send + Sync,
779
4
    >(
780
4
        &self,
781
4
        user: U,
782
4
        permission_group: G,
783
4
    ) -> Result<(), bonsaidb_core::Error> {
784
4
        self.send_api_request(&AlterUserPermissionGroupMembership {
785
4
            user: user.name()?.into_owned(),
786
4
            group: permission_group.name()?.into_owned(),
787
            should_be_member: true,
788
        })
789
4
        .await?;
790
4
        Ok(())
791
12
    }
792

            
793
4
    async fn remove_permission_group_from_user<
794
4
        'user,
795
4
        'group,
796
4
        U: Nameable<'user, u64> + Send + Sync,
797
4
        G: Nameable<'group, u64> + Send + Sync,
798
4
    >(
799
4
        &self,
800
4
        user: U,
801
4
        permission_group: G,
802
4
    ) -> Result<(), bonsaidb_core::Error> {
803
4
        self.send_api_request(&AlterUserPermissionGroupMembership {
804
4
            user: user.name()?.into_owned(),
805
4
            group: permission_group.name()?.into_owned(),
806
            should_be_member: false,
807
        })
808
4
        .await?;
809
4
        Ok(())
810
12
    }
811

            
812
4
    async fn add_role_to_user<
813
4
        'user,
814
4
        'group,
815
4
        U: Nameable<'user, u64> + Send + Sync,
816
4
        G: Nameable<'group, u64> + Send + Sync,
817
4
    >(
818
4
        &self,
819
4
        user: U,
820
4
        role: G,
821
4
    ) -> Result<(), bonsaidb_core::Error> {
822
4
        self.send_api_request(&AlterUserRoleMembership {
823
4
            user: user.name()?.into_owned(),
824
4
            role: role.name()?.into_owned(),
825
            should_be_member: true,
826
        })
827
4
        .await?;
828
4
        Ok(())
829
12
    }
830

            
831
4
    async fn remove_role_from_user<
832
4
        'user,
833
4
        'group,
834
4
        U: Nameable<'user, u64> + Send + Sync,
835
4
        G: Nameable<'group, u64> + Send + Sync,
836
4
    >(
837
4
        &self,
838
4
        user: U,
839
4
        role: G,
840
4
    ) -> Result<(), bonsaidb_core::Error> {
841
4
        self.send_api_request(&AlterUserRoleMembership {
842
4
            user: user.name()?.into_owned(),
843
4
            role: role.name()?.into_owned(),
844
            should_be_member: false,
845
        })
846
4
        .await?;
847
4
        Ok(())
848
12
    }
849
}
850

            
851
/// Requests that are still awaiting a response, keyed by the `u32` payload id
/// that the response is matched against.
type OutstandingRequestMap = HashMap<u32, PendingRequest>;
852
/// A shareable handle to the outstanding-request map, guarded by an async mutex.
type OutstandingRequestMapHandle = Arc<async_lock::Mutex<OutstandingRequestMap>>;
853
/// The channel sender used to deliver the outcome of a request: either the
/// response bytes or an [`Error`].
type PendingRequestResponder = Sender<Result<Bytes, Error>>;
854

            
855
#[derive(Debug)]
856
pub struct PendingRequest {
857
    request: Payload,
858
    responder: PendingRequestResponder,
859
}
860

            
861
2162700
async fn process_response_payload(
862
2162700
    payload: Payload,
863
2162700
    outstanding_requests: &OutstandingRequestMapHandle,
864
2162700
    custom_apis: &HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
865
2162700
) {
866
2162700
    if let Some(payload_id) = payload.id {
867
2161350
        if let Some(outstanding_request) = {
868
2161350
            let mut outstanding_requests = fast_async_lock!(outstanding_requests);
869
2161350
            outstanding_requests.remove(&payload_id)
870
2161350
        } {
871
2161350
            drop(
872
2161350
                outstanding_request
873
2161350
                    .responder
874
2161350
                    .send(payload.value.map_err(Error::from)),
875
2161350
            );
876
2161350
        }
877
1350
    } else if let (Some(custom_api_callback), Ok(value)) = (
878
1350
        custom_apis.get(&payload.name).and_then(Option::as_ref),
879
1350
        payload.value,
880
1350
    ) {
881
1350
        custom_api_callback.response_received(value).await;
882
    } else {
883
        log::warn!("unexpected api response received ({})", payload.name);
884
    }
885
2162700
}
886

            
887
trait ApiWrapper<Response>: Send + Sync {
888
    fn invoke(&self, response: Response) -> BoxFuture<'static, ()>;
889
}
890

            
891
/// A callback that is invoked when an [`Api::Response`](Api::Response)
892
/// value is received out-of-band (not in reply to a request).
893
pub struct ApiCallback<Api: api::Api> {
894
    generator: Box<dyn ApiWrapper<Api::Response>>,
895
}
896

            
897
/// The trait bounds required for the function wrapped in a [`ApiCallback`].
///
/// This trait has no methods of its own; it only names the combination
/// `Fn(Request) -> F + Send + Sync + 'static`.
pub trait ApiCallbackFn<Request, F>: Fn(Request) -> F + Send + Sync + 'static {}

// Blanket implementation: every type meeting the bounds is an `ApiCallbackFn`.
impl<T: Fn(Request) -> F + Send + Sync + 'static, Request, F> ApiCallbackFn<Request, F> for T {}
902

            
903
struct ApiFutureBoxer<Response: Send + Sync, F: Future<Output = ()> + Send + Sync>(
904
    Box<dyn ApiCallbackFn<Response, F>>,
905
);
906

            
907
impl<Response: Send + Sync, F: Future<Output = ()> + Send + Sync + 'static> ApiWrapper<Response>
908
    for ApiFutureBoxer<Response, F>
909
{
910
1350
    fn invoke(&self, response: Response) -> BoxFuture<'static, ()> {
911
1350
        self.0(response).boxed()
912
1350
    }
913
}
914

            
915
impl<Api: api::Api> ApiCallback<Api> {
916
    /// Returns a new instance wrapping the provided function.
917
5550
    pub fn new<
918
5550
        F: ApiCallbackFn<Api::Response, Fut>,
919
5550
        Fut: Future<Output = ()> + Send + Sync + 'static,
920
5550
    >(
921
5550
        callback: F,
922
5550
    ) -> Self {
923
5550
        Self {
924
5550
            generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(callback))),
925
5550
        }
926
5550
    }
927

            
928
    /// Returns a new instance wrapping the provided function, passing a clone
929
    /// of `context` as the second parameter. This is just a convenience wrapper
930
    /// around `new()` that produces more readable code when needing to access
931
    /// external information inside of the callback.
932
    pub fn new_with_context<
933
        Context: Send + Sync + Clone + 'static,
934
        F: Fn(Api::Response, Context) -> Fut + Send + Sync + 'static,
935
        Fut: Future<Output = ()> + Send + Sync + 'static,
936
    >(
937
        context: Context,
938
        callback: F,
939
    ) -> Self {
940
        Self {
941
            generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(
942
                move |request| {
943
                    let context = context.clone();
944
                    callback(request, context)
945
                },
946
            ))),
947
        }
948
    }
949
}
950

            
951
#[async_trait]
952
pub trait AnyApiCallback: Send + Sync + 'static {
953
    /// An out-of-band `response` was received. This happens when the server
954
    /// sends a response that isn't in response to a request.
955
    async fn response_received(&self, response: Bytes);
956
}
957

            
958
#[async_trait]
959
impl<Api: api::Api> AnyApiCallback for ApiCallback<Api> {
960
1350
    async fn response_received(&self, response: Bytes) {
961
1350
        match pot::from_slice::<Result<Api::Response, Api::Error>>(&response) {
962
1350
            Ok(response) => self.generator.invoke(response.unwrap()).await,
963
            Err(err) => {
964
                log::error!("error deserializing api: {err}");
965
            }
966
        }
967
2700
    }
968
}
969

            
970
6913
#[derive(Debug, Clone, Default)]
971
pub struct ClientSession {
972
    session: Arc<Session>,
973
    connection_id: u32,
974
}
975

            
976
2700
async fn disconnect_pending_requests(
977
2700
    outstanding_requests: &OutstandingRequestMapHandle,
978
2700
    pending_error: &mut Option<Error>,
979
2700
) {
980
2700
    let mut outstanding_requests = fast_async_lock!(outstanding_requests);
981
2700
    for (_, pending) in outstanding_requests.drain() {
982
330
        drop(
983
330
            pending
984
330
                .responder
985
330
                .send(Err(pending_error.take().unwrap_or(Error::disconnected()))),
986
330
        );
987
330
    }
988
2700
}
989

            
990
struct ConnectionInfo {
991
    pub url: Url,
992
    pub subscribers: SubscriberMap,
993
    #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
994
    pub connect_timeout: Duration,
995
    pub request_timeout: Duration,
996
}