1
use std::{
2
    collections::{hash_map, HashMap},
3
    fmt::Debug,
4
    marker::PhantomData,
5
    net::SocketAddr,
6
    ops::Deref,
7
    path::PathBuf,
8
    sync::{
9
        atomic::{AtomicU32, AtomicUsize, Ordering},
10
        Arc,
11
    },
12
    time::Duration,
13
};
14

            
15
use async_lock::{Mutex, RwLock};
16
use async_trait::async_trait;
17
use bonsaidb_core::{
18
    admin::User,
19
    arc_bytes::serde::Bytes,
20
    circulate::{Message, Relay, Subscriber},
21
    connection::{self, AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
22
    custom_api::{CustomApi, CustomApiResult},
23
    keyvalue::{KeyOperation, KeyValue},
24
    networking::{
25
        self, CreateDatabaseHandler, DatabaseRequest, DatabaseRequestDispatcher, DatabaseResponse,
26
        DeleteDatabaseHandler, Payload, Request, RequestDispatcher, Response, ServerRequest,
27
        ServerRequestDispatcher, ServerResponse, CURRENT_PROTOCOL_VERSION,
28
    },
29
    permissions::{
30
        bonsai::{
31
            bonsaidb_resource_name, collection_resource_name, database_resource_name,
32
            document_resource_name, keyvalue_key_resource_name, kv_resource_name,
33
            pubsub_topic_resource_name, user_resource_name, view_resource_name, BonsaiAction,
34
            DatabaseAction, DocumentAction, KeyValueAction, PubSubAction, ServerAction,
35
            TransactionAction, ViewAction,
36
        },
37
        Action, Dispatcher, PermissionDenied, Permissions, ResourceName,
38
    },
39
    pubsub::database_topic,
40
    schema::{self, CollectionName, NamedCollection, NamedReference, Schema, ViewName},
41
    transaction::{Command, Transaction},
42
};
43
#[cfg(feature = "password-hashing")]
44
use bonsaidb_core::{
45
    connection::{Authenticated, Authentication},
46
    permissions::bonsai::AuthenticationMethod,
47
};
48
use bonsaidb_local::{
49
    config::Builder,
50
    jobs::{manager::Manager, Job},
51
    Database, Storage,
52
};
53
use bonsaidb_utils::{fast_async_lock, fast_async_read, fast_async_write};
54
use derive_where::derive_where;
55
use fabruic::{self, CertificateChain, Endpoint, KeyPair, PrivateKey};
56
use flume::Sender;
57
use futures::{Future, StreamExt};
58
use rustls::sign::CertifiedKey;
59
use schema::SchemaName;
60
#[cfg(not(windows))]
61
use signal_hook::consts::SIGQUIT;
62
use signal_hook::consts::{SIGINT, SIGTERM};
63
use tokio::sync::Notify;
64

            
65
#[cfg(feature = "acme")]
66
use crate::config::AcmeConfiguration;
67
use crate::{
68
    backend::{BackendError, ConnectionHandling, CustomApiDispatcher},
69
    error::Error,
70
    hosted::{Hosted, SerializablePrivateKey, TlsCertificate, TlsCertificatesByDomain},
71
    server::shutdown::{Shutdown, ShutdownState},
72
    Backend, NoBackend, ServerConfiguration,
73
};
74

            
75
#[cfg(feature = "acme")]
76
pub mod acme;
77
mod connected_client;
78
mod database;
79

            
80
mod shutdown;
81
mod tcp;
82
#[cfg(feature = "websockets")]
83
mod websockets;
84

            
85
use self::connected_client::OwnedClient;
86
pub use self::{
87
    connected_client::{ConnectedClient, LockedClientDataGuard, Transport},
88
    database::{ServerDatabase, ServerSubscriber},
89
    tcp::{ApplicationProtocols, HttpService, Peer, StandardTcpProtocols, TcpService},
90
};
91

            
92
/// Process-wide source of candidate ids for newly connected clients. Each
/// connection attempt takes the next value with `fetch_add`.
static CONNECTED_CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::new(0);
93

            
94
/// A BonsaiDb server.
95
#[derive(Debug)]
96
53042
#[derive_where(Clone)]
97
pub struct CustomServer<B: Backend = NoBackend> {
98
    data: Arc<Data<B>>,
99
}
100

            
101
/// A BonsaiDb server without a custom backend.
102
pub type Server = CustomServer<NoBackend>;
103

            
104
#[derive(Debug)]
105
struct Data<B: Backend = NoBackend> {
106
    storage: Storage,
107
    clients: RwLock<HashMap<u32, ConnectedClient<B>>>,
108
    request_processor: Manager,
109
    default_permissions: Permissions,
110
    authenticated_permissions: Permissions,
111
    endpoint: RwLock<Option<Endpoint>>,
112
    client_simultaneous_request_limit: usize,
113
    primary_tls_key: CachedCertifiedKey,
114
    primary_domain: String,
115
    #[cfg(feature = "acme")]
116
    acme: AcmeConfiguration,
117
    #[cfg(feature = "acme")]
118
    alpn_keys: AlpnKeys,
119
    shutdown: Shutdown,
120
    relay: Relay,
121
    subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
122
    _backend: PhantomData<B>,
123
}
124

            
125
76
#[derive(Default)]
126
struct CachedCertifiedKey(parking_lot::Mutex<Option<Arc<CertifiedKey>>>);
127

            
128
impl Debug for CachedCertifiedKey {
129
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130
        f.debug_tuple("CachedCertifiedKey").finish()
131
    }
132
}
133

            
134
impl Deref for CachedCertifiedKey {
135
    type Target = parking_lot::Mutex<Option<Arc<CertifiedKey>>>;
136

            
137
712
    fn deref(&self) -> &Self::Target {
138
712
        &self.0
139
712
    }
140
}
141

            
142
impl<B: Backend> CustomServer<B> {
143
    /// Opens a server using `directory` for storage.
144
76
    pub async fn open(configuration: ServerConfiguration) -> Result<Self, Error> {
145
76
        let request_processor = Manager::default();
146
1215
        for _ in 0..configuration.request_workers {
147
1215
            request_processor.spawn_worker();
148
1215
        }
149

            
150
1130
        let storage = Storage::open(configuration.storage.with_schema::<Hosted>()?).await?;
151

            
152
145
        storage.create_database::<Hosted>("_hosted", true).await?;
153

            
154
76
        let default_permissions = Permissions::from(configuration.default_permissions);
155
76
        let authenticated_permissions = Permissions::from(configuration.authenticated_permissions);
156
76

            
157
76
        let server = Self {
158
76
            data: Arc::new(Data {
159
76
                clients: RwLock::default(),
160
76
                storage,
161
76
                endpoint: RwLock::default(),
162
76
                request_processor,
163
76
                default_permissions,
164
76
                authenticated_permissions,
165
76
                client_simultaneous_request_limit: configuration.client_simultaneous_request_limit,
166
76
                primary_tls_key: CachedCertifiedKey::default(),
167
76
                primary_domain: configuration.server_name,
168
76
                #[cfg(feature = "acme")]
169
76
                acme: configuration.acme,
170
76
                #[cfg(feature = "acme")]
171
76
                alpn_keys: AlpnKeys::default(),
172
76
                shutdown: Shutdown::new(),
173
76
                relay: Relay::default(),
174
76
                subscribers: Arc::default(),
175
76
                _backend: PhantomData::default(),
176
76
            }),
177
76
        };
178
76
        B::initialize(&server).await;
179
76
        Ok(server)
180
76
    }
181

            
182
    /// Returns the path to the public pinned certificate, if this server has
183
    /// one. Note: this function will always succeed, but the file may not
184
    /// exist.
185
    #[must_use]
186
140
    pub fn pinned_certificate_path(&self) -> PathBuf {
187
140
        self.path().join("pinned-certificate.der")
188
140
    }
189

            
190
    /// Returns the primary domain configured for this server.
191
    #[must_use]
192
32
    pub fn primary_domain(&self) -> &str {
193
32
        &self.data.primary_domain
194
32
    }
195

            
196
    /// Returns the administration database.
197
23
    pub async fn admin(&self) -> ServerDatabase<B> {
198
23
        let db = self.data.storage.admin().await;
199
23
        ServerDatabase {
200
23
            server: self.clone(),
201
23
            db,
202
23
        }
203
23
    }
204

            
205
271
    pub(crate) async fn hosted(&self) -> ServerDatabase<B> {
206
271
        let db = self
207
271
            .data
208
271
            .storage
209
271
            .database::<Hosted>("_hosted")
210
72
            .await
211
271
            .unwrap();
212
271
        ServerDatabase {
213
271
            server: self.clone(),
214
271
            db,
215
271
        }
216
271
    }
217

            
218
    /// Installs an X.509 certificate used for general purpose connections.
219
71
    pub async fn install_self_signed_certificate(&self, overwrite: bool) -> Result<(), Error> {
220
71
        let keypair = KeyPair::new_self_signed(&self.data.primary_domain);
221
71

            
222
136
        if self.certificate_chain().await.is_ok() && !overwrite {
223
1
            return Err(Error::Core(bonsaidb_core::Error::Configuration(String::from("Certificate already installed. Enable overwrite if you wish to replace the existing certificate."))));
224
70
        }
225
70

            
226
342
        self.install_certificate(keypair.certificate_chain(), keypair.private_key())
227
342
            .await?;
228

            
229
70
        tokio::fs::write(
230
70
            self.pinned_certificate_path(),
231
70
            keypair.end_entity_certificate().as_ref(),
232
70
        )
233
59
        .await?;
234

            
235
70
        Ok(())
236
71
    }
237

            
238
    /// Installs a PEM-encoded certificate chain and its private key for TLS
    /// connections.
    ///
    /// `private_key` is first parsed as PEM; if that fails, the bytes are
    /// interpreted as DER instead. Every PEM entry in `certificate_chain`
    /// becomes one certificate of the installed chain.
    ///
    /// # Errors
    ///
    /// Returns an error if the key is neither PEM nor accepted as DER, if the
    /// chain cannot be parsed as PEM, or if installing the certificate fails.
    #[cfg(feature = "pem")]
    pub async fn install_pem_certificate(
        &self,
        certificate_chain: &[u8],
        private_key: &[u8],
    ) -> Result<(), Error> {
        let key = if let Ok(parsed) = pem::parse(private_key) {
            PrivateKey::unchecked_from_der(parsed.contents)
        } else {
            PrivateKey::from_der(private_key)?
        };

        let mut certificates = Vec::new();
        for entry in pem::parse_many(&certificate_chain)? {
            certificates.push(fabruic::Certificate::unchecked_from_der(entry.contents));
        }
        let chain = CertificateChain::unchecked_from_certificates(certificates);

        self.install_certificate(&chain, &key).await
    }
260

            
261
    /// Installs a certificate chain and private key used for TLS connections.
262
70
    pub async fn install_certificate(
263
70
        &self,
264
70
        certificate_chain: &CertificateChain,
265
70
        private_key: &PrivateKey,
266
70
    ) -> Result<(), Error> {
267
70
        let db = self.hosted().await;
268

            
269
70
        TlsCertificate::entry(&self.data.primary_domain, &db)
270
70
            .update_with(|cert: &mut TlsCertificate| {
271
1
                cert.certificate_chain = certificate_chain.clone();
272
1
                cert.private_key = SerializablePrivateKey(private_key.clone());
273
70
            })
274
70
            .or_insert_with(|| TlsCertificate {
275
69
                domains: vec![self.data.primary_domain.clone()],
276
69
                private_key: SerializablePrivateKey(private_key.clone()),
277
69
                certificate_chain: certificate_chain.clone(),
278
206
            })
279
206
            .await?;
280

            
281
135
        self.refresh_certified_key().await?;
282

            
283
70
        let pinned_certificate_path = self.pinned_certificate_path();
284
70
        if pinned_certificate_path.exists() {
285
1
            tokio::fs::remove_file(&pinned_certificate_path).await?;
286
69
        }
287

            
288
70
        Ok(())
289
70
    }
290

            
291
71
    async fn refresh_certified_key(&self) -> Result<(), Error> {
292
139
        let certificate = self.tls_certificate().await?;
293

            
294
71
        let mut cached_key = self.data.primary_tls_key.lock();
295
71
        let private_key = rustls::PrivateKey(
296
71
            fabruic::dangerous::PrivateKey::as_ref(&certificate.private_key.0).to_vec(),
297
71
        );
298
71
        let private_key = rustls::sign::any_ecdsa_type(&Arc::new(private_key))?;
299

            
300
71
        let certificates = certificate
301
71
            .certificate_chain
302
71
            .iter()
303
71
            .map(|cert| rustls::Certificate(cert.as_ref().to_vec()))
304
71
            .collect::<Vec<_>>();
305
71

            
306
71
        let certified_key = Arc::new(CertifiedKey::new(certificates, private_key));
307
71
        *cached_key = Some(certified_key);
308
71
        Ok(())
309
71
    }
310

            
311
93
    async fn tls_certificate(&self) -> Result<TlsCertificate, Error> {
312
93
        let db = self.hosted().await;
313
93
        let (_, certificate) = db
314
93
            .view::<TlsCertificatesByDomain>()
315
93
            .with_key(self.data.primary_domain.clone())
316
165
            .query_with_collection_docs()
317
165
            .await?
318
            .documents
319
93
            .into_iter()
320
93
            .next()
321
93
            .ok_or_else(|| {
322
                Error::Core(bonsaidb_core::Error::Configuration(format!(
323
                    "no certificate found for {}",
324
                    self.data.primary_domain
325
                )))
326
93
            })?;
327
93
        Ok(certificate.contents)
328
93
    }
329

            
330
    /// Returns the current certificate chain.
331
108
    pub async fn certificate_chain(&self) -> Result<CertificateChain, Error> {
332
108
        let db = self.hosted().await;
333
108
        if let Some(mapping) = db
334
108
            .view::<TlsCertificatesByDomain>()
335
108
            .with_key(self.data.primary_domain.clone())
336
109
            .query()
337
91
            .await?
338
108
            .into_iter()
339
108
            .next()
340
        {
341
36
            Ok(mapping.value)
342
        } else {
343
72
            Err(Error::Core(bonsaidb_core::Error::Configuration(format!(
344
72
                "no certificate found for {}",
345
72
                self.data.primary_domain
346
72
            ))))
347
        }
348
108
    }
349

            
350
    /// Listens for incoming client connections. Does not return until the
351
    /// server shuts down.
352
22
    pub async fn listen_on(&self, port: u16) -> Result<(), Error> {
353
28
        let certificate = self.tls_certificate().await?;
354
22
        let keypair =
355
22
            KeyPair::from_parts(certificate.certificate_chain, certificate.private_key.0)?;
356
22
        let mut builder = Endpoint::builder();
357
22
        builder.set_protocols([CURRENT_PROTOCOL_VERSION.as_bytes().to_vec()]);
358
22
        builder.set_address(([0; 8], port).into());
359
22
        builder
360
22
            .set_max_idle_timeout(None)
361
22
            .map_err(|err| Error::Core(bonsaidb_core::Error::Transport(err.to_string())))?;
362
22
        builder.set_server_key_pair(Some(keypair));
363
22
        let mut server = builder
364
22
            .build()
365
22
            .map_err(|err| Error::Core(bonsaidb_core::Error::Transport(err.to_string())))?;
366
22
        {
367
22
            let mut endpoint = fast_async_write!(self.data.endpoint);
368
22
            *endpoint = Some(server.clone());
369
        }
370

            
371
22
        let mut shutdown_watcher = self
372
22
            .data
373
22
            .shutdown
374
22
            .watcher()
375
            .await
376
22
            .expect("server already shut down");
377

            
378
83
        while let Some(result) = tokio::select! {
379
4
            shutdown_state = shutdown_watcher.wait_for_shutdown() => {
380
                drop(server.close_incoming());
381
                if matches!(shutdown_state, ShutdownState::GracefulShutdown) {
382
                    server.wait_idle().await;
383
                }
384
                drop(server.close());
385
                None
386
            },
387
61
            msg = server.next() => msg
388
61
        } {
389
61
            let connection = result.accept::<()>().await?;
390
61
            let task_self = self.clone();
391
61
            tokio::spawn(async move {
392
61
                let address = connection.remote_address();
393
61
                if let Err(err) = task_self.handle_bonsai_connection(connection).await {
394
                    log::error!("[server] closing connection {}: {:?}", address, err);
395
61
                }
396
61
            });
397
        }
398

            
399
4
        Ok(())
400
4
    }
401

            
402
    /// Returns all of the currently connected clients.
403
    pub async fn connected_clients(&self) -> Vec<ConnectedClient<B>> {
404
        let clients = fast_async_read!(self.data.clients);
405
        clients.values().cloned().collect()
406
    }
407

            
408
    /// Sends a custom API response to all connected clients.
409
    pub async fn broadcast(&self, response: CustomApiResult<B::CustomApi>) {
410
        let clients = fast_async_read!(self.data.clients);
411
        for client in clients.values() {
412
            drop(client.send(response.clone()));
413
        }
414
    }
415

            
416
116
    async fn initialize_client(
417
116
        &self,
418
116
        transport: Transport,
419
116
        address: SocketAddr,
420
116
        sender: Sender<CustomApiResult<B::CustomApi>>,
421
116
    ) -> Option<OwnedClient<B>> {
422
116
        if !self.data.default_permissions.allowed_to(
423
116
            &bonsaidb_resource_name(),
424
116
            &BonsaiAction::Server(ServerAction::Connect),
425
116
        ) {
426
            return None;
427
116
        }
428

            
429
116
        let client = loop {
430
116
            let next_id = CONNECTED_CLIENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
431
116
            let mut clients = fast_async_write!(self.data.clients);
432
116
            if let hash_map::Entry::Vacant(e) = clients.entry(next_id) {
433
116
                let client = OwnedClient::new(next_id, address, transport, sender, self.clone());
434
116
                e.insert(client.clone());
435
116
                break client;
436
            }
437
        };
438

            
439
        if matches!(
440
116
            B::client_connected(&client, self).await,
441
            ConnectionHandling::Accept
442
        ) {
443
116
            Some(client)
444
        } else {
445
            None
446
        }
447
116
    }
448

            
449
56
    async fn disconnect_client(&self, id: u32) {
450
56
        if let Some(client) = {
451
56
            let mut clients = fast_async_write!(self.data.clients);
452
56
            clients.remove(&id)
453
        } {
454
56
            B::client_disconnected(client, self).await;
455
        }
456
56
    }
457

            
458
61
    async fn handle_bonsai_connection(
459
61
        &self,
460
61
        mut connection: fabruic::Connection<()>,
461
61
    ) -> Result<(), Error> {
462
61
        if let Some(incoming) = connection.next().await {
463
61
            let incoming = match incoming {
464
61
                Ok(incoming) => incoming,
465
                Err(err) => {
466
                    log::error!("[server] Error establishing a stream: {:?}", err);
467
                    return Ok(());
468
                }
469
            };
470

            
471
61
            match incoming
472
61
            .accept::<networking::Payload<Response<CustomApiResult<B::CustomApi>>>, networking::Payload<Request<<B::CustomApi as CustomApi>::Request>>>()
473
            .await {
474
61
                Ok((sender, receiver)) => {
475
61
                    let (api_response_sender, api_response_receiver) = flume::unbounded();
476
61
                    if let Some(disconnector) = self
477
61
                        .initialize_client(
478
61
                            Transport::Bonsai,
479
61
                            connection.remote_address(),
480
61
                            api_response_sender,
481
61
                        )
482
                        .await
483
61
                    {
484
61
                        let task_sender = sender.clone();
485
61
                        tokio::spawn(async move {
486
61
                            while let Ok(response) = api_response_receiver.recv_async().await {
487
                                if task_sender
488
                                    .send(&Payload {
489
                                        id: None,
490
                                        wrapped: Response::Api(response),
491
                                    })
492
                                    .is_err()
493
                                {
494
                                    break;
495
                                }
496
                            }
497
15
                            let _ = connection.close().await;
498
61
                        });
499
61

            
500
61
                        let task_self = self.clone();
501
61
                        tokio::spawn(async move {
502
61
                            if let Err(err) = task_self
503
18175
                                .handle_stream(disconnector, sender, receiver)
504
18175
                                .await
505
                            {
506
                                log::error!("[server] Error handling stream: {:?}", err);
507
16
                            }
508
61
                        });
509
61
                    } else {
510
                        log::error!("[server] Backend rejected connection.");
511
                        return Ok(());
512
                    }
513
                }
514
                Err(err) => {
515
                    log::error!("[server] Error accepting incoming stream: {:?}", err);
516
                    return Ok(());
517
                }
518
            }
519
        }
520
61
        Ok(())
521
61
    }
522

            
523
116
    async fn handle_client_requests(
524
116
        &self,
525
116
        client: ConnectedClient<B>,
526
116
        request_receiver: flume::Receiver<Payload<Request<<B::CustomApi as CustomApi>::Request>>>,
527
116
        response_sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
528
116
    ) {
529
116
        let notify = Arc::new(Notify::new());
530
116
        let requests_in_queue = Arc::new(AtomicUsize::new(0));
531
72641
        loop {
532
72641
            let current_requests = requests_in_queue.load(Ordering::SeqCst);
533
72641
            if current_requests == self.data.client_simultaneous_request_limit {
534
                // Wait for requests to finish.
535
20361
                notify.notified().await;
536
52280
            } else if requests_in_queue
537
52280
                .compare_exchange(
538
52280
                    current_requests,
539
52280
                    current_requests + 1,
540
52280
                    Ordering::SeqCst,
541
52280
                    Ordering::SeqCst,
542
52280
                )
543
52280
                .is_ok()
544
            {
545
52279
                let payload = match request_receiver.recv_async().await {
546
52163
                    Ok(payload) => payload,
547
57
                    Err(_) => break,
548
                };
549
52163
                let id = payload.id;
550
52163
                let task_sender = response_sender.clone();
551
52163

            
552
52163
                let notify = notify.clone();
553
52163
                let requests_in_queue = requests_in_queue.clone();
554
52163
                self.handle_request_through_worker(
555
52163
                    payload.wrapped,
556
52163
                    move |response| async move {
557
52162
                        drop(task_sender.send(Payload {
558
52162
                            id,
559
52162
                            wrapped: response,
560
52162
                        }));
561
52162

            
562
52162
                        requests_in_queue.fetch_sub(1, Ordering::SeqCst);
563
52162

            
564
52162
                        notify.notify_one();
565
52162

            
566
52162
                        Ok(())
567
52163
                    },
568
52163
                    client.clone(),
569
52163
                    self.data.subscribers.clone(),
570
52163
                    response_sender.clone(),
571
52163
                )
572
643
                .await
573
52163
                .unwrap();
574
1
            }
575
        }
576
57
    }
577

            
578
52163
    async fn handle_request_through_worker<
579
52163
        F: FnOnce(Response<CustomApiResult<B::CustomApi>>) -> R + Send + 'static,
580
52163
        R: Future<Output = Result<(), Error>> + Send,
581
52163
    >(
582
52163
        &self,
583
52163
        request: Request<<B::CustomApi as CustomApi>::Request>,
584
52163
        callback: F,
585
52163
        client: ConnectedClient<B>,
586
52163
        subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
587
52163
        response_sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
588
52163
    ) -> Result<(), Error> {
589
52163
        let job = self
590
52163
            .data
591
52163
            .request_processor
592
52163
            .enqueue(ClientRequest::<B>::new(
593
52163
                request,
594
52163
                self.clone(),
595
52163
                client,
596
52163
                subscribers,
597
52163
                response_sender,
598
52163
            ))
599
643
            .await;
600
52163
        tokio::spawn(async move {
601
52163
            let result = job.receive().await?;
602
            // Map the error into a Response::Error. The jobs system supports
603
            // multiple receivers receiving output, and wraps Err to avoid
604
            // requiring the error to be cloneable. As such, we have to unwrap
605
            // it. Thankfully, we can guarantee nothing else is waiting on a
606
            // response to a request than the original requestor, so this can be
607
            // safely unwrapped.
608
52163
            let result =
609
52163
                result.unwrap_or_else(|err| Response::Error(Arc::try_unwrap(err).unwrap().into()));
610
52163
            callback(result).await?;
611
52163
            Result::<(), Error>::Ok(())
612
52163
        });
613
52163
        Ok(())
614
52163
    }
615

            
616
61
    async fn handle_stream(
617
61
        &self,
618
61
        client: OwnedClient<B>,
619
61
        sender: fabruic::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
620
61
        mut receiver: fabruic::Receiver<Payload<Request<<B::CustomApi as CustomApi>::Request>>>,
621
61
    ) -> Result<(), Error> {
622
61
        let (payload_sender, payload_receiver) = flume::unbounded();
623
61
        tokio::spawn(async move {
624
30164
            while let Ok(payload) = payload_receiver.recv_async().await {
625
30103
                if sender.send(&payload).is_err() {
626
                    break;
627
30103
                }
628
            }
629
61
        });
630
61

            
631
61
        let (request_sender, request_receiver) =
632
61
            flume::bounded::<Payload<Request<<B::CustomApi as CustomApi>::Request>>>(
633
61
                self.data.client_simultaneous_request_limit,
634
61
            );
635
61
        let task_self = self.clone();
636
61
        tokio::spawn(async move {
637
61
            task_self
638
24446
                .handle_client_requests(client.clone(), request_receiver, payload_sender)
639
24446
                .await;
640
61
        });
641

            
642
30150
        while let Some(payload) = receiver.next().await {
643
30089
            drop(request_sender.send_async(payload?).await);
644
        }
645

            
646
16
        Ok(())
647
16
    }
648

            
649
14
    async fn forward_notifications_for(
650
14
        &self,
651
14
        subscriber_id: u64,
652
14
        receiver: flume::Receiver<Arc<Message>>,
653
14
        sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
654
14
    ) {
655
42
        while let Ok(message) = receiver.recv_async().await {
656
28
            if sender
657
28
                .send(Payload {
658
28
                    id: None,
659
28
                    wrapped: Response::Database(DatabaseResponse::MessageReceived {
660
28
                        subscriber_id,
661
28
                        topic: message.topic.clone(),
662
28
                        payload: Bytes::from(&message.payload[..]),
663
28
                    }),
664
28
                })
665
28
                .is_err()
666
            {
667
                break;
668
28
            }
669
        }
670
    }
671

            
672
    /// Shuts the server down. If a `timeout` is provided, the server will stop
673
    /// accepting new connections and attempt to respond to any outstanding
674
    /// requests already being processed. After the `timeout` has elapsed or if
675
    /// no `timeout` was provided, the server is forcefully shut down.
676
29
    pub async fn shutdown(&self, timeout: Option<Duration>) -> Result<(), Error> {
677
29
        if let Some(timeout) = timeout {
678
4
            self.data.shutdown.graceful_shutdown(timeout).await;
679
        } else {
680
25
            self.data.shutdown.shutdown().await;
681
        }
682

            
683
29
        Ok(())
684
29
    }
685

            
686
    /// Listens for signals from the operating system that the server should
687
    /// shut down and attempts to gracefully shut down.
688
1
    pub async fn listen_for_shutdown(&self) -> Result<(), Error> {
689
1
        const GRACEFUL_SHUTDOWN: usize = 1;
690
1
        const TERMINATE: usize = 2;
691
1

            
692
1
        enum SignalShutdownState {
693
1
            Running,
694
1
            ShuttingDown(flume::Receiver<()>),
695
1
        }
696
1

            
697
1
        let shutdown_state = Arc::new(Mutex::new(SignalShutdownState::Running));
698
1
        let flag = Arc::new(AtomicUsize::default());
699
1
        let register_hook = |flag: &Arc<AtomicUsize>| {
700
1
            signal_hook::flag::register_usize(SIGINT, flag.clone(), GRACEFUL_SHUTDOWN)?;
701
1
            signal_hook::flag::register_usize(SIGTERM, flag.clone(), TERMINATE)?;
702
            #[cfg(not(windows))]
703
1
            signal_hook::flag::register_usize(SIGQUIT, flag.clone(), TERMINATE)?;
704
1
            Result::<(), std::io::Error>::Ok(())
705
1
        };
706
1
        if let Err(err) = register_hook(&flag) {
707
            log::error!("Error installing signals for graceful shutdown: {:?}", err);
708
            tokio::time::sleep(Duration::MAX).await;
709
        } else {
710
            loop {
711
4
                match flag.load(Ordering::Relaxed) {
712
4
                    0 => {
713
4
                        // No signal
714
4
                    }
715
                    GRACEFUL_SHUTDOWN => {
716
                        let mut state = fast_async_lock!(shutdown_state);
717
                        match *state {
718
                            SignalShutdownState::Running => {
719
                                log::error!("Interrupt signal received. Shutting down gracefully.");
720
                                let task_server = self.clone();
721
                                let (shutdown_sender, shutdown_receiver) = flume::bounded(1);
722
                                tokio::task::spawn(async move {
723
                                    task_server.shutdown(Some(Duration::from_secs(30))).await?;
724
                                    let _ = shutdown_sender.send(());
725
                                    Result::<(), Error>::Ok(())
726
                                });
727
                                *state = SignalShutdownState::ShuttingDown(shutdown_receiver);
728
                            }
729
                            SignalShutdownState::ShuttingDown(_) => {
730
                                // Two interrupts, go ahead and force the shutdown
731
                                break;
732
                            }
733
                        }
734
                    }
735
                    TERMINATE => {
736
                        log::error!("Quit signal received. Shutting down.");
737
                        break;
738
                    }
739
                    _ => unreachable!(),
740
                }
741

            
742
4
                let state = fast_async_lock!(shutdown_state);
743
4
                if let SignalShutdownState::ShuttingDown(receiver) = &*state {
744
                    if receiver.try_recv().is_ok() {
745
                        // Fully shut down.
746
                        return Ok(());
747
                    }
748
4
                }
749

            
750
4
                tokio::time::sleep(Duration::from_millis(300)).await;
751
            }
752
            self.shutdown(None).await?;
753
        }
754

            
755
        Ok(())
756
    }
757

            
758
    /// Manually authenticates `client` as `user`. `user` can be the user's id
759
    /// ([`u64`]) or the username ([`String`]/[`str`]). Returns the permissions
760
    /// that the user now has.
761
    pub async fn authenticate_client_as<'name, N: Into<NamedReference<'name>> + Send + Sync>(
762
        &self,
763
        user: N,
764
        client: &ConnectedClient<B>,
765
    ) -> Result<Permissions, Error> {
766
        let admin = self.data.storage.admin().await;
767
        let user = User::load(user, &admin)
768
            .await?
769
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
770

            
771
        let permissions = user.contents.effective_permissions(&admin).await?;
772
        let permissions = Permissions::merged(
773
            [
774
                &permissions,
775
                &self.data.authenticated_permissions,
776
                &self.data.default_permissions,
777
            ]
778
            .into_iter(),
779
        );
780
        client.logged_in_as(user.id, permissions.clone()).await;
781
        Ok(permissions)
782
    }
783

            
784
24
    async fn publish_message(&self, database: &str, topic: &str, payload: Vec<u8>) {
785
24
        self.data
786
24
            .relay
787
24
            .publish_message(Message {
788
24
                topic: database_topic(database, topic),
789
24
                payload,
790
24
            })
791
            .await;
792
24
    }
793

            
794
3
    async fn publish_serialized_to_all(&self, database: &str, topics: &[String], payload: Vec<u8>) {
795
3
        self.data
796
3
            .relay
797
3
            .publish_serialized_to_all(
798
3
                topics
799
3
                    .iter()
800
9
                    .map(|topic| database_topic(database, topic))
801
3
                    .collect(),
802
3
                payload,
803
3
            )
804
            .await;
805
3
    }
806

            
807
21
    async fn create_subscriber(&self, database: String) -> ServerSubscriber<B> {
808
21
        let subscriber = self.data.relay.create_subscriber().await;
809

            
810
21
        let mut subscribers = fast_async_write!(self.data.subscribers);
811
21
        let subscriber_id = subscriber.id();
812
21
        let receiver = subscriber.receiver().clone();
813
21
        subscribers.insert(subscriber_id, subscriber);
814
21

            
815
21
        ServerSubscriber {
816
21
            server: self.clone(),
817
21
            database,
818
21
            receiver,
819
21
            id: subscriber_id,
820
21
        }
821
21
    }
822

            
823
36
    async fn subscribe_to<S: Into<String> + Send>(
824
36
        &self,
825
36
        subscriber_id: u64,
826
36
        database: &str,
827
36
        topic: S,
828
36
    ) -> Result<(), bonsaidb_core::Error> {
829
36
        let subscribers = fast_async_read!(self.data.subscribers);
830
36
        if let Some(subscriber) = subscribers.get(&subscriber_id) {
831
36
            subscriber
832
36
                .subscribe_to(database_topic(database, &topic.into()))
833
                .await;
834
36
            Ok(())
835
        } else {
836
            Err(bonsaidb_core::Error::Server(String::from(
837
                "invalid subscriber id",
838
            )))
839
        }
840
36
    }
841

            
842
3
    async fn unsubscribe_from(
843
3
        &self,
844
3
        subscriber_id: u64,
845
3
        database: &str,
846
3
        topic: &str,
847
3
    ) -> Result<(), bonsaidb_core::Error> {
848
3
        let subscribers = fast_async_read!(self.data.subscribers);
849
3
        if let Some(subscriber) = subscribers.get(&subscriber_id) {
850
3
            subscriber
851
3
                .unsubscribe_from(&database_topic(database, topic))
852
                .await;
853
3
            Ok(())
854
        } else {
855
            Err(bonsaidb_core::Error::Server(String::from(
856
                "invalid subscriber id",
857
            )))
858
        }
859
3
    }
860
}
861

            
862
impl<B: Backend> Deref for CustomServer<B> {
863
    type Target = Storage;
864

            
865
51188
    fn deref(&self) -> &Self::Target {
866
51188
        &self.data.storage
867
51188
    }
868
}
869

            
870
#[derive(Debug)]
871
struct ClientRequest<B: Backend> {
872
    request: Option<Request<<B::CustomApi as CustomApi>::Request>>,
873
    client: ConnectedClient<B>,
874
    server: CustomServer<B>,
875
    subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
876
    sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
877
}
878

            
879
impl<B: Backend> ClientRequest<B> {
880
52163
    pub fn new(
881
52163
        request: Request<<B::CustomApi as CustomApi>::Request>,
882
52163
        server: CustomServer<B>,
883
52163
        client: ConnectedClient<B>,
884
52163
        subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
885
52163
        sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
886
52163
    ) -> Self {
887
52163
        Self {
888
52163
            request: Some(request),
889
52163
            server,
890
52163
            client,
891
52163
            subscribers,
892
52163
            sender,
893
52163
        }
894
52163
    }
895
}
896

            
897
#[async_trait]
898
impl<B: Backend> Job for ClientRequest<B> {
899
    type Output = Response<CustomApiResult<B::CustomApi>>;
900
    type Error = Error;
901

            
902
156463
    #[cfg_attr(feature = "tracing", tracing::instrument)]
903
52163
    async fn execute(&mut self) -> Result<Self::Output, Self::Error> {
904
52163
        let request = self.request.take().unwrap();
905
52163
        ServerDispatcher {
906
52163
            server: &self.server,
907
52163
            client: &self.client,
908
52163
            subscribers: &self.subscribers,
909
52163
            response_sender: &self.sender,
910
52163
        }
911
110561
        .dispatch(&self.client.permissions().await, request)
912
99537
        .await
913
104326
    }
914
}
915

            
916
#[async_trait]
917
impl<B: Backend> StorageConnection for CustomServer<B> {
918
    type Database = ServerDatabase<B>;
919

            
920
647
    async fn create_database_with_schema(
921
647
        &self,
922
647
        name: &str,
923
647
        schema: SchemaName,
924
647
        only_if_needed: bool,
925
647
    ) -> Result<(), bonsaidb_core::Error> {
926
647
        self.data
927
647
            .storage
928
2738
            .create_database_with_schema(name, schema, only_if_needed)
929
2734
            .await
930
1294
    }
931

            
932
55
    async fn database<DB: Schema>(
933
55
        &self,
934
55
        name: &str,
935
55
    ) -> Result<Self::Database, bonsaidb_core::Error> {
936
55
        let db = self.data.storage.database::<DB>(name).await?;
937
55
        Ok(ServerDatabase {
938
55
            server: self.clone(),
939
55
            db,
940
55
        })
941
110
    }
942

            
943
506
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
944
1817
        self.data.storage.delete_database(name).await
945
1012
    }
946

            
947
3
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
948
3
        self.data.storage.list_databases().await
949
6
    }
950

            
951
3
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
952
3
        self.data.storage.list_available_schemas().await
953
6
    }
954

            
955
7
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
956
10
        self.data.storage.create_user(username).await
957
14
    }
958

            
959
    #[cfg(feature = "password-hashing")]
960
3
    async fn set_user_password<'user, U: Into<NamedReference<'user>> + Send + Sync>(
961
3
        &self,
962
3
        user: U,
963
3
        password: bonsaidb_core::connection::SensitiveString,
964
3
    ) -> Result<(), bonsaidb_core::Error> {
965
11
        self.data.storage.set_user_password(user, password).await
966
6
    }
967

            
968
    #[cfg(feature = "password-hashing")]
969
5
    async fn authenticate<'user, U: Into<NamedReference<'user>> + Send + Sync>(
970
5
        &self,
971
5
        user: U,
972
5
        authentication: Authentication,
973
5
    ) -> Result<Authenticated, bonsaidb_core::Error> {
974
11
        self.data.storage.authenticate(user, authentication).await
975
10
    }
976

            
977
6
    async fn add_permission_group_to_user<
978
6
        'user,
979
6
        'group,
980
6
        U: Into<NamedReference<'user>> + Send + Sync,
981
6
        G: Into<NamedReference<'group>> + Send + Sync,
982
6
    >(
983
6
        &self,
984
6
        user: U,
985
6
        permission_group: G,
986
6
    ) -> Result<(), bonsaidb_core::Error> {
987
6
        self.data
988
6
            .storage
989
6
            .add_permission_group_to_user(user, permission_group)
990
4
            .await
991
12
    }
992

            
993
4
    async fn remove_permission_group_from_user<
994
4
        'user,
995
4
        'group,
996
4
        U: Into<NamedReference<'user>> + Send + Sync,
997
4
        G: Into<NamedReference<'group>> + Send + Sync,
998
4
    >(
999
4
        &self,
4
        user: U,
4
        permission_group: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data
4
            .storage
4
            .remove_permission_group_from_user(user, permission_group)
3
            .await
8
    }

            
4
    async fn add_role_to_user<
4
        'user,
4
        'group,
4
        U: Into<NamedReference<'user>> + Send + Sync,
4
        G: Into<NamedReference<'group>> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        role: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data.storage.add_role_to_user(user, role).await
8
    }

            
4
    async fn remove_role_from_user<
4
        'user,
4
        'group,
4
        U: Into<NamedReference<'user>> + Send + Sync,
4
        G: Into<NamedReference<'group>> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        role: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data.storage.remove_role_from_user(user, role).await
8
    }
}

            
104326
/// Borrowed per-request context used to dispatch a client request: the
/// server, the requesting client, the subscriber map and the response channel.
///
/// The `dispatcher` attribute declares two input types: the top-level
/// `Request` and `ServerRequest`.
#[derive(Dispatcher, Debug)]
#[dispatcher(input = Request<<B::CustomApi as CustomApi>::Request>, input = ServerRequest, actionable = bonsaidb_core::actionable)]
struct ServerDispatcher<'s, B>
where
    B: Backend,
{
    /// The server handling the request.
    server: &'s CustomServer<B>,
    /// The client that issued the request.
    client: &'s ConnectedClient<B>,
    /// Subscribers keyed by subscriber id.
    subscribers: &'s Arc<RwLock<HashMap<u64, Subscriber>>>,
    /// Channel on which response payloads are sent.
    response_sender: &'s flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
}

            
#[async_trait]
impl<'s, B: Backend> RequestDispatcher for ServerDispatcher<'s, B> {
    type Subaction = <B::CustomApi as CustomApi>::Request;
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;

            
14
    async fn handle_subaction(
14
        &self,
14
        permissions: &Permissions,
14
        subaction: Self::Subaction,
14
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
14
        let dispatcher =
14
            <B::CustomApiDispatcher as CustomApiDispatcher<B>>::new(self.server, self.client);
14
        match dispatcher.dispatch(permissions, subaction).await {
10
            Ok(response) => Ok(Response::Api(Ok(response))),
4
            Err(err) => match err {
                BackendError::Backend(backend) => Ok(Response::Api(Err(backend))),
4
                BackendError::Server(server) => Err(server),
            },
        }
28
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ServerHandler for ServerDispatcher<'s, B> {
1103
    async fn handle(
1103
        &self,
1103
        permissions: &Permissions,
1103
        request: ServerRequest,
1103
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4451
        ServerRequestDispatcher::dispatch_to_handlers(self, permissions, request).await
2206
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DatabaseHandler for ServerDispatcher<'s, B> {
51046
    async fn handle(
51046
        &self,
51046
        permissions: &Permissions,
51046
        database_name: String,
51046
        request: DatabaseRequest,
51046
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
51046
        let database = self
51046
            .server
100055
            .database_without_schema_internal(&database_name)
64557
            .await?;
51046
        DatabaseDispatcher {
51046
            name: database_name,
51046
            database,
51046
            server_dispatcher: self,
51046
        }
51047
        .dispatch(permissions, request)
30529
        .await
102092
    }
}

            
/// Declares the output and error types produced when dispatching
/// `ServerRequest`s through [`ServerDispatcher`].
impl<'s, B: Backend> ServerRequestDispatcher for ServerDispatcher<'s, B> {
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;
}

            
#[async_trait]
impl<'s, B: Backend> CreateDatabaseHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
570
    async fn resource_name<'a>(
570
        &'a self,
570
        database: &'a bonsaidb_core::connection::Database,
570
        _only_if_needed: &'a bool,
570
    ) -> Result<ResourceName<'a>, Error> {
570
        Ok(database_resource_name(&database.name))
570
    }

            
570
    fn action() -> Self::Action {
570
        BonsaiAction::Server(ServerAction::CreateDatabase)
570
    }

            
570
    async fn handle_protected(
570
        &self,
570
        _permissions: &Permissions,
570
        database: bonsaidb_core::connection::Database,
570
        only_if_needed: bool,
570
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
570
        self.server
2605
            .create_database_with_schema(&database.name, database.schema, only_if_needed)
2605
            .await?;
564
        Ok(Response::Server(ServerResponse::DatabaseCreated {
564
            name: database.name.clone(),
564
        }))
1140
    }
}

            
#[async_trait]
impl<'s, B: Backend> DeleteDatabaseHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
504
    async fn resource_name<'a>(&'a self, database: &'a String) -> Result<ResourceName<'a>, Error> {
504
        Ok(database_resource_name(database))
504
    }

            
504
    fn action() -> Self::Action {
504
        BonsaiAction::Server(ServerAction::DeleteDatabase)
504
    }

            
504
    async fn handle_protected(
504
        &self,
504
        _permissions: &Permissions,
504
        name: String,
504
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
1814
        self.server.delete_database(&name).await?;
502
        Ok(Response::Server(ServerResponse::DatabaseDeleted { name }))
1008
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListDatabasesHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::ListDatabases)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::Databases(
2
            self.server.list_databases().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListAvailableSchemasHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::ListAvailableSchemas)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::AvailableSchemas(
2
            self.server.list_available_schemas().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CreateUserHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
4
    async fn resource_name<'a>(&'a self, _username: &'a String) -> Result<ResourceName<'a>, Error> {
4
        Ok(bonsaidb_resource_name())
4
    }

            
4
    fn action() -> Self::Action {
4
        BonsaiAction::Server(ServerAction::CreateUser)
4
    }

            
3
    async fn handle_protected(
3
        &self,
3
        _permissions: &Permissions,
3
        username: String,
3
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::UserCreated {
3
            id: self.server.create_user(&username).await?,
        }))
6
    }
}

            
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::SetUserPasswordHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

    /// Resolves `user` to its id in the admin database and returns that
    /// user's resource name.
    ///
    /// Fails with `bonsaidb_core::Error::UserNotFound` if the reference does
    /// not resolve to a user.
    async fn resource_name<'a>(
        &'a self,
        user: &'a NamedReference<'static>,
        _password: &'a bonsaidb_core::connection::SensitiveString,
    ) -> Result<ResourceName<'a>, Error> {
        let admin = self.server.admin().await;
        let user_id = user
            .id::<User, _>(&admin)
            .await?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

        Ok(user_resource_name(user_id))
    }

    /// Returns `ServerAction::SetPassword` wrapped as a [`BonsaiAction`].
    fn action() -> Self::Action {
        let server_action = ServerAction::SetPassword;
        BonsaiAction::Server(server_action)
    }

    /// Sets the user's password on the server and replies with
    /// `Response::Ok`.
    async fn handle_protected(
        &self,
        _permissions: &Permissions,
        username: NamedReference<'static>,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        let server = self.server;
        server.set_user_password(username, password).await?;
        Ok(Response::Ok)
    }
}

            
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AuthenticateHandler for ServerDispatcher<'s, B> {
    /// Checks that `permissions` allow authenticating as `user` with the
    /// given authentication method.
    ///
    /// Fails with `bonsaidb_core::Error::UserNotFound` if `user` does not
    /// resolve to a stored user, or with the permission error returned by
    /// `permissions.check`.
    async fn verify_permissions(
        &self,
        permissions: &Permissions,
        user: &NamedReference<'static>,
        authentication: &Authentication,
    ) -> Result<(), Error> {
        let id = user
            .id::<User, _>(&self.server.admin().await)
            .await?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
        // Deliberately no catch-all arm: a new `Authentication` variant must
        // be given its own permission check here.
        match authentication {
            Authentication::Password(_) => {
                permissions.check(
                    user_resource_name(id),
                    &BonsaiAction::Server(ServerAction::Authenticate(
                        AuthenticationMethod::PasswordHash,
                    )),
                )?;
                Ok(())
            }
        }
    }

    /// Authenticates the user on the server, merges the resulting
    /// permissions with the server's authenticated and default permissions,
    /// records the login on the client, and replies with
    /// `ServerResponse::Authenticated`.
    async fn handle_protected(
        &self,
        _permissions: &Permissions,
        username: NamedReference<'static>,
        authentication: Authentication,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        // `username` is owned and not used again, so it is moved rather than
        // cloned.
        let mut response = self.server.authenticate(username, authentication).await?;

        // TODO this should be handled by the storage layer
        response.permissions = Permissions::merged([
            &response.permissions,
            &self.server.data.authenticated_permissions,
            &self.server.data.default_permissions,
        ]);

        self.client
            .logged_in_as(response.user_id, response.permissions.clone())
            .await;
        Ok(Response::Server(ServerResponse::Authenticated(response)))
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AlterUserPermissionGroupMembershipHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        user: &'a NamedReference<'static>,
8
        _group: &'a NamedReference<'static>,
8
        _should_be_member: &'a bool,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        let id = user
8
            .id::<User, _>(&self.server.admin().await)
2
            .await?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
8
        Ok(user_resource_name(id))
16
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups)
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        user: NamedReference<'static>,
8
        group: NamedReference<'static>,
8
        should_be_member: bool,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        if should_be_member {
4
            self.server
4
                .add_permission_group_to_user(user, group)
2
                .await?;
        } else {
4
            self.server
4
                .remove_permission_group_from_user(user, group)
3
                .await?;
        }

            
8
        Ok(Response::Ok)
16
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AlterUserRoleMembershipHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        user: &'a NamedReference<'static>,
8
        _role: &'a NamedReference<'static>,
8
        _should_be_member: &'a bool,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        let id = user
8
            .id::<User, _>(&self.server.admin().await)
            .await?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
8
        Ok(user_resource_name(id))
16
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Server(ServerAction::ModifyUserRoles)
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        user: NamedReference<'static>,
8
        role: NamedReference<'static>,
8
        should_be_member: bool,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        if should_be_member {
4
            self.server.add_role_to_user(user, role).await?;
        } else {
4
            self.server.remove_role_from_user(user, role).await?;
        }

            
8
        Ok(Response::Ok)
16
    }
}

            
102092
/// Per-request context used to dispatch a `DatabaseRequest` against one
/// opened database.
#[derive(Dispatcher, Debug)]
#[dispatcher(input = DatabaseRequest, actionable = bonsaidb_core::actionable)]
struct DatabaseDispatcher<'s, B>
where
    B: Backend,
{
    /// Name of the database; used when building resource names.
    name: String,
    /// The opened database the request runs against.
    database: Database,
    /// The server-level dispatcher that created this dispatcher.
    server_dispatcher: &'s ServerDispatcher<'s, B>,
}

            
/// Declares the output and error types produced when dispatching
/// `DatabaseRequest`s through [`DatabaseDispatcher`].
impl<'s, B: Backend> DatabaseRequestDispatcher for DatabaseDispatcher<'s, B> {
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::GetHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
8066
    async fn resource_name<'a>(
8066
        &'a self,
8066
        collection: &'a CollectionName,
8066
        id: &'a u64,
8066
    ) -> Result<ResourceName<'a>, Error> {
8066
        Ok(document_resource_name(&self.name, collection, *id))
8066
    }

            
8066
    fn action() -> Self::Action {
8066
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get))
8066
    }

            
8066
    async fn handle_protected(
8066
        &self,
8066
        _permissions: &Permissions,
8066
        collection: CollectionName,
8066
        id: u64,
8066
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8066
        let document = self
8066
            .database
8066
            .internal_get_from_collection_id(id, &collection)
4801
            .await?
8066
            .ok_or(Error::Core(bonsaidb_core::Error::DocumentNotFound(
8066
                collection, id,
8066
            )))?;
7562
        Ok(Response::Database(DatabaseResponse::Documents(vec![
7562
            document,
7562
        ])))
16132
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::GetMultipleHandler for DatabaseDispatcher<'s, B> {
4
    async fn verify_permissions(
4
        &self,
4
        permissions: &Permissions,
4
        collection: &CollectionName,
4
        ids: &Vec<u64>,
4
    ) -> Result<(), Error> {
12
        for &id in ids {
8
            let document_name = document_resource_name(&self.name, collection, id);
8
            let action = BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get));
8
            permissions.check(&document_name, &action)?;
        }

            
4
        Ok(())
8
    }

            
4
    async fn handle_protected(
4
        &self,
4
        _permissions: &Permissions,
4
        collection: CollectionName,
4
        ids: Vec<u64>,
4
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4
        let documents = self
4
            .database
4
            .internal_get_multiple_from_collection_id(&ids, &collection)
4
            .await?;
4
        Ok(Response::Database(DatabaseResponse::Documents(documents)))
8
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        collection: &'a CollectionName,
8
        _ids: &'a Range<u64>,
8
        _order: &'a Sort,
8
        _limit: &'a Option<usize>,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        Ok(collection_resource_name(&self.name, collection))
8
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List))
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        collection: CollectionName,
8
        ids: Range<u64>,
8
        order: Sort,
8
        limit: Option<usize>,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        let documents = self
8
            .database
8
            .list_from_collection(ids, order, limit, &collection)
5
            .await?;
8
        Ok(Response::Database(DatabaseResponse::Documents(documents)))
16
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::QueryHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
4082
    async fn resource_name<'a>(
4082
        &'a self,
4082
        view: &'a ViewName,
4082
        _key: &'a Option<QueryKey<Bytes>>,
4082
        _order: &'a Sort,
4082
        _limit: &'a Option<usize>,
4082
        _access_policy: &'a AccessPolicy,
4082
        _with_docs: &'a bool,
4082
    ) -> Result<ResourceName<'a>, Error> {
4082
        Ok(view_resource_name(&self.name, view))
4082
    }

            
4082
    fn action() -> Self::Action {
4082
        BonsaiAction::Database(DatabaseAction::View(ViewAction::Query))
4082
    }

            
4082
    async fn handle_protected(
4082
        &self,
4082
        _permissions: &Permissions,
4082
        view: ViewName,
4082
        key: Option<QueryKey<Bytes>>,
4082
        order: Sort,
4082
        limit: Option<usize>,
4082
        access_policy: AccessPolicy,
4082
        with_docs: bool,
4082
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4082
        if with_docs {
4032
            let mappings = self
4032
                .database
4130
                .query_by_name_with_docs(&view, key, order, limit, access_policy)
4130
                .await?;
4032
            Ok(Response::Database(DatabaseResponse::ViewMappingsWithDocs(
4032
                mappings,
4032
            )))
        } else {
50
            let mappings = self
50
                .database
50
                .query_by_name(&view, key, order, limit, access_policy)
35
                .await?;
50
            Ok(Response::Database(DatabaseResponse::ViewMappings(mappings)))
        }
8164
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ReduceHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
8009
    async fn resource_name<'a>(
8009
        &'a self,
8009
        view: &'a ViewName,
8009
        _key: &'a Option<QueryKey<Bytes>>,
8009
        _access_policy: &'a AccessPolicy,
8009
        _grouped: &'a bool,
8009
    ) -> Result<ResourceName<'a>, Error> {
8009
        Ok(view_resource_name(&self.name, view))
8009
    }

            
8009
    fn action() -> Self::Action {
8009
        BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce))
8009
    }

            
8009
    async fn handle_protected(
8009
        &self,
8009
        _permissions: &Permissions,
8009
        view: ViewName,
8009
        key: Option<QueryKey<Bytes>>,
8009
        access_policy: AccessPolicy,
8009
        grouped: bool,
8009
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8009
        if grouped {
6
            let values = self
6
                .database
7
                .reduce_grouped_by_name(&view, key, access_policy)
5
                .await?;
6
            Ok(Response::Database(DatabaseResponse::ViewGroupedReduction(
6
                values,
6
            )))
        } else {
8003
            let value = self
8003
                .database
8004
                .reduce_by_name(&view, key, access_policy)
155
                .await?;
8001
            Ok(Response::Database(DatabaseResponse::ViewReduction(
8001
                Bytes::from(value),
8001
            )))
        }
16018
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ApplyTransactionHandler
    for DatabaseDispatcher<'s, B>
{
9551
    async fn verify_permissions(
9551
        &self,
9551
        permissions: &Permissions,
9551
        transaction: &Transaction,
9551
    ) -> Result<(), Error> {
23040
        for op in &transaction.operations {
13491
            let (resource, action) = match &op.command {
9761
                Command::Insert { .. } => (
9761
                    collection_resource_name(&self.name, &op.collection),
9761
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
9761
                ),
3026
                Command::Update { header, .. } => (
3026
                    document_resource_name(&self.name, &op.collection, header.id),
3026
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
3026
                ),
704
                Command::Delete { header } => (
704
                    document_resource_name(&self.name, &op.collection, header.id),
704
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
704
                ),
            };
13491
            permissions.check(&resource, &action)?;
        }

            
9549
        Ok(())
19102
    }

            
9549
    async fn handle_protected(
9549
        &self,
9549
        _permissions: &Permissions,
9549
        transaction: Transaction,
9549
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
9549
        let results = self.database.apply_transaction(transaction).await?;
9033
        Ok(Response::Database(DatabaseResponse::TransactionResults(
9033
            results,
9033
        )))
19098
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DeleteDocsHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        view: &'a ViewName,
2
        _key: &'a Option<QueryKey<Bytes>>,
2
        _access_policy: &'a AccessPolicy,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(view_resource_name(&self.name, view))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::View(ViewAction::DeleteDocs))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        view: ViewName,
2
        key: Option<QueryKey<Bytes>>,
2
        access_policy: AccessPolicy,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        let count = self
2
            .database
4
            .delete_docs_by_name(&view, key, access_policy)
4
            .await?;
2
        Ok(Response::Database(DatabaseResponse::DocumentsDeleted(
2
            count,
2
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListExecutedTransactionsHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
1036
    async fn resource_name<'a>(
1036
        &'a self,
1036
        _starting_id: &'a Option<u64>,
1036
        _result_limit: &'a Option<usize>,
1036
    ) -> Result<ResourceName<'a>, Error> {
1036
        Ok(database_resource_name(&self.name))
1036
    }

            
1036
    fn action() -> Self::Action {
1036
        BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted))
1036
    }

            
1036
    async fn handle_protected(
1036
        &self,
1036
        _permissions: &Permissions,
1036
        starting_id: Option<u64>,
1036
        result_limit: Option<usize>,
1036
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Database(DatabaseResponse::ExecutedTransactions(
1036
            self.database
1036
                .list_executed_transactions(starting_id, result_limit)
791
                .await?,
        )))
2072
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::LastTransactionIdHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(database_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Database(DatabaseResponse::LastTransactionId(
2
            self.database.last_transaction_id().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CreateSubscriberHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
14
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
14
        Ok(database_resource_name(&self.name))
14
    }

            
14
    fn action() -> Self::Action {
14
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber))
14
    }

            
14
    async fn handle_protected(
14
        &self,
14
        _permissions: &Permissions,
14
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
14
        let server = self.server_dispatcher.server;
14
        let subscriber = server.create_subscriber(self.name.clone()).await;
14
        let subscriber_id = subscriber.id;
14
        self.server_dispatcher
14
            .client
14
            .register_subscriber(subscriber_id)
            .await;

            
14
        let task_self = server.clone();
14
        let response_sender = self.server_dispatcher.response_sender.clone();
14
        tokio::spawn(async move {
14
            task_self
14
                .forward_notifications_for(
14
                    subscriber.id,
14
                    subscriber.receiver,
14
                    response_sender.clone(),
22
                )
22
                .await;
14
        });
14
        Ok(Response::Database(DatabaseResponse::SubscriberCreated {
14
            subscriber_id,
14
        }))
28
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::PublishHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
16
    async fn resource_name<'a>(
16
        &'a self,
16
        topic: &'a String,
16
        _payload: &'a Bytes,
16
    ) -> Result<ResourceName<'a>, Error> {
16
        Ok(pubsub_topic_resource_name(&self.name, topic))
16
    }

            
16
    fn action() -> Self::Action {
16
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish))
16
    }

            
16
    async fn handle_protected(
16
        &self,
16
        _permissions: &Permissions,
16
        topic: String,
16
        payload: Bytes,
16
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
16
        self.server_dispatcher
16
            .server
16
            .publish_message(&self.name, &topic, payload.into_vec())
            .await;
16
        Ok(Response::Ok)
32
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::PublishToAllHandler for DatabaseDispatcher<'s, B> {
2
    async fn verify_permissions(
2
        &self,
2
        permissions: &Permissions,
2
        topics: &Vec<String>,
2
        _payload: &Bytes,
2
    ) -> Result<(), Error> {
8
        for topic in topics {
6
            let topic_name = pubsub_topic_resource_name(&self.name, topic);
6
            let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish));
6
            if !permissions.allowed_to(&topic_name, &action) {
                return Err(Error::from(PermissionDenied {
                    resource: topic_name.to_owned(),
                    action: action.name(),
                }));
6
            }
        }

            
2
        Ok(())
4
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        topics: Vec<String>,
2
        payload: Bytes,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.server_dispatcher
2
            .server
2
            .publish_serialized_to_all(&self.name, &topics, payload.into_vec())
            .await;
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::SubscribeToHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
24
    async fn resource_name<'a>(
24
        &'a self,
24
        _subscriber_id: &'a u64,
24
        topic: &'a String,
24
    ) -> Result<ResourceName<'a>, Error> {
24
        Ok(pubsub_topic_resource_name(&self.name, topic))
24
    }

            
24
    fn action() -> Self::Action {
24
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo))
24
    }

            
24
    async fn handle_protected(
24
        &self,
24
        _permissions: &Permissions,
24
        subscriber_id: u64,
24
        topic: String,
24
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
24
        if self
24
            .server_dispatcher
24
            .client
24
            .owns_subscriber(subscriber_id)
            .await
        {
24
            self.server_dispatcher
24
                .server
24
                .subscribe_to(subscriber_id, &self.name, topic)
                .await
24
                .map(|_| Response::Ok)
24
                .map_err(Error::from)
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
48
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::UnsubscribeFromHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        _subscriber_id: &'a u64,
2
        topic: &'a String,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(pubsub_topic_resource_name(&self.name, topic))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        subscriber_id: u64,
2
        topic: String,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        if self
2
            .server_dispatcher
2
            .client
2
            .owns_subscriber(subscriber_id)
            .await
        {
2
            self.server_dispatcher
2
                .server
2
                .unsubscribe_from(subscriber_id, &self.name, &topic)
                .await
2
                .map(|_| Response::Ok)
2
                .map_err(Error::from)
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::UnregisterSubscriberHandler
    for DatabaseDispatcher<'s, B>
{
    async fn handle(
        &self,
        _permissions: &Permissions,
        subscriber_id: u64,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        if self
            .server_dispatcher
            .client
            .remove_subscriber(subscriber_id)
            .await
        {
            let mut subscribers = fast_async_write!(self.server_dispatcher.subscribers);
            if subscribers.remove(&subscriber_id).is_none() {
                Ok(Response::Error(bonsaidb_core::Error::Server(String::from(
                    "invalid subscriber id",
                ))))
            } else {
                Ok(Response::Ok)
            }
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ExecuteKeyOperationHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
20222
    async fn resource_name<'a>(&'a self, op: &'a KeyOperation) -> Result<ResourceName<'a>, Error> {
20222
        Ok(keyvalue_key_resource_name(
20222
            &self.name,
20222
            op.namespace.as_deref(),
20222
            &op.key,
20222
        ))
20222
    }

            
20222
    fn action() -> Self::Action {
20222
        BonsaiAction::Database(DatabaseAction::KeyValue(KeyValueAction::ExecuteOperation))
20222
    }

            
20222
    async fn handle_protected(
20222
        &self,
20222
        _permissions: &Permissions,
20222
        op: KeyOperation,
20222
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
20222
        let result = self.database.execute_key_operation(op).await?;
20214
        Ok(Response::Database(DatabaseResponse::KvOutput(result)))
40444
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactCollectionHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        collection: &'a CollectionName,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(collection_resource_name(&self.name, collection))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        collection: CollectionName,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact_collection_by_name(collection).await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactKeyValueStoreHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(kv_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact_key_value_store().await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(database_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact().await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
76
/// Shared, mutex-guarded map from a `String` key to a reference-counted
/// `rustls` certified key.
///
/// The map sits behind an `Arc`, and `Default` produces an empty map.
///
/// NOTE(review): the name suggests these are certificates served for
/// ALPN-based challenges, keyed by domain name — confirm against the callers.
#[derive(Default)]
struct AlpnKeys(Arc<std::sync::Mutex<HashMap<String, Arc<rustls::sign::CertifiedKey>>>>);

            
impl Debug for AlpnKeys {
    /// Writes only the type name; the map contents are never included in the
    /// debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("AlpnKeys")
    }
}

            
impl Deref for AlpnKeys {
    type Target = Arc<std::sync::Mutex<HashMap<String, Arc<rustls::sign::CertifiedKey>>>>;

            
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}