1
use std::{
2
    collections::{hash_map, HashMap},
3
    fmt::Debug,
4
    marker::PhantomData,
5
    net::SocketAddr,
6
    ops::Deref,
7
    path::PathBuf,
8
    sync::{
9
        atomic::{AtomicU32, AtomicUsize, Ordering},
10
        Arc,
11
    },
12
    time::Duration,
13
};
14

            
15
use async_lock::{Mutex, RwLock};
16
use async_trait::async_trait;
17
use bonsaidb_core::{
18
    admin::User,
19
    arc_bytes::serde::Bytes,
20
    circulate::{Message, Relay, Subscriber},
21
    connection::{self, AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
22
    custom_api::{CustomApi, CustomApiResult},
23
    document::DocumentId,
24
    keyvalue::{KeyOperation, KeyValue},
25
    networking::{
26
        self, CreateDatabaseHandler, DatabaseRequest, DatabaseRequestDispatcher, DatabaseResponse,
27
        DeleteDatabaseHandler, Payload, Request, RequestDispatcher, Response, ServerRequest,
28
        ServerRequestDispatcher, ServerResponse, CURRENT_PROTOCOL_VERSION,
29
    },
30
    permissions::{
31
        bonsai::{
32
            bonsaidb_resource_name, collection_resource_name, database_resource_name,
33
            document_resource_name, keyvalue_key_resource_name, kv_resource_name,
34
            pubsub_topic_resource_name, user_resource_name, view_resource_name, BonsaiAction,
35
            DatabaseAction, DocumentAction, KeyValueAction, PubSubAction, ServerAction,
36
            TransactionAction, ViewAction,
37
        },
38
        Action, Dispatcher, PermissionDenied, Permissions, ResourceName,
39
    },
40
    pubsub::database_topic,
41
    schema::{self, CollectionName, Nameable, NamedCollection, NamedReference, Schema, ViewName},
42
    transaction::{Command, Transaction},
43
};
44
#[cfg(feature = "password-hashing")]
45
use bonsaidb_core::{
46
    connection::{Authenticated, Authentication},
47
    permissions::bonsai::AuthenticationMethod,
48
};
49
use bonsaidb_local::{
50
    config::Builder,
51
    jobs::{manager::Manager, Job},
52
    Database, Storage,
53
};
54
use bonsaidb_utils::{fast_async_lock, fast_async_read, fast_async_write};
55
use derive_where::derive_where;
56
use fabruic::{self, CertificateChain, Endpoint, KeyPair, PrivateKey};
57
use flume::Sender;
58
use futures::{Future, StreamExt};
59
use rustls::sign::CertifiedKey;
60
use schema::SchemaName;
61
#[cfg(not(windows))]
62
use signal_hook::consts::SIGQUIT;
63
use signal_hook::consts::{SIGINT, SIGTERM};
64
use tokio::sync::Notify;
65

            
66
#[cfg(feature = "acme")]
67
use crate::config::AcmeConfiguration;
68
use crate::{
69
    backend::{BackendError, ConnectionHandling, CustomApiDispatcher},
70
    error::Error,
71
    hosted::{Hosted, SerializablePrivateKey, TlsCertificate, TlsCertificatesByDomain},
72
    server::shutdown::{Shutdown, ShutdownState},
73
    Backend, NoBackend, ServerConfiguration,
74
};
75

            
76
#[cfg(feature = "acme")]
77
pub mod acme;
78
mod connected_client;
79
mod database;
80

            
81
mod shutdown;
82
mod tcp;
83
#[cfg(feature = "websockets")]
84
mod websockets;
85

            
86
use self::connected_client::OwnedClient;
87
pub use self::{
88
    connected_client::{ConnectedClient, LockedClientDataGuard, Transport},
89
    database::{ServerDatabase, ServerSubscriber},
90
    tcp::{ApplicationProtocols, HttpService, Peer, StandardTcpProtocols, TcpService},
91
};
92

            
93
static CONNECTED_CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::new(0);
94

            
95
/// A BonsaiDb server.
96
#[derive(Debug)]
97
52835
#[derive_where(Clone)]
98
pub struct CustomServer<B: Backend = NoBackend> {
99
    data: Arc<Data<B>>,
100
}
101

            
102
/// A BonsaiDb server without a custom backend.
103
pub type Server = CustomServer<NoBackend>;
104

            
105
#[derive(Debug)]
106
struct Data<B: Backend = NoBackend> {
107
    storage: Storage,
108
    clients: RwLock<HashMap<u32, ConnectedClient<B>>>,
109
    request_processor: Manager,
110
    default_permissions: Permissions,
111
    authenticated_permissions: Permissions,
112
    endpoint: RwLock<Option<Endpoint>>,
113
    client_simultaneous_request_limit: usize,
114
    primary_tls_key: CachedCertifiedKey,
115
    primary_domain: String,
116
    #[cfg(feature = "acme")]
117
    acme: AcmeConfiguration,
118
    #[cfg(feature = "acme")]
119
    alpn_keys: AlpnKeys,
120
    shutdown: Shutdown,
121
    relay: Relay,
122
    subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
123
    _backend: PhantomData<B>,
124
}
125

            
126
80
#[derive(Default)]
127
struct CachedCertifiedKey(parking_lot::Mutex<Option<Arc<CertifiedKey>>>);
128

            
129
impl Debug for CachedCertifiedKey {
130
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131
        f.debug_tuple("CachedCertifiedKey").finish()
132
    }
133
}
134

            
135
impl Deref for CachedCertifiedKey {
136
    type Target = parking_lot::Mutex<Option<Arc<CertifiedKey>>>;
137

            
138
824
    fn deref(&self) -> &Self::Target {
139
824
        &self.0
140
824
    }
141
}
142

            
143
impl<B: Backend> CustomServer<B> {
144
    /// Opens a server using `directory` for storage.
145
80
    pub async fn open(configuration: ServerConfiguration) -> Result<Self, Error> {
146
80
        let request_processor = Manager::default();
147
1280
        for _ in 0..configuration.request_workers {
148
1280
            request_processor.spawn_worker();
149
1280
        }
150

            
151
1204
        let storage = Storage::open(configuration.storage.with_schema::<Hosted>()?).await?;
152

            
153
150
        storage.create_database::<Hosted>("_hosted", true).await?;
154

            
155
80
        let default_permissions = Permissions::from(configuration.default_permissions);
156
80
        let authenticated_permissions = Permissions::from(configuration.authenticated_permissions);
157
80

            
158
80
        let server = Self {
159
80
            data: Arc::new(Data {
160
80
                clients: RwLock::default(),
161
80
                storage,
162
80
                endpoint: RwLock::default(),
163
80
                request_processor,
164
80
                default_permissions,
165
80
                authenticated_permissions,
166
80
                client_simultaneous_request_limit: configuration.client_simultaneous_request_limit,
167
80
                primary_tls_key: CachedCertifiedKey::default(),
168
80
                primary_domain: configuration.server_name,
169
80
                #[cfg(feature = "acme")]
170
80
                acme: configuration.acme,
171
80
                #[cfg(feature = "acme")]
172
80
                alpn_keys: AlpnKeys::default(),
173
80
                shutdown: Shutdown::new(),
174
80
                relay: Relay::default(),
175
80
                subscribers: Arc::default(),
176
80
                _backend: PhantomData::default(),
177
80
            }),
178
80
        };
179
80
        B::initialize(&server).await;
180
79
        Ok(server)
181
79
    }
182

            
183
    /// Returns the path to the public pinned certificate, if this server has
184
    /// one. Note: this function will always succeed, but the file may not
185
    /// exist.
186
    #[must_use]
187
148
    pub fn pinned_certificate_path(&self) -> PathBuf {
188
148
        self.path().join("pinned-certificate.der")
189
148
    }
190

            
191
    /// Returns the primary domain configured for this server.
192
    #[must_use]
193
32
    pub fn primary_domain(&self) -> &str {
194
32
        &self.data.primary_domain
195
32
    }
196

            
197
    /// Returns the administration database.
198
23
    pub async fn admin(&self) -> ServerDatabase<B> {
199
23
        let db = self.data.storage.admin().await;
200
23
        ServerDatabase {
201
23
            server: self.clone(),
202
23
            db,
203
23
        }
204
23
    }
205

            
206
283
    pub(crate) async fn hosted(&self) -> ServerDatabase<B> {
207
283
        let db = self
208
283
            .data
209
283
            .storage
210
283
            .database::<Hosted>("_hosted")
211
76
            .await
212
283
            .unwrap();
213
283
        ServerDatabase {
214
283
            server: self.clone(),
215
283
            db,
216
283
        }
217
283
    }
218

            
219
    /// Installs an X.509 certificate used for general purpose connections.
220
75
    pub async fn install_self_signed_certificate(&self, overwrite: bool) -> Result<(), Error> {
221
75
        let keypair = KeyPair::new_self_signed(&self.data.primary_domain);
222
75

            
223
144
        if self.certificate_chain().await.is_ok() && !overwrite {
224
1
            return Err(Error::Core(bonsaidb_core::Error::Configuration(String::from("Certificate already installed. Enable overwrite if you wish to replace the existing certificate."))));
225
74
        }
226
74

            
227
358
        self.install_certificate(keypair.certificate_chain(), keypair.private_key())
228
358
            .await?;
229

            
230
74
        tokio::fs::write(
231
74
            self.pinned_certificate_path(),
232
74
            keypair.end_entity_certificate().as_ref(),
233
74
        )
234
62
        .await?;
235

            
236
74
        Ok(())
237
75
    }
238

            
239
    /// Installs a certificate chain and private key used for TLS connections.
240
    #[cfg(feature = "pem")]
241
    pub async fn install_pem_certificate(
242
        &self,
243
        certificate_chain: &[u8],
244
        private_key: &[u8],
245
    ) -> Result<(), Error> {
246
        let private_key = match pem::parse(private_key) {
247
            Ok(pem) => PrivateKey::unchecked_from_der(pem.contents),
248
            Err(_) => PrivateKey::from_der(private_key)?,
249
        };
250
        let certificates = pem::parse_many(&certificate_chain)?
251
            .into_iter()
252
            .map(|entry| fabruic::Certificate::unchecked_from_der(entry.contents))
253
            .collect::<Vec<_>>();
254

            
255
        self.install_certificate(
256
            &CertificateChain::unchecked_from_certificates(certificates),
257
            &private_key,
258
        )
259
        .await
260
    }
261

            
262
    /// Installs a certificate chain and private key used for TLS connections.
263
74
    pub async fn install_certificate(
264
74
        &self,
265
74
        certificate_chain: &CertificateChain,
266
74
        private_key: &PrivateKey,
267
74
    ) -> Result<(), Error> {
268
74
        let db = self.hosted().await;
269

            
270
74
        TlsCertificate::entry(&self.data.primary_domain, &db)
271
74
            .update_with(|cert: &mut TlsCertificate| {
272
1
                cert.certificate_chain = certificate_chain.clone();
273
1
                cert.private_key = SerializablePrivateKey(private_key.clone());
274
74
            })
275
74
            .or_insert_with(|| TlsCertificate {
276
73
                domains: vec![self.data.primary_domain.clone()],
277
73
                private_key: SerializablePrivateKey(private_key.clone()),
278
73
                certificate_chain: certificate_chain.clone(),
279
219
            })
280
219
            .await?;
281

            
282
138
        self.refresh_certified_key().await?;
283

            
284
74
        let pinned_certificate_path = self.pinned_certificate_path();
285
74
        if pinned_certificate_path.exists() {
286
1
            tokio::fs::remove_file(&pinned_certificate_path).await?;
287
73
        }
288

            
289
74
        Ok(())
290
74
    }
291

            
292
75
    async fn refresh_certified_key(&self) -> Result<(), Error> {
293
142
        let certificate = self.tls_certificate().await?;
294

            
295
75
        let mut cached_key = self.data.primary_tls_key.lock();
296
75
        let private_key = rustls::PrivateKey(
297
75
            fabruic::dangerous::PrivateKey::as_ref(&certificate.private_key.0).to_vec(),
298
75
        );
299
75
        let private_key = rustls::sign::any_ecdsa_type(&Arc::new(private_key))?;
300

            
301
75
        let certificates = certificate
302
75
            .certificate_chain
303
75
            .iter()
304
75
            .map(|cert| rustls::Certificate(cert.as_ref().to_vec()))
305
75
            .collect::<Vec<_>>();
306
75

            
307
75
        let certified_key = Arc::new(CertifiedKey::new(certificates, private_key));
308
75
        *cached_key = Some(certified_key);
309
75
        Ok(())
310
75
    }
311

            
312
97
    async fn tls_certificate(&self) -> Result<TlsCertificate, Error> {
313
97
        let db = self.hosted().await;
314
97
        let (_, certificate) = db
315
97
            .view::<TlsCertificatesByDomain>()
316
97
            .with_key(self.data.primary_domain.clone())
317
170
            .query_with_collection_docs()
318
170
            .await?
319
            .documents
320
97
            .into_iter()
321
97
            .next()
322
97
            .ok_or_else(|| {
323
                Error::Core(bonsaidb_core::Error::Configuration(format!(
324
                    "no certificate found for {}",
325
                    self.data.primary_domain
326
                )))
327
97
            })?;
328
97
        Ok(certificate.contents)
329
97
    }
330

            
331
    /// Returns the current certificate chain.
332
112
    pub async fn certificate_chain(&self) -> Result<CertificateChain, Error> {
333
112
        let db = self.hosted().await;
334
112
        if let Some(mapping) = db
335
112
            .view::<TlsCertificatesByDomain>()
336
112
            .with_key(self.data.primary_domain.clone())
337
113
            .query()
338
92
            .await?
339
112
            .into_iter()
340
112
            .next()
341
        {
342
36
            Ok(mapping.value)
343
        } else {
344
76
            Err(Error::Core(bonsaidb_core::Error::Configuration(format!(
345
76
                "no certificate found for {}",
346
76
                self.data.primary_domain
347
76
            ))))
348
        }
349
112
    }
350

            
351
    /// Listens for incoming client connections. Does not return until the
352
    /// server shuts down.
353
22
    pub async fn listen_on(&self, port: u16) -> Result<(), Error> {
354
29
        let certificate = self.tls_certificate().await?;
355
22
        let keypair =
356
22
            KeyPair::from_parts(certificate.certificate_chain, certificate.private_key.0)?;
357
22
        let mut builder = Endpoint::builder();
358
22
        builder.set_protocols([CURRENT_PROTOCOL_VERSION.as_bytes().to_vec()]);
359
22
        builder.set_address(([0; 8], port).into());
360
22
        builder
361
22
            .set_max_idle_timeout(None)
362
22
            .map_err(|err| Error::Core(bonsaidb_core::Error::Transport(err.to_string())))?;
363
22
        builder.set_server_key_pair(Some(keypair));
364
22
        let mut server = builder
365
22
            .build()
366
22
            .map_err(|err| Error::Core(bonsaidb_core::Error::Transport(err.to_string())))?;
367
22
        {
368
22
            let mut endpoint = fast_async_write!(self.data.endpoint);
369
22
            *endpoint = Some(server.clone());
370
        }
371

            
372
22
        let mut shutdown_watcher = self
373
22
            .data
374
22
            .shutdown
375
22
            .watcher()
376
            .await
377
22
            .expect("server already shut down");
378

            
379
83
        while let Some(result) = tokio::select! {
380
4
            shutdown_state = shutdown_watcher.wait_for_shutdown() => {
381
                drop(server.close_incoming());
382
                if matches!(shutdown_state, ShutdownState::GracefulShutdown) {
383
                    server.wait_idle().await;
384
                }
385
                drop(server.close());
386
                None
387
            },
388
61
            msg = server.next() => msg
389
61
        } {
390
61
            let connection = result.accept::<()>().await?;
391
61
            let task_self = self.clone();
392
61
            tokio::spawn(async move {
393
61
                let address = connection.remote_address();
394
61
                if let Err(err) = task_self.handle_bonsai_connection(connection).await {
395
                    log::error!("[server] closing connection {}: {:?}", address, err);
396
61
                }
397
61
            });
398
        }
399

            
400
4
        Ok(())
401
4
    }
402

            
403
    /// Returns all of the currently connected clients.
404
    pub async fn connected_clients(&self) -> Vec<ConnectedClient<B>> {
405
        let clients = fast_async_read!(self.data.clients);
406
        clients.values().cloned().collect()
407
    }
408

            
409
    /// Sends a custom API response to all connected clients.
410
    pub async fn broadcast(&self, response: CustomApiResult<B::CustomApi>) {
411
        let clients = fast_async_read!(self.data.clients);
412
        for client in clients.values() {
413
            drop(client.send(response.clone()));
414
        }
415
    }
416

            
417
116
    async fn initialize_client(
418
116
        &self,
419
116
        transport: Transport,
420
116
        address: SocketAddr,
421
116
        sender: Sender<CustomApiResult<B::CustomApi>>,
422
116
    ) -> Option<OwnedClient<B>> {
423
116
        if !self.data.default_permissions.allowed_to(
424
116
            &bonsaidb_resource_name(),
425
116
            &BonsaiAction::Server(ServerAction::Connect),
426
116
        ) {
427
            return None;
428
116
        }
429

            
430
116
        let client = loop {
431
116
            let next_id = CONNECTED_CLIENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
432
116
            let mut clients = fast_async_write!(self.data.clients);
433
116
            if let hash_map::Entry::Vacant(e) = clients.entry(next_id) {
434
116
                let client = OwnedClient::new(next_id, address, transport, sender, self.clone());
435
116
                e.insert(client.clone());
436
116
                break client;
437
            }
438
        };
439

            
440
        if matches!(
441
116
            B::client_connected(&client, self).await,
442
            ConnectionHandling::Accept
443
        ) {
444
116
            Some(client)
445
        } else {
446
            None
447
        }
448
116
    }
449

            
450
56
    async fn disconnect_client(&self, id: u32) {
451
56
        if let Some(client) = {
452
56
            let mut clients = fast_async_write!(self.data.clients);
453
56
            clients.remove(&id)
454
        } {
455
56
            B::client_disconnected(client, self).await;
456
        }
457
56
    }
458

            
459
61
    async fn handle_bonsai_connection(
460
61
        &self,
461
61
        mut connection: fabruic::Connection<()>,
462
61
    ) -> Result<(), Error> {
463
61
        if let Some(incoming) = connection.next().await {
464
61
            let incoming = match incoming {
465
61
                Ok(incoming) => incoming,
466
                Err(err) => {
467
                    log::error!("[server] Error establishing a stream: {:?}", err);
468
                    return Ok(());
469
                }
470
            };
471

            
472
61
            match incoming
473
61
            .accept::<networking::Payload<Response<CustomApiResult<B::CustomApi>>>, networking::Payload<Request<<B::CustomApi as CustomApi>::Request>>>()
474
            .await {
475
61
                Ok((sender, receiver)) => {
476
61
                    let (api_response_sender, api_response_receiver) = flume::unbounded();
477
61
                    if let Some(disconnector) = self
478
61
                        .initialize_client(
479
61
                            Transport::Bonsai,
480
61
                            connection.remote_address(),
481
61
                            api_response_sender,
482
61
                        )
483
                        .await
484
61
                    {
485
61
                        let task_sender = sender.clone();
486
61
                        tokio::spawn(async move {
487
61
                            while let Ok(response) = api_response_receiver.recv_async().await {
488
                                if task_sender
489
                                    .send(&Payload {
490
                                        id: None,
491
                                        wrapped: Response::Api(response),
492
                                    })
493
                                    .is_err()
494
                                {
495
                                    break;
496
                                }
497
                            }
498
15
                            let _ = connection.close().await;
499
61
                        });
500
61

            
501
61
                        let task_self = self.clone();
502
61
                        tokio::spawn(async move {
503
61
                            if let Err(err) = task_self
504
18976
                                .handle_stream(disconnector, sender, receiver)
505
18976
                                .await
506
                            {
507
                                log::error!("[server] Error handling stream: {:?}", err);
508
16
                            }
509
61
                        });
510
61
                    } else {
511
                        log::error!("[server] Backend rejected connection.");
512
                        return Ok(());
513
                    }
514
                }
515
                Err(err) => {
516
                    log::error!("[server] Error accepting incoming stream: {:?}", err);
517
                    return Ok(());
518
                }
519
            }
520
        }
521
61
        Ok(())
522
61
    }
523

            
524
116
    async fn handle_client_requests(
525
116
        &self,
526
116
        client: ConnectedClient<B>,
527
116
        request_receiver: flume::Receiver<Payload<Request<<B::CustomApi as CustomApi>::Request>>>,
528
116
        response_sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
529
116
    ) {
530
116
        let notify = Arc::new(Notify::new());
531
116
        let requests_in_queue = Arc::new(AtomicUsize::new(0));
532
72466
        loop {
533
72466
            let current_requests = requests_in_queue.load(Ordering::SeqCst);
534
72466
            if current_requests == self.data.client_simultaneous_request_limit {
535
                // Wait for requests to finish.
536
20419
                notify.notified().await;
537
52047
            } else if requests_in_queue
538
52047
                .compare_exchange(
539
52047
                    current_requests,
540
52047
                    current_requests + 1,
541
52047
                    Ordering::SeqCst,
542
52047
                    Ordering::SeqCst,
543
52047
                )
544
52047
                .is_ok()
545
            {
546
52047
                let payload = match request_receiver.recv_async().await {
547
51931
                    Ok(payload) => payload,
548
57
                    Err(_) => break,
549
                };
550
51931
                let id = payload.id;
551
51931
                let task_sender = response_sender.clone();
552
51931

            
553
51931
                let notify = notify.clone();
554
51931
                let requests_in_queue = requests_in_queue.clone();
555
51931
                self.handle_request_through_worker(
556
51931
                    payload.wrapped,
557
51931
                    move |response| async move {
558
51931
                        drop(task_sender.send(Payload {
559
51931
                            id,
560
51931
                            wrapped: response,
561
51931
                        }));
562
51931

            
563
51931
                        requests_in_queue.fetch_sub(1, Ordering::SeqCst);
564
51931

            
565
51931
                        notify.notify_one();
566
51931

            
567
51931
                        Ok(())
568
51931
                    },
569
51931
                    client.clone(),
570
51931
                    self.data.subscribers.clone(),
571
51931
                    response_sender.clone(),
572
51931
                )
573
618
                .await
574
51931
                .unwrap();
575
            }
576
        }
577
57
    }
578

            
579
51931
    async fn handle_request_through_worker<
580
51931
        F: FnOnce(Response<CustomApiResult<B::CustomApi>>) -> R + Send + 'static,
581
51931
        R: Future<Output = Result<(), Error>> + Send,
582
51931
    >(
583
51931
        &self,
584
51931
        request: Request<<B::CustomApi as CustomApi>::Request>,
585
51931
        callback: F,
586
51931
        client: ConnectedClient<B>,
587
51931
        subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
588
51931
        response_sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
589
51931
    ) -> Result<(), Error> {
590
51931
        let job = self
591
51931
            .data
592
51931
            .request_processor
593
51931
            .enqueue(ClientRequest::<B>::new(
594
51931
                request,
595
51931
                self.clone(),
596
51931
                client,
597
51931
                subscribers,
598
51931
                response_sender,
599
51931
            ))
600
618
            .await;
601
51931
        tokio::spawn(async move {
602
51931
            let result = job.receive().await?;
603
            // Map the error into a Response::Error. The jobs system supports
604
            // multiple receivers receiving output, and wraps Err to avoid
605
            // requiring the error to be cloneable. As such, we have to unwrap
606
            // it. Thankfully, we can guarantee nothing else is waiting on a
607
            // response to a request than the original requestor, so this can be
608
            // safely unwrapped.
609
51931
            let result =
610
51931
                result.unwrap_or_else(|err| Response::Error(Arc::try_unwrap(err).unwrap().into()));
611
51931
            callback(result).await?;
612
51930
            Result::<(), Error>::Ok(())
613
51931
        });
614
51931
        Ok(())
615
51931
    }
616

            
617
61
    async fn handle_stream(
618
61
        &self,
619
61
        client: OwnedClient<B>,
620
61
        sender: fabruic::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
621
61
        mut receiver: fabruic::Receiver<Payload<Request<<B::CustomApi as CustomApi>::Request>>>,
622
61
    ) -> Result<(), Error> {
623
61
        let (payload_sender, payload_receiver) = flume::unbounded();
624
61
        tokio::spawn(async move {
625
30298
            while let Ok(payload) = payload_receiver.recv_async().await {
626
30237
                if sender.send(&payload).is_err() {
627
                    break;
628
30237
                }
629
            }
630
61
        });
631
61

            
632
61
        let (request_sender, request_receiver) =
633
61
            flume::bounded::<Payload<Request<<B::CustomApi as CustomApi>::Request>>>(
634
61
                self.data.client_simultaneous_request_limit,
635
61
            );
636
61
        let task_self = self.clone();
637
61
        tokio::spawn(async move {
638
61
            task_self
639
24200
                .handle_client_requests(client.clone(), request_receiver, payload_sender)
640
24200
                .await;
641
61
        });
642

            
643
30284
        while let Some(payload) = receiver.next().await {
644
30223
            drop(request_sender.send_async(payload?).await);
645
        }
646

            
647
16
        Ok(())
648
16
    }
649

            
650
14
    async fn forward_notifications_for(
651
14
        &self,
652
14
        subscriber_id: u64,
653
14
        receiver: flume::Receiver<Arc<Message>>,
654
14
        sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
655
14
    ) {
656
42
        while let Ok(message) = receiver.recv_async().await {
657
28
            if sender
658
28
                .send(Payload {
659
28
                    id: None,
660
28
                    wrapped: Response::Database(DatabaseResponse::MessageReceived {
661
28
                        subscriber_id,
662
28
                        topic: message.topic.clone(),
663
28
                        payload: Bytes::from(&message.payload[..]),
664
28
                    }),
665
28
                })
666
28
                .is_err()
667
            {
668
                break;
669
28
            }
670
        }
671
    }
672

            
673
    /// Shuts the server down. If a `timeout` is provided, the server will stop
674
    /// accepting new connections and attempt to respond to any outstanding
675
    /// requests already being processed. After the `timeout` has elapsed or if
676
    /// no `timeout` was provided, the server is forcefully shut down.
677
29
    pub async fn shutdown(&self, timeout: Option<Duration>) -> Result<(), Error> {
678
29
        if let Some(timeout) = timeout {
679
4
            self.data.shutdown.graceful_shutdown(timeout).await;
680
        } else {
681
25
            self.data.shutdown.shutdown().await;
682
        }
683

            
684
29
        Ok(())
685
29
    }
686

            
687
    /// Listens for signals from the operating system that the server should
688
    /// shut down and attempts to gracefully shut down.
689
1
    pub async fn listen_for_shutdown(&self) -> Result<(), Error> {
690
1
        const GRACEFUL_SHUTDOWN: usize = 1;
691
1
        const TERMINATE: usize = 2;
692
1

            
693
1
        enum SignalShutdownState {
694
1
            Running,
695
1
            ShuttingDown(flume::Receiver<()>),
696
1
        }
697
1

            
698
1
        let shutdown_state = Arc::new(Mutex::new(SignalShutdownState::Running));
699
1
        let flag = Arc::new(AtomicUsize::default());
700
1
        let register_hook = |flag: &Arc<AtomicUsize>| {
701
1
            signal_hook::flag::register_usize(SIGINT, flag.clone(), GRACEFUL_SHUTDOWN)?;
702
1
            signal_hook::flag::register_usize(SIGTERM, flag.clone(), TERMINATE)?;
703
            #[cfg(not(windows))]
704
1
            signal_hook::flag::register_usize(SIGQUIT, flag.clone(), TERMINATE)?;
705
1
            Result::<(), std::io::Error>::Ok(())
706
1
        };
707
1
        if let Err(err) = register_hook(&flag) {
708
            log::error!("Error installing signals for graceful shutdown: {:?}", err);
709
            tokio::time::sleep(Duration::MAX).await;
710
        } else {
711
            loop {
712
4
                match flag.load(Ordering::Relaxed) {
713
4
                    0 => {
714
4
                        // No signal
715
4
                    }
716
                    GRACEFUL_SHUTDOWN => {
717
                        let mut state = fast_async_lock!(shutdown_state);
718
                        match *state {
719
                            SignalShutdownState::Running => {
720
                                log::error!("Interrupt signal received. Shutting down gracefully.");
721
                                let task_server = self.clone();
722
                                let (shutdown_sender, shutdown_receiver) = flume::bounded(1);
723
                                tokio::task::spawn(async move {
724
                                    task_server.shutdown(Some(Duration::from_secs(30))).await?;
725
                                    let _ = shutdown_sender.send(());
726
                                    Result::<(), Error>::Ok(())
727
                                });
728
                                *state = SignalShutdownState::ShuttingDown(shutdown_receiver);
729
                            }
730
                            SignalShutdownState::ShuttingDown(_) => {
731
                                // Two interrupts, go ahead and force the shutdown
732
                                break;
733
                            }
734
                        }
735
                    }
736
                    TERMINATE => {
737
                        log::error!("Quit signal received. Shutting down.");
738
                        break;
739
                    }
740
                    _ => unreachable!(),
741
                }
742

            
743
4
                let state = fast_async_lock!(shutdown_state);
744
4
                if let SignalShutdownState::ShuttingDown(receiver) = &*state {
745
                    if receiver.try_recv().is_ok() {
746
                        // Fully shut down.
747
                        return Ok(());
748
                    }
749
4
                }
750

            
751
4
                tokio::time::sleep(Duration::from_millis(300)).await;
752
            }
753
            self.shutdown(None).await?;
754
        }
755

            
756
        Ok(())
757
    }
758

            
759
    /// Manually authenticates `client` as `user`. `user` can be the user's id
760
    /// ([`u64`]) or the username ([`String`]/[`str`]). Returns the permissions
761
    /// that the user now has.
762
    pub async fn authenticate_client_as<'name, N: Nameable<'name, u64> + Send + Sync>(
763
        &self,
764
        user: N,
765
        client: &ConnectedClient<B>,
766
    ) -> Result<Permissions, Error> {
767
        let admin = self.data.storage.admin().await;
768
        let user = User::load(user, &admin)
769
            .await?
770
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
771

            
772
        let permissions = user.contents.effective_permissions(&admin).await?;
773
        let permissions = Permissions::merged(
774
            [
775
                &permissions,
776
                &self.data.authenticated_permissions,
777
                &self.data.default_permissions,
778
            ]
779
            .into_iter(),
780
        );
781
        client
782
            .logged_in_as(user.header.id, permissions.clone())
783
            .await;
784
        Ok(permissions)
785
    }
786

            
787
24
    async fn publish_message(&self, database: &str, topic: &str, payload: Vec<u8>) {
788
24
        self.data
789
24
            .relay
790
24
            .publish_message(Message {
791
24
                topic: database_topic(database, topic),
792
24
                payload,
793
24
            })
794
            .await;
795
24
    }
796

            
797
3
    async fn publish_serialized_to_all(&self, database: &str, topics: &[String], payload: Vec<u8>) {
798
3
        self.data
799
3
            .relay
800
3
            .publish_serialized_to_all(
801
3
                topics
802
3
                    .iter()
803
9
                    .map(|topic| database_topic(database, topic))
804
3
                    .collect(),
805
3
                payload,
806
3
            )
807
            .await;
808
3
    }
809

            
810
21
    async fn create_subscriber(&self, database: String) -> ServerSubscriber<B> {
811
21
        let subscriber = self.data.relay.create_subscriber().await;
812

            
813
21
        let mut subscribers = fast_async_write!(self.data.subscribers);
814
21
        let subscriber_id = subscriber.id();
815
21
        let receiver = subscriber.receiver().clone();
816
21
        subscribers.insert(subscriber_id, subscriber);
817
21

            
818
21
        ServerSubscriber {
819
21
            server: self.clone(),
820
21
            database,
821
21
            receiver,
822
21
            id: subscriber_id,
823
21
        }
824
21
    }
825

            
826
36
    async fn subscribe_to<S: Into<String> + Send>(
827
36
        &self,
828
36
        subscriber_id: u64,
829
36
        database: &str,
830
36
        topic: S,
831
36
    ) -> Result<(), bonsaidb_core::Error> {
832
36
        let subscribers = fast_async_read!(self.data.subscribers);
833
36
        if let Some(subscriber) = subscribers.get(&subscriber_id) {
834
36
            subscriber
835
36
                .subscribe_to(database_topic(database, &topic.into()))
836
                .await;
837
36
            Ok(())
838
        } else {
839
            Err(bonsaidb_core::Error::Server(String::from(
840
                "invalid subscriber id",
841
            )))
842
        }
843
36
    }
844

            
845
3
    async fn unsubscribe_from(
846
3
        &self,
847
3
        subscriber_id: u64,
848
3
        database: &str,
849
3
        topic: &str,
850
3
    ) -> Result<(), bonsaidb_core::Error> {
851
3
        let subscribers = fast_async_read!(self.data.subscribers);
852
3
        if let Some(subscriber) = subscribers.get(&subscriber_id) {
853
3
            subscriber
854
3
                .unsubscribe_from(&database_topic(database, topic))
855
                .await;
856
3
            Ok(())
857
        } else {
858
            Err(bonsaidb_core::Error::Server(String::from(
859
                "invalid subscriber id",
860
            )))
861
        }
862
3
    }
863
}
864

            
865
impl<B: Backend> Deref for CustomServer<B> {
866
    type Target = Storage;
867

            
868
50964
    fn deref(&self) -> &Self::Target {
869
50964
        &self.data.storage
870
50964
    }
871
}
872

            
873
#[derive(Debug)]
874
struct ClientRequest<B: Backend> {
875
    request: Option<Request<<B::CustomApi as CustomApi>::Request>>,
876
    client: ConnectedClient<B>,
877
    server: CustomServer<B>,
878
    subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
879
    sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
880
}
881

            
882
impl<B: Backend> ClientRequest<B> {
883
51931
    pub fn new(
884
51931
        request: Request<<B::CustomApi as CustomApi>::Request>,
885
51931
        server: CustomServer<B>,
886
51931
        client: ConnectedClient<B>,
887
51931
        subscribers: Arc<RwLock<HashMap<u64, Subscriber>>>,
888
51931
        sender: flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
889
51931
    ) -> Self {
890
51931
        Self {
891
51931
            request: Some(request),
892
51931
            server,
893
51931
            client,
894
51931
            subscribers,
895
51931
            sender,
896
51931
        }
897
51931
    }
898
}
899

            
900
#[async_trait]
901
impl<B: Backend> Job for ClientRequest<B> {
902
    type Output = Response<CustomApiResult<B::CustomApi>>;
903
    type Error = Error;
904

            
905
155793
    #[cfg_attr(feature = "tracing", tracing::instrument)]
906
51931
    async fn execute(&mut self) -> Result<Self::Output, Self::Error> {
907
51931
        let request = self.request.take().unwrap();
908
51931
        ServerDispatcher {
909
51931
            server: &self.server,
910
51931
            client: &self.client,
911
51931
            subscribers: &self.subscribers,
912
51931
            response_sender: &self.sender,
913
51931
        }
914
104427
        .dispatch(&self.client.permissions().await, request)
915
94598
        .await
916
103862
    }
917
}
918

            
919
#[async_trait]
920
impl<B: Backend> StorageConnection for CustomServer<B> {
921
    type Database = ServerDatabase<B>;
922

            
923
651
    async fn create_database_with_schema(
924
651
        &self,
925
651
        name: &str,
926
651
        schema: SchemaName,
927
651
        only_if_needed: bool,
928
651
    ) -> Result<(), bonsaidb_core::Error> {
929
651
        self.data
930
651
            .storage
931
1609
            .create_database_with_schema(name, schema, only_if_needed)
932
1607
            .await
933
1302
    }
934

            
935
68
    async fn database<DB: Schema>(
936
68
        &self,
937
68
        name: &str,
938
68
    ) -> Result<Self::Database, bonsaidb_core::Error> {
939
68
        let db = self.data.storage.database::<DB>(name).await?;
940
68
        Ok(ServerDatabase {
941
68
            server: self.clone(),
942
68
            db,
943
68
        })
944
136
    }
945

            
946
506
    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
947
1737
        self.data.storage.delete_database(name).await
948
1012
    }
949

            
950
3
    async fn list_databases(&self) -> Result<Vec<connection::Database>, bonsaidb_core::Error> {
951
3
        self.data.storage.list_databases().await
952
6
    }
953

            
954
3
    async fn list_available_schemas(&self) -> Result<Vec<SchemaName>, bonsaidb_core::Error> {
955
3
        self.data.storage.list_available_schemas().await
956
6
    }
957

            
958
7
    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
959
12
        self.data.storage.create_user(username).await
960
14
    }
961

            
962
    #[cfg(feature = "password-hashing")]
963
3
    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
964
3
        &self,
965
3
        user: U,
966
3
        password: bonsaidb_core::connection::SensitiveString,
967
3
    ) -> Result<(), bonsaidb_core::Error> {
968
11
        self.data.storage.set_user_password(user, password).await
969
6
    }
970

            
971
    #[cfg(feature = "password-hashing")]
972
5
    async fn authenticate<'user, U: Nameable<'user, u64> + Send + Sync>(
973
5
        &self,
974
5
        user: U,
975
5
        authentication: Authentication,
976
5
    ) -> Result<Authenticated, bonsaidb_core::Error> {
977
10
        self.data.storage.authenticate(user, authentication).await
978
10
    }
979

            
980
6
    async fn add_permission_group_to_user<
981
6
        'user,
982
6
        'group,
983
6
        U: Nameable<'user, u64> + Send + Sync,
984
6
        G: Nameable<'group, u64> + Send + Sync,
985
6
    >(
986
6
        &self,
987
6
        user: U,
988
6
        permission_group: G,
989
6
    ) -> Result<(), bonsaidb_core::Error> {
990
6
        self.data
991
6
            .storage
992
7
            .add_permission_group_to_user(user, permission_group)
993
7
            .await
994
12
    }
995

            
996
4
    async fn remove_permission_group_from_user<
997
4
        'user,
998
4
        'group,
999
4
        U: Nameable<'user, u64> + Send + Sync,
4
        G: Nameable<'group, u64> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        permission_group: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data
4
            .storage
4
            .remove_permission_group_from_user(user, permission_group)
3
            .await
8
    }

            
4
    async fn add_role_to_user<
4
        'user,
4
        'group,
4
        U: Nameable<'user, u64> + Send + Sync,
4
        G: Nameable<'group, u64> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        role: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
6
        self.data.storage.add_role_to_user(user, role).await
8
    }

            
4
    async fn remove_role_from_user<
4
        'user,
4
        'group,
4
        U: Nameable<'user, u64> + Send + Sync,
4
        G: Nameable<'group, u64> + Send + Sync,
4
    >(
4
        &self,
4
        user: U,
4
        role: G,
4
    ) -> Result<(), bonsaidb_core::Error> {
4
        self.data.storage.remove_role_from_user(user, role).await
8
    }
}

            
103862
#[derive(Dispatcher, Debug)]
#[dispatcher(input = Request<<B::CustomApi as CustomApi>::Request>, input = ServerRequest, actionable = bonsaidb_core::actionable)]
struct ServerDispatcher<'s, B>
where
    B: Backend,
{
    server: &'s CustomServer<B>,
    client: &'s ConnectedClient<B>,
    subscribers: &'s Arc<RwLock<HashMap<u64, Subscriber>>>,
    response_sender: &'s flume::Sender<Payload<Response<CustomApiResult<B::CustomApi>>>>,
}

            
#[async_trait]
impl<'s, B: Backend> RequestDispatcher for ServerDispatcher<'s, B> {
    type Subaction = <B::CustomApi as CustomApi>::Request;
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;

            
14
    async fn handle_subaction(
14
        &self,
14
        permissions: &Permissions,
14
        subaction: Self::Subaction,
14
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
14
        let dispatcher =
14
            <B::CustomApiDispatcher as CustomApiDispatcher<B>>::new(self.server, self.client);
14
        match dispatcher.dispatch(permissions, subaction).await {
10
            Ok(response) => Ok(Response::Api(Ok(response))),
4
            Err(err) => match err {
                BackendError::Backend(backend) => Ok(Response::Api(Err(backend))),
4
                BackendError::Server(server) => Err(server),
            },
        }
28
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ServerHandler for ServerDispatcher<'s, B> {
1103
    async fn handle(
1103
        &self,
1103
        permissions: &Permissions,
1103
        request: ServerRequest,
1103
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
3239
        ServerRequestDispatcher::dispatch_to_handlers(self, permissions, request).await
2206
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DatabaseHandler for ServerDispatcher<'s, B> {
50814
    async fn handle(
50814
        &self,
50814
        permissions: &Permissions,
50814
        database_name: String,
50814
        request: DatabaseRequest,
50814
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
50814
        let database = self
50814
            .server
94665
            .database_without_schema_internal(&database_name)
60891
            .await?;
50814
        DatabaseDispatcher {
50814
            name: database_name,
50814
            database,
50814
            server_dispatcher: self,
50814
        }
50815
        .dispatch(permissions, request)
30468
        .await
101628
    }
}

            
impl<'s, B: Backend> ServerRequestDispatcher for ServerDispatcher<'s, B> {
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;
}

            
#[async_trait]
impl<'s, B: Backend> CreateDatabaseHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
570
    async fn resource_name<'a>(
570
        &'a self,
570
        database: &'a bonsaidb_core::connection::Database,
570
        _only_if_needed: &'a bool,
570
    ) -> Result<ResourceName<'a>, Error> {
570
        Ok(database_resource_name(&database.name))
570
    }

            
570
    fn action() -> Self::Action {
570
        BonsaiAction::Server(ServerAction::CreateDatabase)
570
    }

            
570
    async fn handle_protected(
570
        &self,
570
        _permissions: &Permissions,
570
        database: bonsaidb_core::connection::Database,
570
        only_if_needed: bool,
570
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
570
        self.server
1469
            .create_database_with_schema(&database.name, database.schema, only_if_needed)
1469
            .await?;
564
        Ok(Response::Server(ServerResponse::DatabaseCreated {
564
            name: database.name.clone(),
564
        }))
1140
    }
}

            
#[async_trait]
impl<'s, B: Backend> DeleteDatabaseHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
504
    async fn resource_name<'a>(&'a self, database: &'a String) -> Result<ResourceName<'a>, Error> {
504
        Ok(database_resource_name(database))
504
    }

            
504
    fn action() -> Self::Action {
504
        BonsaiAction::Server(ServerAction::DeleteDatabase)
504
    }

            
504
    async fn handle_protected(
504
        &self,
504
        _permissions: &Permissions,
504
        name: String,
504
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
1734
        self.server.delete_database(&name).await?;
502
        Ok(Response::Server(ServerResponse::DatabaseDeleted { name }))
1008
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListDatabasesHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::ListDatabases)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::Databases(
2
            self.server.list_databases().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListAvailableSchemasHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(bonsaidb_resource_name())
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Server(ServerAction::ListAvailableSchemas)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::AvailableSchemas(
2
            self.server.list_available_schemas().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CreateUserHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
4
    async fn resource_name<'a>(&'a self, _username: &'a String) -> Result<ResourceName<'a>, Error> {
4
        Ok(bonsaidb_resource_name())
4
    }

            
4
    fn action() -> Self::Action {
4
        BonsaiAction::Server(ServerAction::CreateUser)
4
    }

            
3
    async fn handle_protected(
3
        &self,
3
        _permissions: &Permissions,
3
        username: String,
3
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Server(ServerResponse::UserCreated {
3
            id: self.server.create_user(&username).await?,
        }))
6
    }
}

            
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::SetUserPasswordHandler for ServerDispatcher<'s, B> {
    type Action = BonsaiAction;

            
    async fn resource_name<'a>(
        &'a self,
        user: &'a NamedReference<'static, u64>,
        _password: &'a bonsaidb_core::connection::SensitiveString,
    ) -> Result<ResourceName<'a>, Error> {
        let id = user
            .id::<User, _>(&self.server.admin().await)
            .await?
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
        Ok(user_resource_name(id))
    }

            
    fn action() -> Self::Action {
        BonsaiAction::Server(ServerAction::SetPassword)
    }

            
    async fn handle_protected(
        &self,
        _permissions: &Permissions,
        username: NamedReference<'static, u64>,
        password: bonsaidb_core::connection::SensitiveString,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        self.server.set_user_password(username, password).await?;
        Ok(Response::Ok)
    }
}

            
#[async_trait]
#[cfg(feature = "password-hashing")]
impl<'s, B: Backend> bonsaidb_core::networking::AuthenticateHandler for ServerDispatcher<'s, B> {
5
    async fn verify_permissions(
5
        &self,
5
        permissions: &Permissions,
5
        user: &NamedReference<'static, u64>,
5
        authentication: &Authentication,
5
    ) -> Result<(), Error> {
5
        let id = user
5
            .id::<User, _>(&self.server.admin().await)
4
            .await?
5
            .ok_or(bonsaidb_core::Error::UserNotFound)?;
5
        match authentication {
5
            Authentication::Password(_) => {
5
                permissions.check(
5
                    user_resource_name(id),
5
                    &BonsaiAction::Server(ServerAction::Authenticate(
5
                        AuthenticationMethod::PasswordHash,
5
                    )),
5
                )?;
5
                Ok(())
            }
        }
10
    }

            
5
    async fn handle_protected(
5
        &self,
5
        _permissions: &Permissions,
5
        username: NamedReference<'static, u64>,
5
        authentication: Authentication,
5
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
5
        let mut response = self
5
            .server
10
            .authenticate(username.clone(), authentication)
10
            .await?;

            
        // TODO this should be handled by the storage layer
5
        response.permissions = Permissions::merged([
5
            &response.permissions,
5
            &self.server.data.authenticated_permissions,
5
            &self.server.data.default_permissions,
5
        ]);
5

            
5
        self.client
5
            .logged_in_as(response.user_id, response.permissions.clone())
            .await;
5
        Ok(Response::Server(ServerResponse::Authenticated(response)))
10
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AlterUserPermissionGroupMembershipHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        user: &'a NamedReference<'static, u64>,
8
        _group: &'a NamedReference<'static, u64>,
8
        _should_be_member: &'a bool,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        let id = user
8
            .id::<User, _>(&self.server.admin().await)
2
            .await?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
8
        Ok(user_resource_name(id))
16
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Server(ServerAction::ModifyUserPermissionGroups)
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        user: NamedReference<'static, u64>,
8
        group: NamedReference<'static, u64>,
8
        should_be_member: bool,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        if should_be_member {
4
            self.server
5
                .add_permission_group_to_user(user, group)
5
                .await?;
        } else {
4
            self.server
4
                .remove_permission_group_from_user(user, group)
3
                .await?;
        }

            
8
        Ok(Response::Ok)
16
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::AlterUserRoleMembershipHandler
    for ServerDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        user: &'a NamedReference<'static, u64>,
8
        _role: &'a NamedReference<'static, u64>,
8
        _should_be_member: &'a bool,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        let id = user
8
            .id::<User, _>(&self.server.admin().await)
            .await?
8
            .ok_or(bonsaidb_core::Error::UserNotFound)?;

            
8
        Ok(user_resource_name(id))
16
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Server(ServerAction::ModifyUserRoles)
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        user: NamedReference<'static, u64>,
8
        role: NamedReference<'static, u64>,
8
        should_be_member: bool,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        if should_be_member {
6
            self.server.add_role_to_user(user, role).await?;
        } else {
4
            self.server.remove_role_from_user(user, role).await?;
        }

            
8
        Ok(Response::Ok)
16
    }
}

            
101628
#[derive(Dispatcher, Debug)]
#[dispatcher(input = DatabaseRequest, actionable = bonsaidb_core::actionable)]
struct DatabaseDispatcher<'s, B>
where
    B: Backend,
{
    name: String,
    database: Database,
    server_dispatcher: &'s ServerDispatcher<'s, B>,
}

            
impl<'s, B: Backend> DatabaseRequestDispatcher for DatabaseDispatcher<'s, B> {
    type Output = Response<CustomApiResult<B::CustomApi>>;
    type Error = Error;
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::GetHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
7894
    async fn resource_name<'a>(
7894
        &'a self,
7894
        collection: &'a CollectionName,
7894
        id: &'a DocumentId,
7894
    ) -> Result<ResourceName<'a>, Error> {
7894
        Ok(document_resource_name(&self.name, collection, id))
7894
    }

            
7894
    fn action() -> Self::Action {
7894
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get))
7894
    }

            
7894
    async fn handle_protected(
7894
        &self,
7894
        _permissions: &Permissions,
7894
        collection: CollectionName,
7894
        id: DocumentId,
7894
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
7894
        let document = self
7894
            .database
7894
            .internal_get_from_collection_id(id, &collection)
4843
            .await?
7894
            .ok_or_else(|| {
504
                Error::Core(bonsaidb_core::Error::DocumentNotFound(
504
                    collection,
504
                    Box::new(id),
504
                ))
7894
            })?;
7390
        Ok(Response::Database(DatabaseResponse::Documents(vec![
7390
            document,
7390
        ])))
15788
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::GetMultipleHandler for DatabaseDispatcher<'s, B> {
4
    async fn verify_permissions(
4
        &self,
4
        permissions: &Permissions,
4
        collection: &CollectionName,
4
        ids: &Vec<DocumentId>,
4
    ) -> Result<(), Error> {
12
        for id in ids {
8
            let document_name = document_resource_name(&self.name, collection, id);
8
            let action = BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Get));
8
            permissions.check(&document_name, &action)?;
        }

            
4
        Ok(())
8
    }

            
4
    async fn handle_protected(
4
        &self,
4
        _permissions: &Permissions,
4
        collection: CollectionName,
4
        ids: Vec<DocumentId>,
4
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
4
        let documents = self
4
            .database
4
            .internal_get_multiple_from_collection_id(&ids, &collection)
3
            .await?;
4
        Ok(Response::Database(DatabaseResponse::Documents(documents)))
8
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
8
    async fn resource_name<'a>(
8
        &'a self,
8
        collection: &'a CollectionName,
8
        _ids: &'a Range<DocumentId>,
8
        _order: &'a Sort,
8
        _limit: &'a Option<usize>,
8
    ) -> Result<ResourceName<'a>, Error> {
8
        Ok(collection_resource_name(&self.name, collection))
8
    }

            
8
    fn action() -> Self::Action {
8
        BonsaiAction::Database(DatabaseAction::Document(DocumentAction::List))
8
    }

            
8
    async fn handle_protected(
8
        &self,
8
        _permissions: &Permissions,
8
        collection: CollectionName,
8
        ids: Range<DocumentId>,
8
        order: Sort,
8
        limit: Option<usize>,
8
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
8
        let documents = self
8
            .database
8
            .list_from_collection(ids, order, limit, &collection)
4
            .await?;
8
        Ok(Response::Database(DatabaseResponse::Documents(documents)))
16
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::QueryHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
3890
    async fn resource_name<'a>(
3890
        &'a self,
3890
        view: &'a ViewName,
3890
        _key: &'a Option<QueryKey<Bytes>>,
3890
        _order: &'a Sort,
3890
        _limit: &'a Option<usize>,
3890
        _access_policy: &'a AccessPolicy,
3890
        _with_docs: &'a bool,
3890
    ) -> Result<ResourceName<'a>, Error> {
3890
        Ok(view_resource_name(&self.name, view))
3890
    }

            
3890
    fn action() -> Self::Action {
3890
        BonsaiAction::Database(DatabaseAction::View(ViewAction::Query))
3890
    }

            
3890
    async fn handle_protected(
3890
        &self,
3890
        _permissions: &Permissions,
3890
        view: ViewName,
3890
        key: Option<QueryKey<Bytes>>,
3890
        order: Sort,
3890
        limit: Option<usize>,
3890
        access_policy: AccessPolicy,
3890
        with_docs: bool,
3890
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
3890
        if with_docs {
3840
            let mappings = self
3840
                .database
4034
                .query_by_name_with_docs(&view, key, order, limit, access_policy)
4034
                .await?;
3840
            Ok(Response::Database(DatabaseResponse::ViewMappingsWithDocs(
3840
                mappings,
3840
            )))
        } else {
50
            let mappings = self
50
                .database
50
                .query_by_name(&view, key, order, limit, access_policy)
35
                .await?;
50
            Ok(Response::Database(DatabaseResponse::ViewMappings(mappings)))
        }
7780
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ReduceHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
7715
    async fn resource_name<'a>(
7715
        &'a self,
7715
        view: &'a ViewName,
7715
        _key: &'a Option<QueryKey<Bytes>>,
7715
        _access_policy: &'a AccessPolicy,
7715
        _grouped: &'a bool,
7715
    ) -> Result<ResourceName<'a>, Error> {
7715
        Ok(view_resource_name(&self.name, view))
7715
    }

            
7715
    fn action() -> Self::Action {
7715
        BonsaiAction::Database(DatabaseAction::View(ViewAction::Reduce))
7715
    }

            
7715
    async fn handle_protected(
7715
        &self,
7715
        _permissions: &Permissions,
7715
        view: ViewName,
7715
        key: Option<QueryKey<Bytes>>,
7715
        access_policy: AccessPolicy,
7715
        grouped: bool,
7715
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
7715
        if grouped {
6
            let values = self
6
                .database
8
                .reduce_grouped_by_name(&view, key, access_policy)
6
                .await?;
6
            Ok(Response::Database(DatabaseResponse::ViewGroupedReduction(
6
                values,
6
            )))
        } else {
7709
            let value = self
7709
                .database
7710
                .reduce_by_name(&view, key, access_policy)
150
                .await?;
7707
            Ok(Response::Database(DatabaseResponse::ViewReduction(
7707
                Bytes::from(value),
7707
            )))
        }
15430
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ApplyTransactionHandler
    for DatabaseDispatcher<'s, B>
{
9977
    async fn verify_permissions(
9977
        &self,
9977
        permissions: &Permissions,
9977
        transaction: &Transaction,
9977
    ) -> Result<(), Error> {
23832
        for op in &transaction.operations {
13857
            let (resource, action) = match &op.command {
9695
                Command::Insert { .. } => (
9695
                    collection_resource_name(&self.name, &op.collection),
9695
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Insert)),
9695
                ),
2946
                Command::Update { header, .. } => (
2946
                    document_resource_name(&self.name, &op.collection, &header.id),
2946
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Update)),
2946
                ),
504
                Command::Overwrite { id, .. } => (
504
                    document_resource_name(&self.name, &op.collection, id),
504
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Overwrite)),
504
                ),
712
                Command::Delete { header } => (
712
                    document_resource_name(&self.name, &op.collection, &header.id),
712
                    BonsaiAction::Database(DatabaseAction::Document(DocumentAction::Delete)),
712
                ),
            };
13857
            permissions.check(&resource, &action)?;
        }

            
9975
        Ok(())
19954
    }

            
9975
    async fn handle_protected(
9975
        &self,
9975
        _permissions: &Permissions,
9975
        transaction: Transaction,
9975
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
9975
        let results = self.database.apply_transaction(transaction).await?;
9461
        Ok(Response::Database(DatabaseResponse::TransactionResults(
9461
            results,
9461
        )))
19950
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::DeleteDocsHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        view: &'a ViewName,
2
        _key: &'a Option<QueryKey<Bytes>>,
2
        _access_policy: &'a AccessPolicy,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(view_resource_name(&self.name, view))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::View(ViewAction::DeleteDocs))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        view: ViewName,
2
        key: Option<QueryKey<Bytes>>,
2
        access_policy: AccessPolicy,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        let count = self
2
            .database
4
            .delete_docs_by_name(&view, key, access_policy)
4
            .await?;
2
        Ok(Response::Database(DatabaseResponse::DocumentsDeleted(
2
            count,
2
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ListExecutedTransactionsHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
1036
    async fn resource_name<'a>(
1036
        &'a self,
1036
        _starting_id: &'a Option<u64>,
1036
        _result_limit: &'a Option<usize>,
1036
    ) -> Result<ResourceName<'a>, Error> {
1036
        Ok(database_resource_name(&self.name))
1036
    }

            
1036
    fn action() -> Self::Action {
1036
        BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::ListExecuted))
1036
    }

            
1036
    async fn handle_protected(
1036
        &self,
1036
        _permissions: &Permissions,
1036
        starting_id: Option<u64>,
1036
        result_limit: Option<usize>,
1036
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Database(DatabaseResponse::ExecutedTransactions(
1036
            self.database
1036
                .list_executed_transactions(starting_id, result_limit)
824
                .await?,
        )))
2072
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::LastTransactionIdHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(database_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Transaction(TransactionAction::GetLastId))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        Ok(Response::Database(DatabaseResponse::LastTransactionId(
2
            self.database.last_transaction_id().await?,
        )))
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CreateSubscriberHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
14
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
14
        Ok(database_resource_name(&self.name))
14
    }

            
14
    fn action() -> Self::Action {
14
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber))
14
    }

            
14
    async fn handle_protected(
14
        &self,
14
        _permissions: &Permissions,
14
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
14
        let server = self.server_dispatcher.server;
14
        let subscriber = server.create_subscriber(self.name.clone()).await;
14
        let subscriber_id = subscriber.id;
14
        self.server_dispatcher
14
            .client
14
            .register_subscriber(subscriber_id)
            .await;

            
14
        let task_self = server.clone();
14
        let response_sender = self.server_dispatcher.response_sender.clone();
14
        tokio::spawn(async move {
14
            task_self
14
                .forward_notifications_for(
14
                    subscriber.id,
14
                    subscriber.receiver,
14
                    response_sender.clone(),
22
                )
22
                .await;
14
        });
14
        Ok(Response::Database(DatabaseResponse::SubscriberCreated {
14
            subscriber_id,
14
        }))
28
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::PublishHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
16
    async fn resource_name<'a>(
16
        &'a self,
16
        topic: &'a String,
16
        _payload: &'a Bytes,
16
    ) -> Result<ResourceName<'a>, Error> {
16
        Ok(pubsub_topic_resource_name(&self.name, topic))
16
    }

            
16
    fn action() -> Self::Action {
16
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish))
16
    }

            
16
    async fn handle_protected(
16
        &self,
16
        _permissions: &Permissions,
16
        topic: String,
16
        payload: Bytes,
16
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
16
        self.server_dispatcher
16
            .server
16
            .publish_message(&self.name, &topic, payload.into_vec())
            .await;
16
        Ok(Response::Ok)
32
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::PublishToAllHandler for DatabaseDispatcher<'s, B> {
2
    async fn verify_permissions(
2
        &self,
2
        permissions: &Permissions,
2
        topics: &Vec<String>,
2
        _payload: &Bytes,
2
    ) -> Result<(), Error> {
8
        for topic in topics {
6
            let topic_name = pubsub_topic_resource_name(&self.name, topic);
6
            let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish));
6
            if !permissions.allowed_to(&topic_name, &action) {
                return Err(Error::from(PermissionDenied {
                    resource: topic_name.to_owned(),
                    action: action.name(),
                }));
6
            }
        }

            
2
        Ok(())
4
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        topics: Vec<String>,
2
        payload: Bytes,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.server_dispatcher
2
            .server
2
            .publish_serialized_to_all(&self.name, &topics, payload.into_vec())
            .await;
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::SubscribeToHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
24
    async fn resource_name<'a>(
24
        &'a self,
24
        _subscriber_id: &'a u64,
24
        topic: &'a String,
24
    ) -> Result<ResourceName<'a>, Error> {
24
        Ok(pubsub_topic_resource_name(&self.name, topic))
24
    }

            
24
    fn action() -> Self::Action {
24
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo))
24
    }

            
24
    async fn handle_protected(
24
        &self,
24
        _permissions: &Permissions,
24
        subscriber_id: u64,
24
        topic: String,
24
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
24
        if self
24
            .server_dispatcher
24
            .client
24
            .owns_subscriber(subscriber_id)
            .await
        {
24
            self.server_dispatcher
24
                .server
24
                .subscribe_to(subscriber_id, &self.name, topic)
                .await
24
                .map(|_| Response::Ok)
24
                .map_err(Error::from)
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
48
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::UnsubscribeFromHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        _subscriber_id: &'a u64,
2
        topic: &'a String,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(pubsub_topic_resource_name(&self.name, topic))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom))
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        subscriber_id: u64,
2
        topic: String,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        if self
2
            .server_dispatcher
2
            .client
2
            .owns_subscriber(subscriber_id)
            .await
        {
2
            self.server_dispatcher
2
                .server
2
                .unsubscribe_from(subscriber_id, &self.name, &topic)
                .await
2
                .map(|_| Response::Ok)
2
                .map_err(Error::from)
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::UnregisterSubscriberHandler
    for DatabaseDispatcher<'s, B>
{
    async fn handle(
        &self,
        _permissions: &Permissions,
        subscriber_id: u64,
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
        if self
            .server_dispatcher
            .client
            .remove_subscriber(subscriber_id)
            .await
        {
            let mut subscribers = fast_async_write!(self.server_dispatcher.subscribers);
            if subscribers.remove(&subscriber_id).is_none() {
                Ok(Response::Error(bonsaidb_core::Error::Server(String::from(
                    "invalid subscriber id",
                ))))
            } else {
                Ok(Response::Ok)
            }
        } else {
            Err(Error::Transport(String::from("invalid subscriber_id")))
        }
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::ExecuteKeyOperationHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
20222
    async fn resource_name<'a>(&'a self, op: &'a KeyOperation) -> Result<ResourceName<'a>, Error> {
20222
        Ok(keyvalue_key_resource_name(
20222
            &self.name,
20222
            op.namespace.as_deref(),
20222
            &op.key,
20222
        ))
20222
    }

            
20222
    fn action() -> Self::Action {
20222
        BonsaiAction::Database(DatabaseAction::KeyValue(KeyValueAction::ExecuteOperation))
20222
    }

            
20222
    async fn handle_protected(
20222
        &self,
20222
        _permissions: &Permissions,
20222
        op: KeyOperation,
20222
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
20222
        let result = self.database.execute_key_operation(op).await?;
20214
        Ok(Response::Database(DatabaseResponse::KvOutput(result)))
40444
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactCollectionHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(
2
        &'a self,
2
        collection: &'a CollectionName,
2
    ) -> Result<ResourceName<'a>, Error> {
2
        Ok(collection_resource_name(&self.name, collection))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
        collection: CollectionName,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact_collection_by_name(collection).await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactKeyValueStoreHandler
    for DatabaseDispatcher<'s, B>
{
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(kv_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact_key_value_store().await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
#[async_trait]
impl<'s, B: Backend> bonsaidb_core::networking::CompactHandler for DatabaseDispatcher<'s, B> {
    type Action = BonsaiAction;

            
2
    async fn resource_name<'a>(&'a self) -> Result<ResourceName<'a>, Error> {
2
        Ok(database_resource_name(&self.name))
2
    }

            
2
    fn action() -> Self::Action {
2
        BonsaiAction::Database(DatabaseAction::Compact)
2
    }

            
2
    async fn handle_protected(
2
        &self,
2
        _permissions: &Permissions,
2
    ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
2
        self.database.compact().await?;

            
2
        Ok(Response::Ok)
4
    }
}

            
80
#[derive(Default)]
struct AlpnKeys(Arc<std::sync::Mutex<HashMap<String, Arc<rustls::sign::CertifiedKey>>>>);

            
impl Debug for AlpnKeys {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("AlpnKeys").finish()
    }
}

            
impl Deref for AlpnKeys {
    type Target = Arc<std::sync::Mutex<HashMap<String, Arc<rustls::sign::CertifiedKey>>>>;

            
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}