1
use std::{
2
    collections::{hash_map, HashMap},
3
    fmt::Debug,
4
    marker::PhantomData,
5
    net::SocketAddr,
6
    ops::Deref,
7
    path::PathBuf,
8
    sync::{
9
        atomic::{AtomicU32, AtomicUsize, Ordering},
10
        Arc,
11
    },
12
    time::Duration,
13
};
14

            
15
use async_lock::{Mutex, RwLock};
16
use async_trait::async_trait;
17
use bonsaidb_core::{
18
    admin::User,
19
    arc_bytes::serde::Bytes,
20
    circulate::{Message, Relay, Subscriber},
21
    connection::{self, AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
22
    custom_api::{CustomApi, CustomApiResult},
23
    document::DocumentId,
24
    keyvalue::{KeyOperation, KeyValue},
25
    networking::{
26
        self, CreateDatabaseHandler, DatabaseRequest, DatabaseRequestDispatcher, DatabaseResponse,
27
        DeleteDatabaseHandler, Payload, Request, RequestDispatcher, Response, ServerRequest,
28
        ServerRequestDispatcher, ServerResponse, CURRENT_PROTOCOL_VERSION,
29
    },
30
    permissions::{
31
        bonsai::{
32
            bonsaidb_resource_name, collection_resource_name, database_resource_name,
33
            document_resource_name, keyvalue_key_resource_name, kv_resource_name,
34
            pubsub_topic_resource_name, user_resource_name, view_resource_name, BonsaiAction,
35
            DatabaseAction, DocumentAction, KeyValueAction, PubSubAction, ServerAction,
36
            TransactionAction, ViewAction,
37
        },
38
        Action, Dispatcher, PermissionDenied, Permissions, ResourceName,
39
    },
40
    pubsub::database_topic,
41
    schema::{self, CollectionName, Nameable, NamedCollection, NamedReference, Schema, ViewName},
42
    transaction::{Command, Transaction},
43
};
44
#[cfg(feature = "password-hashing")]
45
use bonsaidb_core::{
46
    connection::{Authenticated, Authentication},
47
    permissions::bonsai::AuthenticationMethod,
48
};
49
use bonsaidb_local::{config::Builder, Database, Storage};
50
use bonsaidb_utils::{fast_async_lock, fast_async_read, fast_async_write};
51
use derive_where::derive_where;
52
use fabruic::{self, CertificateChain, Endpoint, KeyPair, PrivateKey};
53
use flume::Sender;
54
use futures::{Future, StreamExt};
55
use rustls::sign::CertifiedKey;
56
use schema::SchemaName;
57
#[cfg(not(windows))]
58
use signal_hook::consts::SIGQUIT;
59
use signal_hook::consts::{SIGINT, SIGTERM};
60
use tokio::sync::{oneshot, Notify};
61

            
62
#[cfg(feature = "acme")]
63
use crate::config::AcmeConfiguration;
64
use crate::{
65
    backend::{BackendError, ConnectionHandling, CustomApiDispatcher},
66
    error::Error,
67
    hosted::{Hosted, SerializablePrivateKey, TlsCertificate, TlsCertificatesByDomain},
68
    server::shutdown::{Shutdown, ShutdownState},
69
    Backend, NoBackend, ServerConfiguration,
70
};
71

            
72
#[cfg(feature = "acme")]
73
pub mod acme;
74
mod connected_client;
75
mod database;
76

            
77
mod shutdown;
78
mod tcp;
79
#[cfg(feature = "websockets")]
80
mod websockets;
81

            
82
use self::connected_client::OwnedClient;
83
pub use self::{
84
    connected_client::{ConnectedClient, LockedClientDataGuard, Transport},
85
    database::{ServerDatabase, ServerSubscriber},
86
    tcp::{ApplicationProtocols, HttpService, Peer, StandardTcpProtocols, TcpService},
87
};
88

            
89
// Process-wide source of candidate ids for newly connected clients. It is
// incremented atomically in `initialize_client`, which also verifies that the
// candidate id is not already present in the server's client map.
static CONNECTED_CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::new(0);
90

            
91
/// A BonsaiDb server.
92
#[derive(Debug)]
93
53432
#[derive_where(Clone)]
94
pub struct CustomServer<B: Backend = NoBackend> {
95
    data: Arc<Data<B>>,
96
}
97

            
98
/// A BonsaiDb server without a custom backend.
99
pub type Server = CustomServer<NoBackend>;
100

            
101
#[derive(Debug)]
102
struct Data<B: Backend = NoBackend> {
103
    storage: Storage,
104
    clients: RwLock<HashMap<u32, ConnectedClient<B>>>,
105
    request_processor: flume::Sender<ClientRequest<B>>,
106
    default_permissions: Permissions,
107
    authenticated_permissions: Permissions,
108
    endpoint: RwLock<Option<Endpoint>>,
109
    client_simultaneous_request_limit: usize,
110
    primary_tls_key: CachedCertifiedKey,
111
    primary_domain: String,
112
    #[cfg(feature = "acme")]
113
    acme: AcmeConfiguration,
114
    #[cfg(feature = "acme")]
115
    alpn_keys: AlpnKeys,
116
    shutdown: Shutdown,
117
    relay: Relay,
118
    subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
119
    _backend: PhantomData<B>,
120
}
121

            
122
80
#[derive(Default)]
123
struct CachedCertifiedKey(parking_lot::Mutex<Option<Arc<CertifiedKey>>>);
124

            
125
impl Debug for CachedCertifiedKey {
126
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127
        f.debug_tuple("CachedCertifiedKey").finish()
128
    }
129
}
130

            
131
impl Deref for CachedCertifiedKey {
132
    type Target = parking_lot::Mutex<Option<Arc<CertifiedKey>>>;
133

            
134
868
    fn deref(&self) -> &Self::Target {
135
868
        &self.0
136
868
    }
137
}
138

            
139
impl<B: Backend> CustomServer<B> {
140
    /// Opens a server using `directory` for storage.
141
80
    pub async fn open(configuration: ServerConfiguration) -> Result<Self, Error> {
142
80
        let (request_sender, request_receiver) = flume::unbounded::<ClientRequest<B>>();
143
1280
        for _ in 0..configuration.request_workers {
144
1280
            let request_receiver = request_receiver.clone();
145
1280
            tokio::task::spawn(async move {
146
91694
                while let Ok(mut client_request) = request_receiver.recv_async().await {
147
52528
                    let request = client_request.request.take().unwrap();
148
52528
                    let result = ServerDispatcher {
149
52528
                        server: &client_request.server,
150
52528
                        client: &client_request.client,
151
52528
                        subscribers: &client_request.subscribers,
152
52528
                        response_sender: &client_request.sender,
153
52528
                    }
154
589478
                    .dispatch(&client_request.client.permissions().await, request)
155
579691
                    .await;
156
52528
                    let result = result
157
52528
                        .unwrap_or_else(|err| Response::Error(bonsaidb_core::Error::from(err)));
158
52528
                    drop(client_request.result_sender.send(result));
159
                }
160
1280
            });
161
1280
        }
162

            
163
1295
        let storage = Storage::open(configuration.storage.with_schema::<Hosted>()?).await?;
164

            
165
152
        storage.create_database::<Hosted>("_hosted", true).await?;
166

            
167
80
        let default_permissions = Permissions::from(configuration.default_permissions);
168
80
        let authenticated_permissions = Permissions::from(configuration.authenticated_permissions);
169
80

            
170
80
        let server = Self {
171
80
            data: Arc::new(Data {
172
80
                clients: RwLock::default(),
173
80
                storage,
174
80
                endpoint: RwLock::default(),
175
80
                request_processor: request_sender,
176
80
                default_permissions,
177
80
                authenticated_permissions,
178
80
                client_simultaneous_request_limit: configuration.client_simultaneous_request_limit,
179
80
                primary_tls_key: CachedCertifiedKey::default(),
180
80
                primary_domain: configuration.server_name,
181
80
                #[cfg(feature = "acme")]
182
80
                acme: configuration.acme,
183
80
                #[cfg(feature = "acme")]
184
80
                alpn_keys: AlpnKeys::default(),
185
80
                shutdown: Shutdown::new(),
186
80
                relay: Relay::default(),
187
80
                subscribers: Arc::default(),
188
80
                _backend: PhantomData::default(),
189
80
            }),
190
80
        };
191
80
        B::initialize(&server).await;
192
80
        Ok(server)
193
80
    }
194

            
195
    /// Returns the path to the public pinned certificate, if this server has
196
    /// one. Note: this function will always succeed, but the file may not
197
    /// exist.
198
    #[must_use]
199
148
    pub fn pinned_certificate_path(&self) -> PathBuf {
200
148
        self.path().join("pinned-certificate.der")
201
148
    }
202

            
203
    /// Returns the primary domain configured for this server.
204
    #[must_use]
205
32
    pub fn primary_domain(&self) -> &str {
206
32
        &self.data.primary_domain
207
32
    }
208

            
209
    /// Returns the administration database.
210
23
    pub async fn admin(&self) -> ServerDatabase<B> {
211
23
        let db = self.data.storage.admin().await;
212
23
        ServerDatabase {
213
23
            server: self.clone(),
214
23
            db,
215
23
        }
216
23
    }
217

            
218
283
    pub(crate) async fn hosted(&self) -> ServerDatabase<B> {
219
283
        let db = self
220
281
            .data
221
281
            .storage
222
281
            .database::<Hosted>("_hosted")
223
76
            .await
224
283
            .unwrap();
225
283
        ServerDatabase {
226
283
            server: self.clone(),
227
283
            db,
228
283
        }
229
283
    }
230

            
231
    /// Installs an X.509 certificate used for general purpose connections.
232
75
    pub async fn install_self_signed_certificate(&self, overwrite: bool) -> Result<(), Error> {
233
74
        let keypair = KeyPair::new_self_signed(&self.data.primary_domain);
234
74

            
235
210
        if self.certificate_chain().await.is_ok() && !overwrite {
236
1
            return Err(Error::Core(bonsaidb_core::Error::Configuration(String::from("Certificate already installed. Enable overwrite if you wish to replace the existing certificate."))));
237
74
        }
238
74

            
239
437
        self.install_certificate(keypair.certificate_chain(), keypair.private_key())
240
437
            .await?;
241

            
242
74
        tokio::fs::write(
243
74
            self.pinned_certificate_path(),
244
74
            keypair.end_entity_certificate().as_ref(),
245
74
        )
246
72
        .await?;
247

            
248
74
        Ok(())
249
75
    }
250

            
251
    /// Installs a PEM-encoded certificate chain and private key used for TLS
    /// connections.
    ///
    /// `private_key` is first parsed as PEM; if that fails, the bytes are
    /// interpreted as a DER-encoded key instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the private key or certificate chain cannot be
    /// parsed, or if installing the certificate fails.
    #[cfg(feature = "pem")]
    pub async fn install_pem_certificate(
        &self,
        certificate_chain: &[u8],
        private_key: &[u8],
    ) -> Result<(), Error> {
        let private_key = if let Ok(parsed) = pem::parse(private_key) {
            PrivateKey::unchecked_from_der(parsed.contents)
        } else {
            PrivateKey::from_der(private_key)?
        };

        let mut certificates = Vec::new();
        for entry in pem::parse_many(certificate_chain)? {
            certificates.push(fabruic::Certificate::unchecked_from_der(entry.contents));
        }
        let chain = CertificateChain::unchecked_from_certificates(certificates);

        self.install_certificate(&chain, &private_key).await
    }
273

            
274
    /// Installs a certificate chain and private key used for TLS connections.
275
74
    pub async fn install_certificate(
276
74
        &self,
277
74
        certificate_chain: &CertificateChain,
278
74
        private_key: &PrivateKey,
279
74
    ) -> Result<(), Error> {
280
74
        let db = self.hosted().await;
281

            
282
74
        TlsCertificate::entry(&self.data.primary_domain, &db)
283
74
            .update_with(|cert: &mut TlsCertificate| {
284
1
                cert.certificate_chain = certificate_chain.clone();
285
1
                cert.private_key = SerializablePrivateKey(private_key.clone());
286
74
            })
287
74
            .or_insert_with(|| TlsCertificate {
288
73
                domains: vec![self.data.primary_domain.clone()],
289
73
                private_key: SerializablePrivateKey(private_key.clone()),
290
73
                certificate_chain: certificate_chain.clone(),
291
288
            })
292
288
            .await?;
293

            
294
148
        self.refresh_certified_key().await?;
295

            
296
74
        let pinned_certificate_path = self.pinned_certificate_path();
297
74
        if pinned_certificate_path.exists() {
298
1
            tokio::fs::remove_file(&pinned_certificate_path).await?;
299
73
        }
300

            
301
74
        Ok(())
302
74
    }
303

            
304
75
    async fn refresh_certified_key(&self) -> Result<(), Error> {
305
152
        let certificate = self.tls_certificate().await?;
306

            
307
75
        let mut cached_key = self.data.primary_tls_key.lock();
308
75
        let private_key = rustls::PrivateKey(
309
75
            fabruic::dangerous::PrivateKey::as_ref(&certificate.private_key.0).to_vec(),
310
75
        );
311
75
        let private_key = rustls::sign::any_ecdsa_type(&Arc::new(private_key))?;
312

            
313
75
        let certificates = certificate
314
75
            .certificate_chain
315
75
            .iter()
316
75
            .map(|cert| rustls::Certificate(cert.as_ref().to_vec()))
317
75
            .collect::<Vec<_>>();
318
75

            
319
75
        let certified_key = Arc::new(CertifiedKey::new(certificates, private_key));
320
75
        *cached_key = Some(certified_key);
321
75
        Ok(())
322
75
    }
323

            
324
97
    async fn tls_certificate(&self) -> Result<TlsCertificate, Error> {
325
97
        let db = self.hosted().await;
326
97
        let (_, certificate) = db
327
97
            .view::<TlsCertificatesByDomain>()
328
97
            .with_key(self.data.primary_domain.clone())
329
177
            .query_with_collection_docs()
330
177
            .await?
331
            .documents
332
97
            .into_iter()
333
97
            .next()
334
97
            .ok_or_else(|| {
335
                Error::Core(bonsaidb_core::Error::Configuration(format!(
336
                    "no certificate found for {}",
337
                    self.data.primary_domain
338
                )))
339
97
            })?;
340
97
        Ok(certificate.contents)
341
97
    }
342

            
343
    /// Returns the current certificate chain.
344
112
    pub async fn certificate_chain(&self) -> Result<CertificateChain, Error> {
345
112
        let db = self.hosted().await;
346
112
        if let Some(mapping) = db
347
112
            .view::<TlsCertificatesByDomain>()
348
112
            .with_key(self.data.primary_domain.clone())
349
165
            .query()
350
160
            .await?
351
112
            .into_iter()
352
112
            .next()
353
        {
354
36
            Ok(mapping.value)
355
        } else {
356
76
            Err(Error::Core(bonsaidb_core::Error::Configuration(format!(
357
76
                "no certificate found for {}",
358
76
                self.data.primary_domain
359
76
            ))))
360
        }
361
112
    }
362

            
363
    /// Listens for incoming client connections. Does not return until the
364
    /// server shuts down.
365
22
    pub async fn listen_on(&self, port: u16) -> Result<(), Error> {
366
27
        let certificate = self.tls_certificate().await?;
367
22
        let keypair =
368
22
            KeyPair::from_parts(certificate.certificate_chain, certificate.private_key.0)?;
369
22
        let mut builder = Endpoint::builder();
370
22
        builder.set_protocols([CURRENT_PROTOCOL_VERSION.as_bytes().to_vec()]);
371
22
        builder.set_address(([0; 8], port).into());
372
22
        builder
373
22
            .set_max_idle_timeout(None)
374
22
            .map_err(|err| Error::Core(bonsaidb_core::Error::Transport(err.to_string())))?;
375
22
        builder.set_server_key_pair(Some(keypair));
376
22
        let mut server = builder
377
22
            .build()
378
22
            .map_err(|err| Error::Core(bonsaidb_core::Error::Transport(err.to_string())))?;
379
22
        {
380
22
            let mut endpoint = fast_async_write!(self.data.endpoint);
381
22
            *endpoint = Some(server.clone());
382
        }
383

            
384
22
        let mut shutdown_watcher = self
385
22
            .data
386
22
            .shutdown
387
22
            .watcher()
388
            .await
389
22
            .expect("server already shut down");
390

            
391
83
        while let Some(result) = tokio::select! {
392
4
            shutdown_state = shutdown_watcher.wait_for_shutdown() => {
393
                drop(server.close_incoming());
394
                if matches!(shutdown_state, ShutdownState::GracefulShutdown) {
395
                    server.wait_idle().await;
396
                }
397
                drop(server.close());
398
                None
399
            },
400
61
            msg = server.next() => msg
401
61
        } {
402
61
            let connection = result.accept::<()>().await?;
403
61
            let task_self = self.clone();
404
61
            tokio::spawn(async move {
405
61
                let address = connection.remote_address();
406
61
                if let Err(err) = task_self.handle_bonsai_connection(connection).await {
407
                    log::error!("[server] closing connection {}: {:?}", address, err);
408
61
                }
409
61
            });
410
        }
411

            
412
4
        Ok(())
413
4
    }
414

            
415
    /// Returns all of the currently connected clients.
416
    pub async fn connected_clients(&self) -> Vec<ConnectedClient<B>> {
417
        let clients = fast_async_read!(self.data.clients);
418
        clients.values().cloned().collect()
419
    }
420

            
421
    /// Sends a custom API response to all connected clients.
422
    pub async fn broadcast(&self, response: CustomApiResult<B::CustomApi>) {
423
        let clients = fast_async_read!(self.data.clients);
424
        for client in clients.values() {
425
            drop(client.send(response.clone()));
426
        }
427
    }
428

            
429
116
    async fn initialize_client(
430
116
        &self,
431
116
        transport: Transport,
432
116
        address: SocketAddr,
433
116
        sender: Sender<CustomApiResult<B::CustomApi>>,
434
116
    ) -> Option<OwnedClient<B>> {
435
116
        if !self.data.default_permissions.allowed_to(
436
116
            &bonsaidb_resource_name(),
437
116
            &BonsaiAction::Server(ServerAction::Connect),
438
116
        ) {
439
            return None;
440
116
        }
441

            
442
116
        let client = loop {
443
116
            let next_id = CONNECTED_CLIENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
444
116
            let mut clients = fast_async_write!(self.data.clients);
445
116
            if let hash_map::Entry::Vacant(e) = clients.entry(next_id) {
446
116
                let client = OwnedClient::new(next_id, address, transport, sender, self.clone());
447
116
                e.insert(client.clone());
448
116
                break client;
449
            }
450
        };
451

            
452
        if matches!(
453
116
            B::client_connected(&client, self).await,
454
            ConnectionHandling::Accept
455
        ) {
456
116
            Some(client)
457
        } else {
458
            None
459
        }
460
116
    }
461

            
462
57
    async fn disconnect_client(&self, id: u32) {
463
57
        if let Some(client) = {
464
57
            let mut clients = fast_async_write!(self.data.clients);
465
57
            clients.remove(&id)
466
        } {
467
57
            B::client_disconnected(client, self).await;
468
        }
469
57
    }
470

            
471
61
    async fn handle_bonsai_connection(
472
61
        &self,
473
61
        mut connection: fabruic::Connection<()>,
474
61
    ) -> Result<(), Error> {
475
61
        if let Some(incoming) = connection.next().await {
476
61
            let incoming = match incoming {
477
61
                Ok(incoming) => incoming,
478
                Err(err) => {
479
                    log::error!("[server] Error establishing a stream: {:?}", err);
480
                    return Ok(());
481
                }
482
            };
483

            
484
61
            match incoming
485
61
            .accept::<networking::Payload<Response<CustomApiResult<B::CustomApi>>>, networking::Payload<Request<<B::CustomApi as CustomApi>::Request>>>()
486
            .await {
487
61
                Ok((sender, receiver)) => {
488
61
                    let (api_response_sender, api_response_receiver) = flume::unbounded();
489
61
                    if let Some(disconnector) = self
490
61
                        .initialize_client(
491
61
                            Transport::Bonsai,
492
61
                            connection.remote_address(),
493
61
                            api_response_sender,
494
61
                        )
495
                        .await
496
61
                    {
497
61
                        let task_sender = sender.clone();
498
61
                        tokio::spawn(async move {
499
61
                            while let Ok(response) = api_response_receiver.recv_async().await {
500
                                if task_sender
501
                                    .send(&Payload {
502
                                        id: None,
503
                                        wrapped: Response::Api(response),
504
                                    })
505
                                    .is_err()
506
                                {
507
                                    break;
508
                                }
509
                            }
510
15
                            let _ = connection.close().await;
511
61
                        });
512
61

            
513
61
                        let task_self = self.clone();
514
61
                        tokio::spawn(async move {
515
61
                            if let Err(err) = task_self
516
19479
                                .handle_stream(disconnector, sender, receiver)
517
19479
                                .await
518
                            {
519
                                log::error!("[server] Error handling stream: {:?}", err);
520
15
                            }
521
61
                        });
522
61
                    } else {
523
                        log::error!("[server] Backend rejected connection.");
524
                        return Ok(());
525
                    }
526
                }
527
                Err(err) => {
528
                    log::error!("[server] Error accepting incoming stream: {:?}", err);
529
                    return Ok(());
530
                }
531
            }
532
        }
533
61
        Ok(())
534
61
    }
535

            
536
116
    async fn handle_client_requests(
537
116
        &self,
538
116
        client: ConnectedClient<B>,
539
116
        request_receiver: flume::Receiver<Payload<Request<<B::CustomApi as CustomApi>::Request>>>,
540
116
        response_sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
541
116
    ) {
542
116
        let notify = Arc::new(Notify::new());
543
116
        let requests_in_queue = Arc::new(AtomicUsize::new(0));
544
73109
        loop {
545
73109
            let current_requests = requests_in_queue.load(Ordering::SeqCst);
546
73109
            if current_requests == self.data.client_simultaneous_request_limit {
547
                // Wait for requests to finish.
548
20464
                notify.notified().await;
549
52645
            } else if requests_in_queue
550
52645
                .compare_exchange(
551
52645
                    current_requests,
552
52645
                    current_requests + 1,
553
52645
                    Ordering::SeqCst,
554
52645
                    Ordering::SeqCst,
555
52645
                )
556
52645
                .is_ok()
557
            {
558
52644
                let payload = match request_receiver.recv_async().await {
559
52528
                    Ok(payload) => payload,
560
57
                    Err(_) => break,
561
                };
562
52528
                let id = payload.id;
563
52528
                let task_sender = response_sender.clone();
564
52528

            
565
52528
                let notify = notify.clone();
566
52528
                let requests_in_queue = requests_in_queue.clone();
567
52528
                self.handle_request_through_worker(
568
52528
                    payload.wrapped,
569
52528
                    move |response| async move {
570
52528
                        drop(task_sender.send(Payload {
571
52528
                            id,
572
52528
                            wrapped: response,
573
52528
                        }));
574
52528

            
575
52528
                        requests_in_queue.fetch_sub(1, Ordering::SeqCst);
576
52528

            
577
52528
                        notify.notify_one();
578
52528

            
579
52528
                        Ok(())
580
52528
                    },
581
52528
                    client.clone(),
582
52528
                    self.data.subscribers.clone(),
583
52528
                    response_sender.clone(),
584
52528
                )
585
                .await
586
52528
                .unwrap();
587
1
            }
588
        }
589
57
    }
590

            
591
52528
    async fn handle_request_through_worker<
592
52528
        F: FnOnce(Response<CustomApiResult<B::CustomApi>>) -> R + Send + 'static,
593
52528
        R: Future<Output = Result<(), Error>> + Send,
594
52528
    >(
595
52528
        &self,
596
52528
        request: Request<<B::CustomApi as CustomApi>::Request>,
597
52528
        callback: F,
598
52528
        client: ConnectedClient<B>,
599
52528
        subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
600
52528
        response_sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
601
52528
    ) -> Result<(), Error> {
602
52528
        let (result_sender, result_receiver) = oneshot::channel();
603
52528
        self.data
604
52528
            .request_processor
605
52528
            .send(ClientRequest::<B>::new(
606
52528
                request,
607
52528
                self.clone(),
608
52528
                client,
609
52528
                subscribers,
610
52528
                response_sender,
611
52528
                result_sender,
612
52528
            ))
613
52528
            .map_err(|_| Error::InternalCommunication)?;
614
52528
        tokio::spawn(async move {
615
52528
            let result = result_receiver.await?;
616
            // Map the error into a Response::Error. The jobs system supports
617
            // multiple receivers receiving output, and wraps Err to avoid
618
            // requiring the error to be cloneable. As such, we have to unwrap
619
            // it. Thankfully, we can guarantee nothing else is waiting on a
620
            // response to a request than the original requestor, so this can be
621
            // safely unwrapped.
622
52528
            callback(result).await?;
623
52528
            Result::<(), Error>::Ok(())
624
52528
        });
625
52528
        Ok(())
626
52528
    }
627

            
628
61
    async fn handle_stream(
629
61
        &self,
630
61
        client: OwnedClient<B>,
631
61
        sender: fabruic::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
632
61
        mut receiver: fabruic::Receiver<Payload<Request<<B::CustomApi as CustomApi>::Request>>>,
633
61
    ) -> Result<(), Error> {
634
61
        let (payload_sender, payload_receiver) = flume::unbounded();
635
61
        tokio::spawn(async move {
636
30596
            while let Ok(payload) = payload_receiver.recv_async().await {
637
30535
                if sender.send(&payload).is_err() {
638
                    break;
639
30535
                }
640
            }
641
61
        });
642
61

            
643
61
        let (request_sender, request_receiver) =
644
61
            flume::bounded::<Payload<Request<<B::CustomApi as CustomApi>::Request>>>(
645
61
                self.data.client_simultaneous_request_limit,
646
61
            );
647
61
        let task_self = self.clone();
648
61
        tokio::spawn(async move {
649
61
            task_self
650
24458
                .handle_client_requests(client.clone(), request_receiver, payload_sender)
651
24458
                .await;
652
61
        });
653

            
654
30582
        while let Some(payload) = receiver.next().await {
655
30521
            drop(request_sender.send_async(payload?).await);
656
        }
657

            
658
15
        Ok(())
659
15
    }
660

            
661
14
    async fn forward_notifications_for(
662
14
        &self,
663
14
        subscriber_id: u64,
664
14
        receiver: flume::Receiver<Arc<Message>>,
665
14
        sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
666
14
    ) {
667
42
        while let Ok(message) = receiver.recv_async().await {
668
28
            if sender
669
28
                .send(Payload {
670
28
                    id: None,
671
28
                    wrapped: Response::Database(DatabaseResponse::MessageReceived {
672
28
                        subscriber_id,
673
28
                        topic: message.topic.clone(),
674
28
                        payload: Bytes::from(&message.payload[..]),
675
28
                    }),
676
28
                })
677
28
                .is_err()
678
            {
679
                break;
680
28
            }
681
        }
682
    }
683

            
684
    /// Shuts the server down. If a `timeout` is provided, the server will stop
685
    /// accepting new connections and attempt to respond to any outstanding
686
    /// requests already being processed. After the `timeout` has elapsed or if
687
    /// no `timeout` was provided, the server is forcefully shut down.
688
29
    pub async fn shutdown(&self, timeout: Option<Duration>) -> Result<(), Error> {
689
29
        if let Some(timeout) = timeout {
690
4
            self.data.shutdown.graceful_shutdown(timeout).await;
691
        } else {
692
25
            self.data.shutdown.shutdown().await;
693
        }
694

            
695
29
        Ok(())
696
29
    }
697

            
698
    /// Listens for signals from the operating system that the server should
699
    /// shut down and attempts to gracefully shut down.
700
1
    pub async fn listen_for_shutdown(&self) -> Result<(), Error> {
701
1
        const GRACEFUL_SHUTDOWN: usize = 1;
702
1
        const TERMINATE: usize = 2;
703
1

            
704
1
        enum SignalShutdownState {
705
1
            Running,
706
1
            ShuttingDown(flume::Receiver<()>),
707
1
        }
708
1

            
709
1
        let shutdown_state = Arc::new(Mutex::new(SignalShutdownState::Running));
710
1
        let flag = Arc::new(AtomicUsize::default());
711
1
        let register_hook = |flag: &Arc<AtomicUsize>| {
712
1
            signal_hook::flag::register_usize(SIGINT, flag.clone(), GRACEFUL_SHUTDOWN)?;
713
1
            signal_hook::flag::register_usize(SIGTERM, flag.clone(), TERMINATE)?;
714
            #[cfg(not(windows))]
715
1
            signal_hook::flag::register_usize(SIGQUIT, flag.clone(), TERMINATE)?;
716
1
            Result::<(), std::io::Error>::Ok(())
717
1
        };
718
1
        if let Err(err) = register_hook(&flag) {
719
            log::error!("Error installing signals for graceful shutdown: {:?}", err);
720
            tokio::time::sleep(Duration::MAX).await;
721
        } else {
722
            loop {
723
4
                match flag.load(Ordering::Relaxed) {
724
4
                    0 => {
725
4
                        // No signal
726
4
                    }
727
                    GRACEFUL_SHUTDOWN => {
728
                        let mut state = fast_async_lock!(shutdown_state);
729
                        match *state {
730
                            SignalShutdownState::Running => {
731
                                log::error!("Interrupt signal received. Shutting down gracefully.");
732
                                let task_server = self.clone();
733
                                let (shutdown_sender, shutdown_receiver) = flume::bounded(1);
734
                                tokio::task::spawn(async move {
735
                                    task_server.shutdown(Some(Duration::from_secs(30))).await?;
736
                                    let _ = shutdown_sender.send(());
737
                                    Result::<(), Error>::Ok(())
738
                                });
739
                                *state = SignalShutdownState::ShuttingDown(shutdown_receiver);
740
                            }
741
                            SignalShutdownState::ShuttingDown(_) => {
742
                                // Two interrupts, go ahead and force the shutdown
743
                                break;
744
                            }
745
                        }
746
                    }
747
                    TERMINATE => {
748
                        log::error!("Quit signal received. Shutting down.");
749
                        break;
750
                    }
751
                    _ => unreachable!(),
752
                }
753

            
754
4
                let state = fast_async_lock!(shutdown_state);
755
4
                if let SignalShutdownState::ShuttingDown(receiver) = &*state {
756
                    if receiver.try_recv().is_ok() {
757
                        // Fully shut down.
758
                        return Ok(());
759
                    }
760
4
                }
761

            
762
4
                tokio::time::sleep(Duration::from_millis(300)).await;
763
            }
764
            self.shutdown(None).await?;
765
        }
766

            
767
        Ok(())
768
    }
769

            
770
    /// Manually authenticates `client` as `user`. `user` can be the user's id
771
    /// ([`u64`]) or the username ([`String`]/[`str`]). Returns the permissions
772
    /// that the user now has.
773
    pub async fn authenticate_client_as<'name, N: Nameable<'name, u64> + Send + Sync>(
774
        &self,
775
        user: N,
776
        client: &ConnectedClient<B>,
777
    ) -> Result<Permissions, Error> {
778
        let admin = self.data.storage.admin().await;
779
        let user = User::load(user, &admin)
780
            .await?
781
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
782

            
783
        let permissions = user.contents.effective_permissions(&admin).await?;
784
        let permissions = Permissions::merged(
785
            [
786
                &permissions,
787
                &self.data.authenticated_permissions,
788
                &self.data.default_permissions,
789
            ]
790
            .into_iter(),
791
        );
792
        client
793
            .logged_in_as(user.header.id, permissions.clone())
794
            .await;
795
        Ok(permissions)
796
    }
797

            
798
24
    async fn publish_message(&self, database: &str, topic: &str, payload: Vec<u8>) {
799
24
        self.data
800
24
            .relay
801
24
            .publish_message(Message {
802
24
                topic: database_topic(database, topic),
803
24
                payload,
804
24
            })
805
            .await;
806
24
    }
807

            
808
3
    async fn publish_serialized_to_all(&self, database: &str, topics: &[String], payload: Vec<u8>) {
809
3
        self.data
810
3
            .relay
811
3
            .publish_serialized_to_all(
812
3
                topics
813
3
                    .iter()
814
9
                    .map(|topic| database_topic(database, topic))
815
3
                    .collect(),
816
3
                payload,
817
3
            )
818
            .await;
819
3
    }
820

            
821
21
    async fn create_subscriber(&self, database: String) -> ServerSubscriber<B> {
822
21
        let subscriber = self.data.relay.create_subscriber().await;
823

            
824
21
        let mut subscribers = fast_async_write!(self.data.subscribers);
825
21
        let subscriber_id = subscriber.id();
826
21
        let receiver = subscriber.receiver().clone();
827
21
        subscribers.insert(subscriber_id, subscriber);
828
21

            
829
21
        ServerSubscriber {
830
21
            server: self.clone(),
831
21
            database,
832
21
            receiver,
833
21
            id: subscriber_id,
834
21
        }
835
21
    }
836

            
837
36
    async fn subscribe_to<S: Into<String> + Send>(
838
36
        &self,
839
36
        subscriber_id: u64,
840
36
        database: &str,
841
36
        topic: S,
842
36
    ) -> Result<(), bonsaidb_core::Error> {
843
36
        let subscribers = fast_async_read!(self.data.subscribers);
844
36
        if let Some(subscriber) = subscribers.get(&subscriber_id) {
845
36
            subscriber
846
36
                .subscribe_to(database_topic(database, &topic.into()))
847
                .await;
848
36
            Ok(())
849
        } else {
850
            Err(bonsaidb_core::Error::Server(String::from(
851
                "invalid subscriber id",
852
            )))
853
        }
854
36
    }
855

            
856
3
    async fn unsubscribe_from(
857
3
        &self,
858
3
        subscriber_id: u64,
859
3
        database: &str,
860
3
        topic: &str,
861
3
    ) -> Result<(), bonsaidb_core::Error> {
862
3
        let subscribers = fast_async_read!(self.data.subscribers);
863
3
        if let Some(subscriber) = subscribers.get(&subscriber_id) {
864
3
            subscriber
865
3
                .unsubscribe_from(&database_topic(database, topic))
866
                .await;
867
3
            Ok(())
868
        } else {
869
            Err(bonsaidb_core::Error::Server(String::from(
870
                "invalid subscriber id",
871
            )))
872
        }
873
3
    }
874
}
875

            
876
impl<B: Backend> Deref for CustomServer<B> {
877
    type Target = Storage;
878

            
879
51559
    fn deref(&self) -> &Self::Target {
880
51559
        &self.data.storage
881
51559
    }
882
}
883

            
884
#[derive(Debug)]
885
struct ClientRequest<B: Backend> {
886
    request: Option<Request<<B::CustomApi as CustomApi>::Request>>,
887
    client: ConnectedClient<B>,
888
    server: CustomServer<B>,
889
    subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
890
    sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
891
    result_sender: oneshot::Sender<Response<CustomApiResult<B::CustomApi>>>,
892
}
893

            
894
impl<B: Backend> ClientRequest<B> {
895
52528
    pub fn new(
896
52528
        request: Request<<B::CustomApi as CustomApi>::Request>,
897
52528
        server: CustomServer<B>,
898
52528
        client: ConnectedClient<B>,
899
52528
        subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
900
52528
        sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
901
52528
        result_sender: oneshot::Sender<Response<CustomApiResult<B::CustomApi>>>,
902
52528
    ) -> Self {
903
52528
        Self {
904
52528
            request: Some(request),
905
52528
            server,
906
52528
            client,
907
52528
            subscribers,
908
52528
            sender,
909
52528
            result_sender,
910
52528
        }
911
52528
    }
912
}
913

            
914
#[async_trait]
915
impl<B: Backend> StorageConnection for CustomServer<B> {
916
    type Database = ServerDatabase<B>;
917

            
918
651
    async fn create_database_with_schema(
919
651
        &self,
920
651
        name: &str,
921
651
        schema: SchemaName,
922
651
        only_if_needed: bool,
923
651
    ) -> Result<(), bonsaidb_core::Error> {
924
651
        self.data
925
651
            .storage
926
1705
            .create_database_with_schema(name, schema, only_if_needed)
927
1703
            .await
928
1302
    }
929

            
930
68
    async fn database<DB: Schema>(
931
68
        &self,
932
68
        name: &str,
933
68
    ) -> Result<Self::Database, bonsaidb_core::Error> {
934
68
        let db = self.data.storage.database::<DB>(name).await?;
935
68
        Ok(ServerDatabase {
936
68
            server: self.clone(),
937
68
            db,
938
68
        })
939
136
    }
940

            
941
506
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
942
1780
        self.data.storage.delete_database(name).await
943
1012
    }
944

            
945
3
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
946
3
        self.data.storage.list_databases().await
947
6
    }
948

            
949
3
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
950
3
        self.data.storage.list_available_schemas().await
951
6
    }
952

            
953
7
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
954
15
        self.data.storage.create_user(username).await
955
14
    }
956

            
957
2
    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
958
2
        &self,
959
2
        user: U,
960
2
    ) -> Result<(), bonsaidb_core::Error> {
961
4
        self.data.storage.delete_user(user).await
962
4
    }
963

            
964
    #[cfg(feature = "password-hashing")]
965
3
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
966
3
        &self,
967
3
        user: U,
968
3
        password: bonsaidb_core::connection::SensitiveString,
969
3
    ) -> Result<(), bonsaidb_core::Error> {
970
12
        self.data.storage.set_user_password(user, password).await
971
6
    }
972

            
973
    #[cfg(feature = "password-hashing")]
974
5
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
975
5
        &self,
976
5
        user: U,
977
5
        authentication: Authentication,
978
5
    ) -> Result<Authenticated, bonsaidb_core::Error> {
979
13
        self.data.storage.authenticate(user, authentication).await
980
10
    }
981

            
982
6
    async fn add_permission_group_to_user<
983
6
        'user,
984
6
        'group,
985
6
        U: Nameable<'user, u64> + Send + Sync,
986
6
        G: Nameable<'group, u64> + Send + Sync,
987
6
    >(
988
6
        &self,
989
6
        user: U,
990
6
        permission_group: G,
991
6
    ) -> Result<(), bonsaidb_core::Error> {
992
6
        self.data
993
6
            .storage
994
8
            .add_permission_group_to_user(user, permission_group)
995
8
            .await
996
12
    }
997

            
998
4
    async fn remove_permission_group_from_user<
999
4
        'user,
4
        'group,
4
        U: Nameable<'user, u64> + Send + Sync,
4
        G: Nameable<'group, u64> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        permission_group: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data
4
            .storage
5
            .remove_permission_group_from_user(user, permission_group)
5
            .await
8
    }

            
4
    async fn add_role_to_user<
4
        'user,
4
        'group,
4
        U: Nameable<'user, u64> + Send + Sync,
4
        G: Nameable<'group, u64> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        role: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data.storage.add_role_to_user(user, role).await
8
    }

            
4
    async fn remove_role_from_user<
4
        'user,
4
        'group,
4
        U: Nameable<'user, u64> + Send + Sync,
4
        G: Nameable<'group, u64> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        role: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
5
        self.data.storage.remove_role_from_user(user, role).await
8
    }
}

            
105056
/// Dispatcher for incoming requests. Per the `dispatcher` attribute it accepts
/// both the full `Request` type (including the backend's custom API request)
/// and `ServerRequest` as inputs.
///
/// It only borrows its context for the lifetime `'s`; it owns none of it.
#[derive(Dispatcher, Debug)]
#[dispatcher(input = Request<<B::CustomApi as CustomApi>::Request>, input = ServerRequest, actionable = bonsaidb_core::actionable)]
struct ServerDispatcher<'s, B>
where
    B: Backend,
{
    /// The server the request is handled by.
    server: &'s CustomServer<B>,
    /// The client the request is handled for.
    client: &'s ConnectedClient<B>,
    /// Shared subscriber map keyed by a `u64` (presumably the subscriber id
    /// -- confirm against the code that populates it).
    subscribers: &'s Arc<RwLock<HashMap<u64, Subscriber>>>,
    /// Channel for sending response payloads.
    response_sender: &'s flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
}

            
#[async_trait]
impl<'s, B: Backend> RequestDispatcher for ServerDispatcher<'s, B> {
    type Subaction = <B::CustomApi as CustomApi>::Request;
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;

            
14
    async fn handle_subaction(
14
        &self,
14
        permissions: &Permissions,
14
        subaction: Self::Subaction,
14
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
14
        let dispatcher =
14
            <B::CustomApiDispatcher as CustomApiDispatcher<B>>::new(self.server, self.client);
14
        match dispatcher.dispatch(permissions, subaction).await {
10
            Ok(response) => Ok(Response::Api(Ok(response))),
4
            Err(err) => match err {
                BackendError::Backend(backend) => Ok(Response::Api(Err(backend))),
4
                BackendError::Server(server) => Err(server),
            },
        }
28
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ServerHandler for ServerDispatcher<'s, B> {
1105
    async fn handle(
1105
        &self,
1105
        permissions: &Permissions,
1105
        request: ServerRequest,
1105
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
3386
        ServerRequestDispatcher::dispatch_to_handlers(self, permissions, request).await
2210
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DatabaseHandler for ServerDispatcher<'s, B> {
51409
    async fn handle(
51409
        &self,
51409
        permissions: &Permissions,
51409
        database_name: String,
51409
        request: DatabaseRequest,
51409
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
51409
        let database = self
51409
            .server
577964
            .database_without_schema_internal(&database_name)
544136
            .await?;
51409
        DatabaseDispatcher {
51409
            name: database_name,
51409
            database,
51409
            server_dispatcher: self,
51409
        }
51410
        .dispatch(permissions, request)
32169
        .await
102818
    }
}

            
/// Fixes the output and error types used when dispatching `ServerRequest`s:
/// handlers produce a `Response` carrying the backend's custom API result, and
/// fail with the server's `Error`.
impl<'s, B: Backend> ServerRequestDispatcher for ServerDispatcher<'s, B> {
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;
}

            
#[async_trait]
impl<'s, B: Backend> CreateDatabaseHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
570
    async fn resource_name<'a>(
570
        &'a self,
570
        database: &'a bonsaidb_core::connection::Database,
570
        _only_if_needed: &'a bool,
570
    ) -> Result<ResourceName<'a>, Error> {
570
        Ok(database_resource_name(&database.name))
570
    }

            
570
    fn action() -> Self::Action {
570
        BonsaiAction::Server(ServerAction::CreateDatabase)
570
    }

            
570
    async fn handle_protected(
570
        &self,
570
        _permissions: &Permissions,
570
        database: bonsaidb_core::connection::Database,
570
        only_if_needed: bool,
570
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
570
        self.server
1562
            .create_database_with_schema(&database.name, database.schema, only_if_needed)
1562
            .await?;
564
        Ok(Response::Server(ServerResponse::DatabaseCreated {
564
            name: database.name.clone(),
564
        }))
1140
    }
}

            
#[async_trait]
impl<'s, B: Backend> DeleteDatabaseHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
504
    async fn resource_name<'a>(&'a self, database: &'a String) -> Result<ResourceName<'a>, Error> {
504
        Ok(database_resource_name(database))
504
    }

            
504
    fn action() -> Self::Action {
504
        BonsaiAction::Server(ServerAction::DeleteDatabase)
504
    }

            
504
    async fn handle_protected(
504
        &self,
504
        _permissions: &Permissions,
504
        name: String,
504
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
1777
        self.server.delete_database(&name).await?;
502
        Ok(Response::Server(ServerResponse::DatabaseDeleted { name }))
1008
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListDatabasesHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::ListDatabases)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::Databases(
2
            self.server.list_databases().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListAvailableSchemasHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::ListAvailableSchemas)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::AvailableSchemas(
2
            self.server.list_available_schemas().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CreateUserHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
4
    async fn resource_name<'a>(&'a self, _username: &'a String) -> Result<ResourceName<'a>, Error> {
4
        Ok(bonsaidb_resource_name())
4
    }

            
4
    fn action() -> Self::Action {
4
        BonsaiAction::Server(ServerAction::CreateUser)
4
    }

            
3
    async fn handle_protected(
3
        &self,
3
        _permissions: &Permissions,
3
        username: String,
3
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::UserCreated {
5
            id: self.server.create_user(&username).await?,
        }))
6
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DeleteUserHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        _primary_key: &'a NamedReference<'static, u64>,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::DeleteUser)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        user: NamedReference<'static, u64>,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4
        self.server.delete_user(user).await?;
2
        Ok(Response::Ok)
4
    }
}

            
/// Handles requests to set a user's password.
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::SetUserPasswordHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

    /// The protected resource is the user whose password is being set. The
    /// reference is resolved to a user id through the admin database first.
    ///
    /// Fails with [`bonsaidb_core::Error::UserNotFound`] if the reference
    /// does not resolve to an id.
    async fn resource_name<'a>(
        &'a self,
        user: &'a NamedReference<'static, u64>,
        _password: &'a bonsaidb_core::connection::SensitiveString,
    ) -> Result<ResourceName<'a>, Error> {
        let admin = self.server.admin().await;
        let resolved = user.id::<User, _>(&admin).await?;
        let id = resolved.ok_or(bonsaidb_core::Error::UserNotFound)?;

        Ok(user_resource_name(id))
    }

    /// The action checked for this request: `ServerAction::SetPassword`.
    fn action() -> Self::Action {
        BonsaiAction::Server(ServerAction::SetPassword)
    }

    /// Sets the password and responds with `Response::Ok`.
    async fn handle_protected(
        &self,
        _permissions: &Permissions,
        username: NamedReference<'static, u64>,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        let update = self.server.set_user_password(username, password);
        update.await?;
        Ok(Response::Ok)
    }
}

            
/// Handles requests to authenticate the connected client as a user.
#[async_trait]
#[cfg(feature = "password-hashing")]
impl<'s, B: Backend> bonsaidb_core::networking::AuthenticateHandler for ServerDispatcher<'s, B> {
    /// Checks that `permissions` allow authenticating as `user` with the
    /// given authentication method.
    ///
    /// The user reference is resolved to an id through the admin database;
    /// the check fails with [`bonsaidb_core::Error::UserNotFound`] if it does
    /// not resolve.
    async fn verify_permissions(
        &self,
        permissions: &Permissions,
        user: &NamedReference<'static, u64>,
        authentication: &Authentication,
    ) -> Result<(), Error> {
        let id = user
            .id::<User, _>(&self.server.admin().await)
            .await?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
        match authentication {
            Authentication::Password(_) => {
                permissions.check(
                    user_resource_name(id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        AuthenticationMethod::PasswordHash,
                    )),
                )?;
                Ok(())
            }
        }
    }

    /// Authenticates against the server, merges the resulting permissions
    /// with the server's authenticated and default permissions, records the
    /// login on the client, and responds with
    /// `ServerResponse::Authenticated`.
    async fn handle_protected(
        &self,
        _permissions: &Permissions,
        username: NamedReference<'static, u64>,
        authentication: Authentication,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        // `username` is owned and not used afterwards, so it is moved into
        // the call rather than cloned.
        let mut response = self.server.authenticate(username, authentication).await?;

        // TODO this should be handled by the storage layer
        response.permissions = Permissions::merged([
            &response.permissions,
            &self.server.data.authenticated_permissions,
            &self.server.data.default_permissions,
        ]);

        self.client
            .logged_in_as(response.user_id, response.permissions.clone())
            .await;
        Ok(Response::Server(ServerResponse::Authenticated(response)))
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AlterUserPermissionGroupMembershipHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        user: &'a NamedReference<'static, u64>,
8
        _group: &'a NamedReference<'static, u64>,
8
        _should_be_member: &'a bool,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        let id = user
8
            .id::<User, _>(&self.server.admin().await)
2
            .await?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
8
        Ok(user_resource_name(id))
16
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups)
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        user: NamedReference<'static, u64>,
8
        group: NamedReference<'static, u64>,
8
        should_be_member: bool,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        if should_be_member {
4
            self.server
5
                .add_permission_group_to_user(user, group)
5
                .await?;
        } else {
4
            self.server
5
                .remove_permission_group_from_user(user, group)
5
                .await?;
        }

            
8
        Ok(Response::Ok)
16
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AlterUserRoleMembershipHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        user: &'a NamedReference<'static, u64>,
8
        _role: &'a NamedReference<'static, u64>,
8
        _should_be_member: &'a bool,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        let id = user
8
            .id::<User, _>(&self.server.admin().await)
            .await?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
8
        Ok(user_resource_name(id))
16
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Server(ServerAction::ModifyUserRoles)
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        user: NamedReference<'static, u64>,
8
        role: NamedReference<'static, u64>,
8
        should_be_member: bool,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        if should_be_member {
4
            self.server.add_role_to_user(user, role).await?;
        } else {
5
            self.server.remove_role_from_user(user, role).await?;
        }

            
8
        Ok(Response::Ok)
16
    }
}

            
102818
/// Dispatcher for requests that target a single database. Per the
/// `dispatcher` attribute it accepts `DatabaseRequest` as its input.
#[derive(Dispatcher, Debug)]
#[dispatcher(input = DatabaseRequest, actionable = bonsaidb_core::actionable)]
struct DatabaseDispatcher<'s, B>
where
    B: Backend,
{
    /// Name of the database the request targets.
    name: String,
    /// The opened database the request is executed against.
    database: Database,
    /// The server-level dispatcher this dispatcher was created from.
    server_dispatcher: &'s ServerDispatcher<'s, B>,
}

            
// Fixes the output and error types shared by every database request handler
// implemented for `DatabaseDispatcher` below.
impl<'s, B: Backend> DatabaseRequestDispatcher for DatabaseDispatcher<'s, B> {
    // Every handler answers with a `Response` wrapping the backend's custom API result.
    type Output = Response<CustomApiResult<B::CustomApi>>;
    // Handlers fail with this file's `Error` type.
    type Error = Error;
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::GetHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
8154
    async fn resource_name<'a>(
8154
        &'a self,
8154
        collection: &'a CollectionName,
8154
        id: &'a DocumentId,
8154
    ) -> Result<ResourceName<'a>, Error> {
8154
        Ok(document_resource_name(&self.name, collection, id))
8154
    }

            
8154
    fn action() -> Self::Action {
8154
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get))
8154
    }

            
8154
    async fn handle_protected(
8154
        &self,
8154
        _permissions: &Permissions,
8154
        collection: CollectionName,
8154
        id: DocumentId,
8154
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8154
        let document = self
8154
            .database
8154
            .internal_get_from_collection_id(id, &collection)
4826
            .await?
8154
            .ok_or_else(|| {
506
                Error::Core(bonsaidb_core::Error::DocumentNotFound(
506
                    collection,
506
                    Box::new(id),
506
                ))
8154
            })?;
7648
        Ok(Response::Database(DatabaseResponse::Documents(vec![
7648
            document,
7648
        ])))
16308
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::GetMultipleHandler for DatabaseDispatcher<'s, B> {
4
    async fn verify_permissions(
4
        &self,
4
        permissions: &Permissions,
4
        collection: &CollectionName,
4
        ids: &Vec<DocumentId>,
4
    ) -> Result<(), Error> {
12
        for id in ids {
8
            let document_name = document_resource_name(&self.name, collection, id);
8
            let action = BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get));
8
            permissions.check(&document_name, &action)?;
        }

            
4
        Ok(())
8
    }

            
4
    async fn handle_protected(
4
        &self,
4
        _permissions: &Permissions,
4
        collection: CollectionName,
4
        ids: Vec<DocumentId>,
4
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4
        let documents = self
4
            .database
4
            .internal_get_multiple_from_collection_id(&ids, &collection)
2
            .await?;
4
        Ok(Response::Database(DatabaseResponse::Documents(documents)))
8
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        collection: &'a CollectionName,
8
        _ids: &'a Range<DocumentId>,
8
        _order: &'a Sort,
8
        _limit: &'a Option<usize>,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        Ok(collection_resource_name(&self.name, collection))
8
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List))
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        collection: CollectionName,
8
        ids: Range<DocumentId>,
8
        order: Sort,
8
        limit: Option<usize>,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        let documents = self
8
            .database
8
            .list_from_collection(ids, order, limit, &collection)
4
            .await?;
8
        Ok(Response::Database(DatabaseResponse::Documents(documents)))
16
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CountHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
4
    async fn resource_name<'a>(
4
        &'a self,
4
        collection: &'a CollectionName,
4
        _ids: &'a Range<DocumentId>,
4
    ) -> Result<ResourceName<'a>, Error> {
4
        Ok(collection_resource_name(&self.name, collection))
4
    }

            
4
    fn action() -> Self::Action {
4
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Count))
4
    }

            
4
    async fn handle_protected(
4
        &self,
4
        _permissions: &Permissions,
4
        collection: CollectionName,
4
        ids: Range<DocumentId>,
4
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4
        let documents = self
4
            .database
4
            .count_from_collection(ids, &collection)
2
            .await?;
4
        Ok(Response::Database(DatabaseResponse::Count(documents)))
8
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::QueryHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
3929
    async fn resource_name<'a>(
3929
        &'a self,
3929
        view: &'a ViewName,
3929
        _key: &'a Option<QueryKey<Bytes>>,
3929
        _order: &'a Sort,
3929
        _limit: &'a Option<usize>,
3929
        _access_policy: &'a AccessPolicy,
3929
        _with_docs: &'a bool,
3929
    ) -> Result<ResourceName<'a>, Error> {
3929
        Ok(view_resource_name(&self.name, view))
3929
    }

            
3929
    fn action() -> Self::Action {
3929
        BonsaiAction::Database(DatabaseAction::View(ViewAction::Query))
3929
    }

            
3929
    async fn handle_protected(
3929
        &self,
3929
        _permissions: &Permissions,
3929
        view: ViewName,
3929
        key: Option<QueryKey<Bytes>>,
3929
        order: Sort,
3929
        limit: Option<usize>,
3929
        access_policy: AccessPolicy,
3929
        with_docs: bool,
3929
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
3929
        if with_docs {
3878
            let mappings = self
3878
                .database
3985
                .query_by_name_with_docs(&view, key, order, limit, access_policy)
3985
                .await?;
3878
            Ok(Response::Database(DatabaseResponse::ViewMappingsWithDocs(
3878
                mappings,
3878
            )))
        } else {
51
            let mappings = self
51
                .database
51
                .query_by_name(&view, key, order, limit, access_policy)
35
                .await?;
51
            Ok(Response::Database(DatabaseResponse::ViewMappings(mappings)))
        }
7858
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ReduceHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
7975
    async fn resource_name<'a>(
7975
        &'a self,
7975
        view: &'a ViewName,
7975
        _key: &'a Option<QueryKey<Bytes>>,
7975
        _access_policy: &'a AccessPolicy,
7975
        _grouped: &'a bool,
7975
    ) -> Result<ResourceName<'a>, Error> {
7975
        Ok(view_resource_name(&self.name, view))
7975
    }

            
7975
    fn action() -> Self::Action {
7975
        BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce))
7975
    }

            
7975
    async fn handle_protected(
7975
        &self,
7975
        _permissions: &Permissions,
7975
        view: ViewName,
7975
        key: Option<QueryKey<Bytes>>,
7975
        access_policy: AccessPolicy,
7975
        grouped: bool,
7975
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
7975
        if grouped {
6
            let values = self
6
                .database
7
                .reduce_grouped_by_name(&view, key, access_policy)
5
                .await?;
6
            Ok(Response::Database(DatabaseResponse::ViewGroupedReduction(
6
                values,
6
            )))
        } else {
7969
            let value = self
7969
                .database
7970
                .reduce_by_name(&view, key, access_policy)
159
                .await?;
7967
            Ok(Response::Database(DatabaseResponse::ViewReduction(
7967
                Bytes::from(value),
7967
            )))
        }
15950
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ApplyTransactionHandler
    for DatabaseDispatcher<'s, B>
{
10009
    async fn verify_permissions(
10009
        &self,
10009
        permissions: &Permissions,
10009
        transaction: &Transaction,
10009
    ) -> Result<(), Error> {
24134
        for op in &transaction.operations {
14127
            let (resource, action) = match &op.command {
9923
                Command::Insert { .. } => (
9923
                    collection_resource_name(&self.name, &op.collection),
9923
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
9923
                ),
3006
                Command::Update { header, .. } => (
3006
                    document_resource_name(&self.name, &op.collection, &header.id),
3006
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
3006
                ),
504
                Command::Overwrite { id, .. } => (
504
                    document_resource_name(&self.name, &op.collection, id),
504
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
504
                ),
694
                Command::Delete { header } => (
694
                    document_resource_name(&self.name, &op.collection, &header.id),
694
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
694
                ),
            };
14127
            permissions.check(&resource, &action)?;
        }

            
10007
        Ok(())
20018
    }

            
10007
    async fn handle_protected(
10007
        &self,
10007
        _permissions: &Permissions,
10007
        transaction: Transaction,
10007
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
10012
        let results = self.database.apply_transaction(transaction).await?;
9493
        Ok(Response::Database(DatabaseResponse::TransactionResults(
9493
            results,
9493
        )))
20014
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DeleteDocsHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        view: &'a ViewName,
2
        _key: &'a Option<QueryKey<Bytes>>,
2
        _access_policy: &'a AccessPolicy,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(view_resource_name(&self.name, view))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::View(ViewAction::DeleteDocs))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        view: ViewName,
2
        key: Option<QueryKey<Bytes>>,
2
        access_policy: AccessPolicy,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        let count = self
2
            .database
4
            .delete_docs_by_name(&view, key, access_policy)
4
            .await?;
2
        Ok(Response::Database(DatabaseResponse::Count(count)))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListExecutedTransactionsHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
1036
    async fn resource_name<'a>(
1036
        &'a self,
1036
        _starting_id: &'a Option<u64>,
1036
        _result_limit: &'a Option<usize>,
1036
    ) -> Result<ResourceName<'a>, Error> {
1036
        Ok(database_resource_name(&self.name))
1036
    }

            
1036
    fn action() -> Self::Action {
1036
        BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted))
1036
    }

            
1036
    async fn handle_protected(
1036
        &self,
1036
        _permissions: &Permissions,
1036
        starting_id: Option<u64>,
1036
        result_limit: Option<usize>,
1036
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Database(DatabaseResponse::ExecutedTransactions(
1036
            self.database
1036
                .list_executed_transactions(starting_id, result_limit)
814
                .await?,
        )))
2072
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::LastTransactionIdHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(database_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Database(DatabaseResponse::LastTransactionId(
2
            self.database.last_transaction_id().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CreateSubscriberHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
14
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
14
        Ok(database_resource_name(&self.name))
14
    }

            
14
    fn action() -> Self::Action {
14
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber))
14
    }

            
14
    async fn handle_protected(
14
        &self,
14
        _permissions: &Permissions,
14
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
14
        let server = self.server_dispatcher.server;
14
        let subscriber = server.create_subscriber(self.name.clone()).await;
14
        let subscriber_id = subscriber.id;
14
        self.server_dispatcher
14
            .client
14
            .register_subscriber(subscriber_id)
            .await;

            
14
        let task_self = server.clone();
14
        let response_sender = self.server_dispatcher.response_sender.clone();
14
        tokio::spawn(async move {
14
            task_self
14
                .forward_notifications_for(
14
                    subscriber.id,
14
                    subscriber.receiver,
14
                    response_sender.clone(),
22
                )
22
                .await;
14
        });
14
        Ok(Response::Database(DatabaseResponse::SubscriberCreated {
14
            subscriber_id,
14
        }))
28
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::PublishHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
16
    async fn resource_name<'a>(
16
        &'a self,
16
        topic: &'a String,
16
        _payload: &'a Bytes,
16
    ) -> Result<ResourceName<'a>, Error> {
16
        Ok(pubsub_topic_resource_name(&self.name, topic))
16
    }

            
16
    fn action() -> Self::Action {
16
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish))
16
    }

            
16
    async fn handle_protected(
16
        &self,
16
        _permissions: &Permissions,
16
        topic: String,
16
        payload: Bytes,
16
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
16
        self.server_dispatcher
16
            .server
16
            .publish_message(&self.name, &topic, payload.into_vec())
            .await;
16
        Ok(Response::Ok)
32
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::PublishToAllHandler for DatabaseDispatcher<'s, B> {
2
    async fn verify_permissions(
2
        &self,
2
        permissions: &Permissions,
2
        topics: &Vec<String>,
2
        _payload: &Bytes,
2
    ) -> Result<(), Error> {
8
        for topic in topics {
6
            let topic_name = pubsub_topic_resource_name(&self.name, topic);
6
            let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish));
6
            if !permissions.allowed_to(&topic_name, &action) {
                return Err(Error::from(PermissionDenied {
                    resource: topic_name.to_owned(),
                    action: action.name(),
                }));
6
            }
        }

            
2
        Ok(())
4
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        topics: Vec<String>,
2
        payload: Bytes,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.server_dispatcher
2
            .server
2
            .publish_serialized_to_all(&self.name, &topics, payload.into_vec())
            .await;
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::SubscribeToHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
24
    async fn resource_name<'a>(
24
        &'a self,
24
        _subscriber_id: &'a u64,
24
        topic: &'a String,
24
    ) -> Result<ResourceName<'a>, Error> {
24
        Ok(pubsub_topic_resource_name(&self.name, topic))
24
    }

            
24
    fn action() -> Self::Action {
24
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo))
24
    }

            
24
    async fn handle_protected(
24
        &self,
24
        _permissions: &Permissions,
24
        subscriber_id: u64,
24
        topic: String,
24
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
24
        if self
24
            .server_dispatcher
24
            .client
24
            .owns_subscriber(subscriber_id)
            .await
        {
24
            self.server_dispatcher
24
                .server
24
                .subscribe_to(subscriber_id, &self.name, topic)
                .await
24
                .map(|_| Response::Ok)
24
                .map_err(Error::from)
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
48
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::UnsubscribeFromHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        _subscriber_id: &'a u64,
2
        topic: &'a String,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(pubsub_topic_resource_name(&self.name, topic))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        subscriber_id: u64,
2
        topic: String,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        if self
2
            .server_dispatcher
2
            .client
2
            .owns_subscriber(subscriber_id)
            .await
        {
2
            self.server_dispatcher
2
                .server
2
                .unsubscribe_from(subscriber_id, &self.name, &topic)
                .await
2
                .map(|_| Response::Ok)
2
                .map_err(Error::from)
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::UnregisterSubscriberHandler
    for DatabaseDispatcher<'s, B>
{
    async fn handle(
        &self,
        _permissions: &Permissions,
        subscriber_id: u64,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        if self
            .server_dispatcher
            .client
            .remove_subscriber(subscriber_id)
            .await
        {
            let mut subscribers = fast_async_write!(self.server_dispatcher.subscribers);
            if subscribers.remove(&subscriber_id).is_none() {
                Ok(Response::Error(bonsaidb_core::Error::Server(String::from(
                    "invalid subscriber id",
                ))))
            } else {
                Ok(Response::Ok)
            }
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ExecuteKeyOperationHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
20222
    async fn resource_name<'a>(&'a self, op: &'a KeyOperation) -> Result<ResourceName<'a>, Error> {
20221
        Ok(keyvalue_key_resource_name(
20221
            &self.name,
20221
            op.namespace.as_deref(),
20221
            &op.key,
20221
        ))
20222
    }

            
20222
    fn action() -> Self::Action {
20222
        BonsaiAction::Database(DatabaseAction::KeyValue(KeyValueAction::ExecuteOperation))
20222
    }

            
20222
    async fn handle_protected(
20222
        &self,
20222
        _permissions: &Permissions,
20222
        op: KeyOperation,
20222
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
20222
        let result = self.database.execute_key_operation(op).await?;
20214
        Ok(Response::Database(DatabaseResponse::KvOutput(result)))
40444
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactCollectionHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        collection: &'a CollectionName,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(collection_resource_name(&self.name, collection))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        collection: CollectionName,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact_collection_by_name(collection).await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactKeyValueStoreHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(kv_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact_key_value_store().await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(database_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact().await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
80
/// Reference-counted, mutex-guarded map from `String` keys to shared
/// `rustls` certified keys. `Default` yields an empty map.
///
/// NOTE(review): presumably keyed by the name a certificate is served for
/// during TLS-ALPN handling — confirm against the code that populates it.
#[derive(Default)]
struct AlpnKeys(Arc<std::sync::Mutex<HashMap<String, Arc<rustls::sign::CertifiedKey>>>>);

            
impl Debug for AlpnKeys {
    /// Writes only the type name; the wrapped map is never formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("AlpnKeys")
    }
}

            
impl Deref for AlpnKeys {
    type Target = Arc<std::sync::Mutex<HashMap<String, Arc<rustls::sign::CertifiedKey>>>>;

            
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}