1
use bonsaidb_core::api::ApiName;
2
use bonsaidb_core::arc_bytes::serde::Bytes;
3
use bonsaidb_core::async_trait::async_trait;
4
use bonsaidb_core::connection::{
5
    AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection, HasSession,
6
};
7
use bonsaidb_core::keyvalue::AsyncKeyValue;
8
use bonsaidb_core::networking::{
9
    AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
10
    Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
11
    CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
12
    LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
13
    ListHeaders, LogOutSession, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped,
14
    SubscribeTo, UnregisterSubscriber, UnsubscribeFrom,
15
};
16
#[cfg(feature = "password-hashing")]
17
use bonsaidb_core::networking::{Authenticate, SetUserPassword};
18
use bonsaidb_core::pubsub::AsyncPubSub;
19

            
20
use crate::api::{Handler, HandlerError, HandlerResult, HandlerSession};
21
use crate::{Backend, Error, ServerConfiguration};
22

            
23
#[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
24
92
pub fn register_api_handlers<B: Backend>(
25
92
    config: ServerConfiguration<B>,
26
92
) -> Result<ServerConfiguration<B>, Error> {
27
92
    let mut config = config
28
92
        .with_api::<ServerDispatcher, AlterUserPermissionGroupMembership>()?
29
92
        .with_api::<ServerDispatcher, AlterUserRoleMembership>()?
30
92
        .with_api::<ServerDispatcher, ApplyTransaction>()?
31
92
        .with_api::<ServerDispatcher, AssumeIdentity>()?
32
92
        .with_api::<ServerDispatcher, Compact>()?
33
92
        .with_api::<ServerDispatcher, CompactCollection>()?
34
92
        .with_api::<ServerDispatcher, CompactKeyValueStore>()?
35
92
        .with_api::<ServerDispatcher, Count>()?
36
92
        .with_api::<ServerDispatcher, CreateDatabase>()?
37
92
        .with_api::<ServerDispatcher, CreateSubscriber>()?
38
92
        .with_api::<ServerDispatcher, CreateUser>()?
39
92
        .with_api::<ServerDispatcher, DeleteDatabase>()?
40
92
        .with_api::<ServerDispatcher, DeleteDocs>()?
41
92
        .with_api::<ServerDispatcher, DeleteUser>()?
42
92
        .with_api::<ServerDispatcher, ExecuteKeyOperation>()?
43
92
        .with_api::<ServerDispatcher, Get>()?
44
92
        .with_api::<ServerDispatcher, GetMultiple>()?
45
92
        .with_api::<ServerDispatcher, LastTransactionId>()?
46
92
        .with_api::<ServerDispatcher, List>()?
47
92
        .with_api::<ServerDispatcher, ListHeaders>()?
48
92
        .with_api::<ServerDispatcher, ListAvailableSchemas>()?
49
92
        .with_api::<ServerDispatcher, ListDatabases>()?
50
92
        .with_api::<ServerDispatcher, ListExecutedTransactions>()?
51
92
        .with_api::<ServerDispatcher, LogOutSession>()?
52
92
        .with_api::<ServerDispatcher, Publish>()?
53
92
        .with_api::<ServerDispatcher, PublishToAll>()?
54
92
        .with_api::<ServerDispatcher, Query>()?
55
92
        .with_api::<ServerDispatcher, QueryWithDocs>()?
56
92
        .with_api::<ServerDispatcher, Reduce>()?
57
92
        .with_api::<ServerDispatcher, ReduceGrouped>()?
58
92
        .with_api::<ServerDispatcher, SubscribeTo>()?
59
92
        .with_api::<ServerDispatcher, UnregisterSubscriber>()?
60
92
        .with_api::<ServerDispatcher, UnsubscribeFrom>()?;
61

            
62
    #[cfg(feature = "password-hashing")]
63
92
    {
64
92
        config = config
65
92
            .with_api::<ServerDispatcher, Authenticate>()?
66
92
            .with_api::<ServerDispatcher, SetUserPassword>()?;
67
    }
68

            
69
92
    Ok(config)
70
92
}
71

            
72
#[derive(Debug)]
73
pub struct ServerDispatcher;
74
impl ServerDispatcher {
75
72953
    pub async fn dispatch_api_request<B: Backend>(
76
72953
        session: HandlerSession<'_, B>,
77
72953
        name: &ApiName,
78
72953
        request: Bytes,
79
72953
    ) -> Result<Bytes, Error> {
80
72953
        if let Some(dispatcher) = session.server.custom_api_dispatcher(name) {
81
88613
            dispatcher.handle(session, &request).await
82
        } else {
83
            Err(Error::from(bonsaidb_core::Error::ApiNotFound(name.clone())))
84
        }
85
72949
    }
86
}
87

            
88
#[async_trait]
89
impl<B: Backend> Handler<CreateDatabase, B> for ServerDispatcher {
90
623
    async fn handle(
91
623
        session: HandlerSession<'_, B>,
92
623
        request: CreateDatabase,
93
623
    ) -> HandlerResult<CreateDatabase> {
94
623
        session
95
623
            .as_client
96
623
            .create_database_with_schema(
97
623
                &request.database.name,
98
623
                request.database.schema,
99
623
                request.only_if_needed,
100
623
            )
101
623
            .await?;
102
614
        Ok(())
103
1246
    }
104
}
105

            
106
#[async_trait]
107
impl<B: Backend> Handler<DeleteDatabase, B> for ServerDispatcher {
108
506
    async fn handle(
109
506
        session: HandlerSession<'_, B>,
110
506
        command: DeleteDatabase,
111
506
    ) -> HandlerResult<DeleteDatabase> {
112
506
        session.as_client.delete_database(&command.name).await?;
113
503
        Ok(())
114
1012
    }
115
}
116

            
117
#[async_trait]
118
impl<B: Backend> Handler<ListDatabases, B> for ServerDispatcher {
119
3
    async fn handle(
120
3
        session: HandlerSession<'_, B>,
121
3
        _command: ListDatabases,
122
3
    ) -> HandlerResult<ListDatabases> {
123
3
        session
124
3
            .as_client
125
3
            .list_databases()
126
3
            .await
127
3
            .map_err(HandlerError::from)
128
6
    }
129
}
130

            
131
#[async_trait]
132
impl<B: Backend> Handler<ListAvailableSchemas, B> for ServerDispatcher {
133
3
    async fn handle(
134
3
        session: HandlerSession<'_, B>,
135
3
        _command: ListAvailableSchemas,
136
3
    ) -> HandlerResult<ListAvailableSchemas> {
137
3
        session
138
3
            .as_client
139
3
            .list_available_schemas()
140
3
            .await
141
3
            .map_err(HandlerError::from)
142
6
    }
143
}
144

            
145
#[async_trait]
146
impl<B: Backend> Handler<CreateUser, B> for ServerDispatcher {
147
9
    async fn handle(
148
9
        session: HandlerSession<'_, B>,
149
9
        command: CreateUser,
150
9
    ) -> HandlerResult<CreateUser> {
151
9
        session
152
9
            .as_client
153
9
            .create_user(&command.username)
154
9
            .await
155
9
            .map_err(HandlerError::from)
156
18
    }
157
}
158

            
159
#[async_trait]
160
impl<B: Backend> Handler<DeleteUser, B> for ServerDispatcher {
161
3
    async fn handle(
162
3
        session: HandlerSession<'_, B>,
163
3
        command: DeleteUser,
164
3
    ) -> HandlerResult<DeleteUser> {
165
3
        session
166
3
            .as_client
167
3
            .delete_user(command.user)
168
3
            .await
169
3
            .map_err(HandlerError::from)
170
6
    }
171
}
172

            
173
#[cfg(feature = "password-hashing")]
174
#[async_trait]
175
impl<B: Backend> Handler<SetUserPassword, B> for ServerDispatcher {
176
    async fn handle(
177
        session: HandlerSession<'_, B>,
178
        command: SetUserPassword,
179
    ) -> HandlerResult<SetUserPassword> {
180
        session
181
            .as_client
182
            .set_user_password(command.user, command.password)
183
            .await
184
            .map_err(HandlerError::from)
185
    }
186
}
187

            
188
#[cfg(feature = "password-hashing")]
189
#[async_trait]
190
impl<B: Backend> Handler<Authenticate, B> for ServerDispatcher {
191
19
    async fn handle(
192
19
        session: HandlerSession<'_, B>,
193
19
        command: Authenticate,
194
19
    ) -> HandlerResult<Authenticate> {
195
19
        let authenticated = session
196
19
            .as_client
197
19
            .authenticate(command.authentication)
198
19
            .await?;
199
19
        let new_session = authenticated.session().cloned().unwrap();
200
19

            
201
19
        session.client.logged_in_as(new_session.clone());
202
19

            
203
19
        Ok(new_session)
204
38
    }
205
}
206

            
207
#[async_trait]
208
impl<B: Backend> Handler<AssumeIdentity, B> for ServerDispatcher {
209
2
    async fn handle(
210
2
        session: HandlerSession<'_, B>,
211
2
        command: AssumeIdentity,
212
2
    ) -> HandlerResult<AssumeIdentity> {
213
2
        let authenticated = session.as_client.assume_identity(command.0).await?;
214
2
        let new_session = authenticated.session().cloned().unwrap();
215
2

            
216
2
        session.client.logged_in_as(new_session.clone());
217
2

            
218
2
        Ok(new_session)
219
4
    }
220
}
221

            
222
#[async_trait]
223
impl<B: Backend> Handler<LogOutSession, B> for ServerDispatcher {
224
3
    async fn handle(
225
3
        session: HandlerSession<'_, B>,
226
3
        command: LogOutSession,
227
3
    ) -> HandlerResult<LogOutSession> {
228
3
        if let Some(logged_out) = session.client.log_out(command.0) {
229
3
            if let Err(err) = session
230
3
                .server
231
3
                .backend()
232
3
                .client_session_ended(logged_out, session.client, false, session.server)
233
                .await
234
            {
235
                log::error!("[server] Error in `client_session_ended`: {err:?}");
236
3
            }
237
        }
238

            
239
3
        Ok(())
240
6
    }
241
}
242

            
243
#[async_trait]
244
impl<B: Backend> Handler<AlterUserPermissionGroupMembership, B> for ServerDispatcher {
245
12
    async fn handle(
246
12
        session: HandlerSession<'_, B>,
247
12
        command: AlterUserPermissionGroupMembership,
248
12
    ) -> HandlerResult<AlterUserPermissionGroupMembership> {
249
12
        if command.should_be_member {
250
6
            session
251
6
                .as_client
252
6
                .add_permission_group_to_user(command.user, command.group)
253
6
                .await
254
6
                .map_err(HandlerError::from)
255
        } else {
256
6
            session
257
6
                .as_client
258
6
                .remove_permission_group_from_user(command.user, command.group)
259
6
                .await
260
6
                .map_err(HandlerError::from)
261
        }
262
24
    }
263
}
264

            
265
#[async_trait]
266
impl<B: Backend> Handler<AlterUserRoleMembership, B> for ServerDispatcher {
267
12
    async fn handle(
268
12
        session: HandlerSession<'_, B>,
269
12
        command: AlterUserRoleMembership,
270
12
    ) -> HandlerResult<AlterUserRoleMembership> {
271
12
        if command.should_be_member {
272
6
            session
273
6
                .as_client
274
6
                .add_role_to_user(command.user, command.role)
275
6
                .await
276
6
                .map_err(HandlerError::from)
277
        } else {
278
6
            session
279
6
                .as_client
280
6
                .remove_role_from_user(command.user, command.role)
281
6
                .await
282
6
                .map_err(HandlerError::from)
283
        }
284
24
    }
285
}
286

            
287
#[async_trait]
288
impl<B: Backend> Handler<Get, B> for ServerDispatcher {
289
9274
    async fn handle(session: HandlerSession<'_, B>, command: Get) -> HandlerResult<Get> {
290
9274
        let database = session
291
9274
            .as_client
292
9274
            .database_without_schema(&command.database)
293
4820
            .await?;
294
9274
        database
295
9274
            .get_from_collection(command.id, &command.collection)
296
3109
            .await
297
9274
            .map_err(HandlerError::from)
298
18548
    }
299
}
300

            
301
#[async_trait]
302
impl<B: Backend> Handler<GetMultiple, B> for ServerDispatcher {
303
4750
    async fn handle(
304
4750
        session: HandlerSession<'_, B>,
305
4750
        command: GetMultiple,
306
4750
    ) -> HandlerResult<GetMultiple> {
307
4750
        let database = session
308
4750
            .as_client
309
4750
            .database_without_schema(&command.database)
310
2306
            .await?;
311
4750
        database
312
4750
            .get_multiple_from_collection(&command.ids, &command.collection)
313
1406
            .await
314
4750
            .map_err(HandlerError::from)
315
9500
    }
316
}
317

            
318
#[async_trait]
319
impl<B: Backend> Handler<List, B> for ServerDispatcher {
320
12
    async fn handle(session: HandlerSession<'_, B>, command: List) -> HandlerResult<List> {
321
12
        let database = session
322
12
            .as_client
323
12
            .database_without_schema(&command.database)
324
11
            .await?;
325
12
        database
326
12
            .list_from_collection(
327
12
                command.ids,
328
12
                command.order,
329
12
                command.limit,
330
12
                &command.collection,
331
12
            )
332
10
            .await
333
12
            .map_err(HandlerError::from)
334
24
    }
335
}
336

            
337
#[async_trait]
338
impl<B: Backend> Handler<ListHeaders, B> for ServerDispatcher {
339
3
    async fn handle(
340
3
        session: HandlerSession<'_, B>,
341
3
        command: ListHeaders,
342
3
    ) -> HandlerResult<ListHeaders> {
343
3
        let database = session
344
3
            .as_client
345
3
            .database_without_schema(&command.0.database)
346
3
            .await?;
347
3
        database
348
3
            .list_headers_from_collection(
349
3
                command.0.ids,
350
3
                command.0.order,
351
3
                command.0.limit,
352
3
                &command.0.collection,
353
3
            )
354
3
            .await
355
3
            .map_err(HandlerError::from)
356
6
    }
357
}
358

            
359
#[async_trait]
360
impl<B: Backend> Handler<Count, B> for ServerDispatcher {
361
6
    async fn handle(session: HandlerSession<'_, B>, command: Count) -> HandlerResult<Count> {
362
6
        let database = session
363
6
            .as_client
364
6
            .database_without_schema(&command.database)
365
6
            .await?;
366
6
        database
367
6
            .count_from_collection(command.ids, &command.collection)
368
6
            .await
369
6
            .map_err(HandlerError::from)
370
12
    }
371
}
372

            
373
#[async_trait]
374
impl<B: Backend> Handler<Query, B> for ServerDispatcher {
375
4841
    async fn handle(session: HandlerSession<'_, B>, command: Query) -> HandlerResult<Query> {
376
4841
        let database = session
377
4841
            .as_client
378
4841
            .database_without_schema(&command.database)
379
2431
            .await?;
380
4841
        database
381
4841
            .query_by_name(
382
4841
                &command.view,
383
4841
                command.key,
384
4841
                command.order,
385
4841
                command.limit,
386
4841
                command.access_policy,
387
4841
            )
388
2527
            .await
389
4841
            .map_err(HandlerError::from)
390
9682
    }
391
}
392

            
393
#[async_trait]
394
impl<B: Backend> Handler<QueryWithDocs, B> for ServerDispatcher {
395
    async fn handle(
396
        session: HandlerSession<'_, B>,
397
        command: QueryWithDocs,
398
    ) -> HandlerResult<QueryWithDocs> {
399
        let database = session
400
            .as_client
401
            .database_without_schema(&command.0.database)
402
            .await?;
403
        database
404
            .query_by_name_with_docs(
405
                &command.0.view,
406
                command.0.key,
407
                command.0.order,
408
                command.0.limit,
409
                command.0.access_policy,
410
            )
411
            .await
412
            .map_err(HandlerError::from)
413
    }
414
}
415

            
416
#[async_trait]
417
impl<B: Backend> Handler<Reduce, B> for ServerDispatcher {
418
9544
    async fn handle(session: HandlerSession<'_, B>, command: Reduce) -> HandlerResult<Reduce> {
419
9544
        let database = session
420
9544
            .as_client
421
9544
            .database_without_schema(&command.database)
422
4657
            .await?;
423
9544
        database
424
9544
            .reduce_by_name(&command.view, command.key, command.access_policy)
425
3772
            .await
426
9544
            .map(Bytes::from)
427
9544
            .map_err(HandlerError::from)
428
19088
    }
429
}
430

            
431
#[async_trait]
432
impl<B: Backend> Handler<ReduceGrouped, B> for ServerDispatcher {
433
8
    async fn handle(
434
8
        session: HandlerSession<'_, B>,
435
8
        command: ReduceGrouped,
436
8
    ) -> HandlerResult<ReduceGrouped> {
437
8
        let database = session
438
8
            .as_client
439
8
            .database_without_schema(&command.0.database)
440
8
            .await?;
441
8
        database
442
8
            .reduce_grouped_by_name(&command.0.view, command.0.key, command.0.access_policy)
443
8
            .await
444
8
            .map_err(HandlerError::from)
445
16
    }
446
}
447

            
448
#[async_trait]
449
impl<B: Backend> Handler<ApplyTransaction, B> for ServerDispatcher {
450
11802
    async fn handle(
451
11802
        session: HandlerSession<'_, B>,
452
11802
        command: ApplyTransaction,
453
11802
    ) -> HandlerResult<ApplyTransaction> {
454
11802
        let database = session
455
11802
            .as_client
456
11802
            .database_without_schema(&command.database)
457
7887
            .await?;
458
11802
        database
459
11802
            .apply_transaction(command.transaction)
460
11528
            .await
461
11802
            .map_err(HandlerError::from)
462
23604
    }
463
}
464

            
465
#[async_trait]
466
impl<B: Backend> Handler<DeleteDocs, B> for ServerDispatcher {
467
6
    async fn handle(
468
6
        session: HandlerSession<'_, B>,
469
6
        command: DeleteDocs,
470
6
    ) -> HandlerResult<DeleteDocs> {
471
6
        let database = session
472
6
            .as_client
473
6
            .database_without_schema(&command.database)
474
6
            .await?;
475
6
        database
476
6
            .delete_docs_by_name(&command.view, command.key, command.access_policy)
477
6
            .await
478
6
            .map_err(HandlerError::from)
479
12
    }
480
}
481

            
482
#[async_trait]
483
impl<B: Backend> Handler<ListExecutedTransactions, B> for ServerDispatcher {
484
1054
    async fn handle(
485
1054
        session: HandlerSession<'_, B>,
486
1054
        command: ListExecutedTransactions,
487
1054
    ) -> HandlerResult<ListExecutedTransactions> {
488
1054
        let database = session
489
1054
            .as_client
490
1054
            .database_without_schema(&command.database)
491
680
            .await?;
492
1054
        database
493
1054
            .list_executed_transactions(command.starting_id, command.result_limit)
494
477
            .await
495
1054
            .map_err(HandlerError::from)
496
2108
    }
497
}
498

            
499
#[async_trait]
500
impl<B: Backend> Handler<LastTransactionId, B> for ServerDispatcher {
501
3
    async fn handle(
502
3
        session: HandlerSession<'_, B>,
503
3
        command: LastTransactionId,
504
3
    ) -> HandlerResult<LastTransactionId> {
505
3
        let database = session
506
3
            .as_client
507
3
            .database_without_schema(&command.database)
508
3
            .await?;
509
3
        database
510
3
            .last_transaction_id()
511
            .await
512
3
            .map_err(HandlerError::from)
513
6
    }
514
}
515

            
516
#[async_trait]
517
impl<B: Backend> Handler<CreateSubscriber, B> for ServerDispatcher {
518
24
    async fn handle(
519
24
        session: HandlerSession<'_, B>,
520
24
        command: CreateSubscriber,
521
24
    ) -> HandlerResult<CreateSubscriber> {
522
24
        let database = session
523
24
            .as_client
524
24
            .database_without_schema(&command.database)
525
24
            .await?;
526
24
        let subscriber = database.create_subscriber().await?;
527
24
        let subscriber_id = subscriber.id();
528
24

            
529
24
        session.client.register_subscriber(
530
24
            subscriber,
531
24
            session.as_client.session().and_then(|session| session.id),
532
24
        );
533
24

            
534
24
        Ok(subscriber_id)
535
48
    }
536
}
537

            
538
#[async_trait]
539
impl<B: Backend> Handler<Publish, B> for ServerDispatcher {
540
30
    async fn handle(session: HandlerSession<'_, B>, command: Publish) -> HandlerResult<Publish> {
541
30
        let database = session
542
30
            .as_client
543
30
            .database_without_schema(&command.database)
544
29
            .await?;
545
30
        database
546
30
            .publish_bytes(command.topic.into_vec(), command.payload.into_vec())
547
            .await
548
30
            .map_err(HandlerError::from)
549
60
    }
550
}
551

            
552
#[async_trait]
553
impl<B: Backend> Handler<PublishToAll, B> for ServerDispatcher {
554
3
    async fn handle(
555
3
        session: HandlerSession<'_, B>,
556
3
        command: PublishToAll,
557
3
    ) -> HandlerResult<PublishToAll> {
558
3
        let database = session
559
3
            .as_client
560
3
            .database_without_schema(&command.database)
561
2
            .await?;
562
3
        database
563
3
            .publish_bytes_to_all(
564
3
                command.topics.into_iter().map(Bytes::into_vec),
565
3
                command.payload.into_vec(),
566
3
            )
567
            .await
568
3
            .map_err(HandlerError::from)
569
6
    }
570
}
571

            
572
#[async_trait]
573
impl<B: Backend> Handler<SubscribeTo, B> for ServerDispatcher {
574
39
    async fn handle(
575
39
        session: HandlerSession<'_, B>,
576
39
        command: SubscribeTo,
577
39
    ) -> HandlerResult<SubscribeTo> {
578
39
        session
579
39
            .client
580
39
            .subscribe_by_id(
581
39
                command.subscriber_id,
582
39
                command.topic,
583
39
                session.as_client.session().and_then(|session| session.id),
584
39
            )
585
39
            .map_err(HandlerError::from)
586
39
    }
587
}
588

            
589
#[async_trait]
590
impl<B: Backend> Handler<UnsubscribeFrom, B> for ServerDispatcher {
591
3
    async fn handle(
592
3
        session: HandlerSession<'_, B>,
593
3
        command: UnsubscribeFrom,
594
3
    ) -> HandlerResult<UnsubscribeFrom> {
595
3
        session
596
3
            .client
597
3
            .unsubscribe_by_id(
598
3
                command.subscriber_id,
599
3
                &command.topic,
600
3
                session.as_client.session().and_then(|session| session.id),
601
3
            )
602
3
            .map_err(HandlerError::from)
603
3
    }
604
}
605

            
606
#[async_trait]
607
impl<B: Backend> Handler<UnregisterSubscriber, B> for ServerDispatcher {
608
10
    async fn handle(
609
10
        session: HandlerSession<'_, B>,
610
10
        command: UnregisterSubscriber,
611
10
    ) -> HandlerResult<UnregisterSubscriber> {
612
10
        session
613
10
            .client
614
10
            .unregister_subscriber_by_id(
615
10
                command.subscriber_id,
616
10
                session.as_client.session().and_then(|session| session.id),
617
10
            )
618
10
            .map_err(HandlerError::from)
619
10
    }
620
}
621

            
622
#[async_trait]
623
impl<B: Backend> Handler<ExecuteKeyOperation, B> for ServerDispatcher {
624
30321
    async fn handle(
625
30321
        session: HandlerSession<'_, B>,
626
30321
        command: ExecuteKeyOperation,
627
30321
    ) -> HandlerResult<ExecuteKeyOperation> {
628
30321
        let database = session
629
30321
            .as_client
630
30321
            .database_without_schema(&command.database)
631
20539
            .await?;
632
30321
        database
633
30321
            .execute_key_operation(command.op)
634
19175
            .await
635
30321
            .map_err(HandlerError::from)
636
60642
    }
637
}
638

            
639
#[async_trait]
640
impl<B: Backend> Handler<CompactCollection, B> for ServerDispatcher {
641
3
    async fn handle(
642
3
        session: HandlerSession<'_, B>,
643
3
        command: CompactCollection,
644
3
    ) -> HandlerResult<CompactCollection> {
645
3
        let database = session
646
3
            .as_client
647
3
            .database_without_schema(&command.database)
648
3
            .await?;
649
3
        database
650
3
            .compact_collection_by_name(command.name)
651
3
            .await
652
3
            .map_err(HandlerError::from)
653
6
    }
654
}
655

            
656
#[async_trait]
657
impl<B: Backend> Handler<CompactKeyValueStore, B> for ServerDispatcher {
658
3
    async fn handle(
659
3
        session: HandlerSession<'_, B>,
660
3
        command: CompactKeyValueStore,
661
3
    ) -> HandlerResult<CompactKeyValueStore> {
662
3
        let database = session
663
3
            .as_client
664
3
            .database_without_schema(&command.database)
665
3
            .await?;
666
3
        database
667
3
            .compact_key_value_store()
668
3
            .await
669
3
            .map_err(HandlerError::from)
670
6
    }
671
}
672

            
673
#[async_trait]
674
impl<B: Backend> Handler<Compact, B> for ServerDispatcher {
675
3
    async fn handle(client: HandlerSession<'_, B>, command: Compact) -> HandlerResult<Compact> {
676
3
        let database = client
677
3
            .as_client
678
3
            .database_without_schema(&command.database)
679
3
            .await?;
680
3
        database.compact().await.map_err(HandlerError::from)
681
6
    }
682
}