1
#[cfg(feature = "password-hashing")]
2
use bonsaidb_core::networking::{Authenticate, SetUserPassword};
3
use bonsaidb_core::{
4
    arc_bytes::serde::Bytes,
5
    async_trait::async_trait,
6
    connection::{AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection, HasSession},
7
    keyvalue::AsyncKeyValue,
8
    networking::{
9
        AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction,
10
        AssumeIdentity, Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase,
11
        CreateSubscriber, CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation,
12
        Get, GetMultiple, LastTransactionId, List, ListAvailableSchemas, ListDatabases,
13
        ListExecutedTransactions, ListHeaders, LogOutSession, Publish, PublishToAll, Query,
14
        QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo, UnregisterSubscriber, UnsubscribeFrom,
15
    },
16
    pubsub::AsyncPubSub,
17
    schema::ApiName,
18
};
19

            
20
use crate::{
21
    api::{Handler, HandlerError, HandlerResult, HandlerSession},
22
    Backend, Error, ServerConfiguration,
23
};
24

            
25
#[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
26
82
pub fn register_api_handlers<B: Backend>(
27
82
    config: ServerConfiguration<B>,
28
82
) -> Result<ServerConfiguration<B>, Error> {
29
82
    let mut config = config
30
82
        .with_api::<ServerDispatcher, AlterUserPermissionGroupMembership>()?
31
82
        .with_api::<ServerDispatcher, AlterUserRoleMembership>()?
32
82
        .with_api::<ServerDispatcher, ApplyTransaction>()?
33
82
        .with_api::<ServerDispatcher, AssumeIdentity>()?
34
82
        .with_api::<ServerDispatcher, Compact>()?
35
82
        .with_api::<ServerDispatcher, CompactCollection>()?
36
82
        .with_api::<ServerDispatcher, CompactKeyValueStore>()?
37
82
        .with_api::<ServerDispatcher, Count>()?
38
82
        .with_api::<ServerDispatcher, CreateDatabase>()?
39
82
        .with_api::<ServerDispatcher, CreateSubscriber>()?
40
82
        .with_api::<ServerDispatcher, CreateUser>()?
41
82
        .with_api::<ServerDispatcher, DeleteDatabase>()?
42
82
        .with_api::<ServerDispatcher, DeleteDocs>()?
43
82
        .with_api::<ServerDispatcher, DeleteUser>()?
44
82
        .with_api::<ServerDispatcher, ExecuteKeyOperation>()?
45
82
        .with_api::<ServerDispatcher, Get>()?
46
82
        .with_api::<ServerDispatcher, GetMultiple>()?
47
82
        .with_api::<ServerDispatcher, LastTransactionId>()?
48
82
        .with_api::<ServerDispatcher, List>()?
49
82
        .with_api::<ServerDispatcher, ListHeaders>()?
50
82
        .with_api::<ServerDispatcher, ListAvailableSchemas>()?
51
82
        .with_api::<ServerDispatcher, ListDatabases>()?
52
82
        .with_api::<ServerDispatcher, ListExecutedTransactions>()?
53
82
        .with_api::<ServerDispatcher, LogOutSession>()?
54
82
        .with_api::<ServerDispatcher, Publish>()?
55
82
        .with_api::<ServerDispatcher, PublishToAll>()?
56
82
        .with_api::<ServerDispatcher, Query>()?
57
82
        .with_api::<ServerDispatcher, QueryWithDocs>()?
58
82
        .with_api::<ServerDispatcher, Reduce>()?
59
82
        .with_api::<ServerDispatcher, ReduceGrouped>()?
60
82
        .with_api::<ServerDispatcher, SubscribeTo>()?
61
82
        .with_api::<ServerDispatcher, UnregisterSubscriber>()?
62
82
        .with_api::<ServerDispatcher, UnsubscribeFrom>()?;
63

            
64
    #[cfg(feature = "password-hashing")]
65
    {
66
82
        config = config
67
82
            .with_api::<ServerDispatcher, Authenticate>()?
68
82
            .with_api::<ServerDispatcher, SetUserPassword>()?;
69
    }
70

            
71
82
    Ok(config)
72
82
}
73

            
74
#[derive(Debug)]
75
pub struct ServerDispatcher;
76
impl ServerDispatcher {
77
71132
    pub async fn dispatch_api_request<B: Backend>(
78
71132
        session: HandlerSession<'_, B>,
79
71132
        name: &ApiName,
80
71132
        request: Bytes,
81
71132
    ) -> Result<Bytes, Error> {
82
71138
        if let Some(dispatcher) = session.server.custom_api_dispatcher(name) {
83
97459
            dispatcher.handle(session, &request).await
84
        } else {
85
            Err(Error::from(bonsaidb_core::Error::ApiNotFound(name.clone())))
86
        }
87
71139
    }
88
}
89

            
90
#[async_trait]
91
impl<B: Backend> Handler<B, CreateDatabase> for ServerDispatcher {
92
608
    async fn handle(
93
608
        session: HandlerSession<'_, B>,
94
608
        request: CreateDatabase,
95
608
    ) -> HandlerResult<CreateDatabase> {
96
608
        session
97
608
            .as_client
98
608
            .create_database_with_schema(
99
608
                &request.database.name,
100
608
                request.database.schema,
101
608
                request.only_if_needed,
102
608
            )
103
604
            .await?;
104
599
        Ok(())
105
1216
    }
106
}
107

            
108
#[async_trait]
109
impl<B: Backend> Handler<B, DeleteDatabase> for ServerDispatcher {
110
506
    async fn handle(
111
506
        session: HandlerSession<'_, B>,
112
506
        command: DeleteDatabase,
113
506
    ) -> HandlerResult<DeleteDatabase> {
114
506
        session.as_client.delete_database(&command.name).await?;
115
503
        Ok(())
116
1012
    }
117
}
118

            
119
#[async_trait]
120
impl<B: Backend> Handler<B, ListDatabases> for ServerDispatcher {
121
3
    async fn handle(
122
3
        session: HandlerSession<'_, B>,
123
3
        _command: ListDatabases,
124
3
    ) -> HandlerResult<ListDatabases> {
125
3
        session
126
3
            .as_client
127
3
            .list_databases()
128
3
            .await
129
3
            .map_err(HandlerError::from)
130
6
    }
131
}
132

            
133
#[async_trait]
134
impl<B: Backend> Handler<B, ListAvailableSchemas> for ServerDispatcher {
135
3
    async fn handle(
136
3
        session: HandlerSession<'_, B>,
137
3
        _command: ListAvailableSchemas,
138
3
    ) -> HandlerResult<ListAvailableSchemas> {
139
3
        session
140
3
            .as_client
141
3
            .list_available_schemas()
142
3
            .await
143
3
            .map_err(HandlerError::from)
144
6
    }
145
}
146

            
147
#[async_trait]
148
impl<B: Backend> Handler<B, CreateUser> for ServerDispatcher {
149
5
    async fn handle(
150
5
        session: HandlerSession<'_, B>,
151
5
        command: CreateUser,
152
5
    ) -> HandlerResult<CreateUser> {
153
5
        session
154
5
            .as_client
155
5
            .create_user(&command.username)
156
5
            .await
157
5
            .map_err(HandlerError::from)
158
10
    }
159
}
160

            
161
#[async_trait]
162
impl<B: Backend> Handler<B, DeleteUser> for ServerDispatcher {
163
3
    async fn handle(
164
3
        session: HandlerSession<'_, B>,
165
3
        command: DeleteUser,
166
3
    ) -> HandlerResult<DeleteUser> {
167
3
        session
168
3
            .as_client
169
3
            .delete_user(command.user)
170
3
            .await
171
3
            .map_err(HandlerError::from)
172
6
    }
173
}
174

            
175
/// Handler for [`SetUserPassword`] requests; only compiled with the
/// `password-hashing` feature.
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<B: Backend> Handler<B, SetUserPassword> for ServerDispatcher {
    /// Calls `set_user_password` on `session.as_client` with the user and
    /// password carried by the request.
    async fn handle(
        session: HandlerSession<'_, B>,
        request: SetUserPassword,
    ) -> HandlerResult<SetUserPassword> {
        Ok(session
            .as_client
            .set_user_password(request.user, request.password)
            .await?)
    }
}
189

            
190
/// Handler for [`Authenticate`] requests; only compiled with the
/// `password-hashing` feature.
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<B: Backend> Handler<B, Authenticate> for ServerDispatcher {
    /// Authenticates through `session.as_client`, records the resulting
    /// session on `session.client` via `logged_in_as`, and returns it.
    async fn handle(
        session: HandlerSession<'_, B>,
        request: Authenticate,
    ) -> HandlerResult<Authenticate> {
        let authenticated = session
            .as_client
            .authenticate(request.user, request.authentication)
            .await?;

        // NOTE(review): this panics if the authenticated connection reports
        // no session — presumably one is always present after a successful
        // `authenticate`; confirm.
        let granted = authenticated.session().unwrap().clone();
        session.client.logged_in_as(granted.clone());

        Ok(granted)
    }
}
208

            
209
#[async_trait]
210
impl<B: Backend> Handler<B, AssumeIdentity> for ServerDispatcher {
211
2
    async fn handle(
212
2
        session: HandlerSession<'_, B>,
213
2
        command: AssumeIdentity,
214
2
    ) -> HandlerResult<AssumeIdentity> {
215
2
        let authenticated = session.as_client.assume_identity(command.0).await?;
216
2
        let new_session = authenticated.session().cloned().unwrap();
217
2

            
218
2
        session.client.logged_in_as(new_session.clone());
219
2

            
220
2
        Ok(new_session)
221
4
    }
222
}
223

            
224
#[async_trait]
225
impl<B: Backend> Handler<B, LogOutSession> for ServerDispatcher {
226
2
    async fn handle(
227
2
        session: HandlerSession<'_, B>,
228
2
        command: LogOutSession,
229
2
    ) -> HandlerResult<LogOutSession> {
230
2
        session.client.log_out(command.0);
231
2

            
232
2
        Ok(())
233
2
    }
234
}
235

            
236
#[async_trait]
237
impl<B: Backend> Handler<B, AlterUserPermissionGroupMembership> for ServerDispatcher {
238
12
    async fn handle(
239
12
        session: HandlerSession<'_, B>,
240
12
        command: AlterUserPermissionGroupMembership,
241
12
    ) -> HandlerResult<AlterUserPermissionGroupMembership> {
242
12
        if command.should_be_member {
243
6
            session
244
6
                .as_client
245
6
                .add_permission_group_to_user(command.user, command.group)
246
6
                .await
247
6
                .map_err(HandlerError::from)
248
        } else {
249
6
            session
250
6
                .as_client
251
6
                .remove_permission_group_from_user(command.user, command.group)
252
6
                .await
253
6
                .map_err(HandlerError::from)
254
        }
255
24
    }
256
}
257

            
258
#[async_trait]
259
impl<B: Backend> Handler<B, AlterUserRoleMembership> for ServerDispatcher {
260
12
    async fn handle(
261
12
        session: HandlerSession<'_, B>,
262
12
        command: AlterUserRoleMembership,
263
12
    ) -> HandlerResult<AlterUserRoleMembership> {
264
12
        if command.should_be_member {
265
6
            session
266
6
                .as_client
267
6
                .add_role_to_user(command.user, command.role)
268
6
                .await
269
6
                .map_err(HandlerError::from)
270
        } else {
271
6
            session
272
6
                .as_client
273
6
                .remove_role_from_user(command.user, command.role)
274
6
                .await
275
6
                .map_err(HandlerError::from)
276
        }
277
24
    }
278
}
279

            
280
#[async_trait]
281
impl<B: Backend> Handler<B, Get> for ServerDispatcher {
282
8902
    async fn handle(session: HandlerSession<'_, B>, command: Get) -> HandlerResult<Get> {
283
8902
        let database = session
284
8902
            .as_client
285
8902
            .database_without_schema(&command.database)
286
6516
            .await?;
287
8902
        database
288
8902
            .get_from_collection(command.id, &command.collection)
289
6203
            .await
290
8902
            .map_err(HandlerError::from)
291
17804
    }
292
}
293

            
294
#[async_trait]
295
impl<B: Backend> Handler<B, GetMultiple> for ServerDispatcher {
296
4458
    async fn handle(
297
4458
        session: HandlerSession<'_, B>,
298
4458
        command: GetMultiple,
299
4458
    ) -> HandlerResult<GetMultiple> {
300
4458
        let database = session
301
4458
            .as_client
302
4458
            .database_without_schema(&command.database)
303
3146
            .await?;
304
4458
        database
305
4458
            .get_multiple_from_collection(&command.ids, &command.collection)
306
3003
            .await
307
4458
            .map_err(HandlerError::from)
308
8916
    }
309
}
310

            
311
#[async_trait]
312
impl<B: Backend> Handler<B, List> for ServerDispatcher {
313
12
    async fn handle(session: HandlerSession<'_, B>, command: List) -> HandlerResult<List> {
314
12
        let database = session
315
12
            .as_client
316
12
            .database_without_schema(&command.database)
317
11
            .await?;
318
12
        database
319
12
            .list_from_collection(
320
12
                command.ids,
321
12
                command.order,
322
12
                command.limit,
323
12
                &command.collection,
324
12
            )
325
9
            .await
326
12
            .map_err(HandlerError::from)
327
24
    }
328
}
329

            
330
#[async_trait]
331
impl<B: Backend> Handler<B, ListHeaders> for ServerDispatcher {
332
3
    async fn handle(
333
3
        session: HandlerSession<'_, B>,
334
3
        command: ListHeaders,
335
3
    ) -> HandlerResult<ListHeaders> {
336
3
        let database = session
337
3
            .as_client
338
3
            .database_without_schema(&command.0.database)
339
3
            .await?;
340
3
        database
341
3
            .list_headers_from_collection(
342
3
                command.0.ids,
343
3
                command.0.order,
344
3
                command.0.limit,
345
3
                &command.0.collection,
346
3
            )
347
2
            .await
348
3
            .map_err(HandlerError::from)
349
6
    }
350
}
351

            
352
#[async_trait]
353
impl<B: Backend> Handler<B, Count> for ServerDispatcher {
354
6
    async fn handle(session: HandlerSession<'_, B>, command: Count) -> HandlerResult<Count> {
355
6
        let database = session
356
6
            .as_client
357
6
            .database_without_schema(&command.database)
358
4
            .await?;
359
6
        database
360
6
            .count_from_collection(command.ids, &command.collection)
361
4
            .await
362
6
            .map_err(HandlerError::from)
363
12
    }
364
}
365

            
366
#[async_trait]
367
impl<B: Backend> Handler<B, Query> for ServerDispatcher {
368
4530
    async fn handle(session: HandlerSession<'_, B>, command: Query) -> HandlerResult<Query> {
369
4530
        let database = session
370
4530
            .as_client
371
4530
            .database_without_schema(&command.database)
372
3527
            .await?;
373
4530
        database
374
4530
            .query_by_name(
375
4530
                &command.view,
376
4530
                command.key,
377
4530
                command.order,
378
4530
                command.limit,
379
4530
                command.access_policy,
380
4530
            )
381
3658
            .await
382
4530
            .map_err(HandlerError::from)
383
9060
    }
384
}
385

            
386
#[async_trait]
387
impl<B: Backend> Handler<B, QueryWithDocs> for ServerDispatcher {
388
    async fn handle(
389
        session: HandlerSession<'_, B>,
390
        command: QueryWithDocs,
391
    ) -> HandlerResult<QueryWithDocs> {
392
        let database = session
393
            .as_client
394
            .database_without_schema(&command.0.database)
395
            .await?;
396
        database
397
            .query_by_name_with_docs(
398
                &command.0.view,
399
                command.0.key,
400
                command.0.order,
401
                command.0.limit,
402
                command.0.access_policy,
403
            )
404
            .await
405
            .map_err(HandlerError::from)
406
    }
407
}
408

            
409
#[async_trait]
410
impl<B: Backend> Handler<B, Reduce> for ServerDispatcher {
411
8992
    async fn handle(session: HandlerSession<'_, B>, command: Reduce) -> HandlerResult<Reduce> {
412
8992
        let database = session
413
8992
            .as_client
414
8992
            .database_without_schema(&command.database)
415
6425
            .await?;
416
8992
        database
417
8992
            .reduce_by_name(&command.view, command.key, command.access_policy)
418
5904
            .await
419
8992
            .map(Bytes::from)
420
8992
            .map_err(HandlerError::from)
421
17984
    }
422
}
423

            
424
#[async_trait]
425
impl<B: Backend> Handler<B, ReduceGrouped> for ServerDispatcher {
426
8
    async fn handle(
427
8
        session: HandlerSession<'_, B>,
428
8
        command: ReduceGrouped,
429
8
    ) -> HandlerResult<ReduceGrouped> {
430
8
        let database = session
431
8
            .as_client
432
8
            .database_without_schema(&command.0.database)
433
7
            .await?;
434
8
        database
435
8
            .reduce_grouped_by_name(&command.0.view, command.0.key, command.0.access_policy)
436
7
            .await
437
8
            .map_err(HandlerError::from)
438
16
    }
439
}
440

            
441
#[async_trait]
442
impl<B: Backend> Handler<B, ApplyTransaction> for ServerDispatcher {
443
11553
    async fn handle(
444
11553
        session: HandlerSession<'_, B>,
445
11553
        command: ApplyTransaction,
446
11553
    ) -> HandlerResult<ApplyTransaction> {
447
11553
        let database = session
448
11553
            .as_client
449
11553
            .database_without_schema(&command.database)
450
9955
            .await?;
451
11553
        database
452
11553
            .apply_transaction(command.transaction)
453
10247
            .await
454
11553
            .map_err(HandlerError::from)
455
23106
    }
456
}
457

            
458
#[async_trait]
459
impl<B: Backend> Handler<B, DeleteDocs> for ServerDispatcher {
460
6
    async fn handle(
461
6
        session: HandlerSession<'_, B>,
462
6
        command: DeleteDocs,
463
6
    ) -> HandlerResult<DeleteDocs> {
464
6
        let database = session
465
6
            .as_client
466
6
            .database_without_schema(&command.database)
467
4
            .await?;
468
6
        database
469
6
            .delete_docs_by_name(&command.view, command.key, command.access_policy)
470
5
            .await
471
6
            .map_err(HandlerError::from)
472
12
    }
473
}
474

            
475
#[async_trait]
476
impl<B: Backend> Handler<B, ListExecutedTransactions> for ServerDispatcher {
477
1054
    async fn handle(
478
1054
        session: HandlerSession<'_, B>,
479
1054
        command: ListExecutedTransactions,
480
1054
    ) -> HandlerResult<ListExecutedTransactions> {
481
1054
        let database = session
482
1054
            .as_client
483
1054
            .database_without_schema(&command.database)
484
937
            .await?;
485
1054
        database
486
1054
            .list_executed_transactions(command.starting_id, command.result_limit)
487
657
            .await
488
1054
            .map_err(HandlerError::from)
489
2108
    }
490
}
491

            
492
#[async_trait]
493
impl<B: Backend> Handler<B, LastTransactionId> for ServerDispatcher {
494
3
    async fn handle(
495
3
        session: HandlerSession<'_, B>,
496
3
        command: LastTransactionId,
497
3
    ) -> HandlerResult<LastTransactionId> {
498
3
        let database = session
499
3
            .as_client
500
3
            .database_without_schema(&command.database)
501
3
            .await?;
502
3
        database
503
3
            .last_transaction_id()
504
            .await
505
3
            .map_err(HandlerError::from)
506
6
    }
507
}
508

            
509
#[async_trait]
510
impl<B: Backend> Handler<B, CreateSubscriber> for ServerDispatcher {
511
24
    async fn handle(
512
24
        session: HandlerSession<'_, B>,
513
24
        command: CreateSubscriber,
514
24
    ) -> HandlerResult<CreateSubscriber> {
515
24
        let database = session
516
24
            .as_client
517
24
            .database_without_schema(&command.database)
518
24
            .await?;
519
24
        let subscriber = database.create_subscriber().await?;
520
24
        let subscriber_id = subscriber.id();
521
24

            
522
24
        session.client.register_subscriber(
523
24
            subscriber,
524
24
            session.as_client.session().and_then(|session| session.id),
525
24
        );
526
24

            
527
24
        Ok(subscriber_id)
528
48
    }
529
}
530

            
531
#[async_trait]
532
impl<B: Backend> Handler<B, Publish> for ServerDispatcher {
533
30
    async fn handle(session: HandlerSession<'_, B>, command: Publish) -> HandlerResult<Publish> {
534
30
        let database = session
535
30
            .as_client
536
30
            .database_without_schema(&command.database)
537
26
            .await?;
538
30
        database
539
30
            .publish_bytes(command.topic.into_vec(), command.payload.into_vec())
540
            .await
541
30
            .map_err(HandlerError::from)
542
60
    }
543
}
544

            
545
#[async_trait]
546
impl<B: Backend> Handler<B, PublishToAll> for ServerDispatcher {
547
3
    async fn handle(
548
3
        session: HandlerSession<'_, B>,
549
3
        command: PublishToAll,
550
3
    ) -> HandlerResult<PublishToAll> {
551
3
        let database = session
552
3
            .as_client
553
3
            .database_without_schema(&command.database)
554
3
            .await?;
555
3
        database
556
3
            .publish_bytes_to_all(
557
3
                command.topics.into_iter().map(Bytes::into_vec),
558
3
                command.payload.into_vec(),
559
3
            )
560
            .await
561
3
            .map_err(HandlerError::from)
562
6
    }
563
}
564

            
565
#[async_trait]
566
impl<B: Backend> Handler<B, SubscribeTo> for ServerDispatcher {
567
39
    async fn handle(
568
39
        session: HandlerSession<'_, B>,
569
39
        command: SubscribeTo,
570
39
    ) -> HandlerResult<SubscribeTo> {
571
39
        session
572
39
            .client
573
39
            .subscribe_by_id(
574
39
                command.subscriber_id,
575
39
                command.topic,
576
39
                session.as_client.session().and_then(|session| session.id),
577
39
            )
578
39
            .map_err(HandlerError::from)
579
39
    }
580
}
581

            
582
#[async_trait]
583
impl<B: Backend> Handler<B, UnsubscribeFrom> for ServerDispatcher {
584
3
    async fn handle(
585
3
        session: HandlerSession<'_, B>,
586
3
        command: UnsubscribeFrom,
587
3
    ) -> HandlerResult<UnsubscribeFrom> {
588
3
        session
589
3
            .client
590
3
            .unsubscribe_by_id(
591
3
                command.subscriber_id,
592
3
                &command.topic,
593
3
                session.as_client.session().and_then(|session| session.id),
594
3
            )
595
3
            .map_err(HandlerError::from)
596
3
    }
597
}
598

            
599
#[async_trait]
600
impl<B: Backend> Handler<B, UnregisterSubscriber> for ServerDispatcher {
601
10
    async fn handle(
602
10
        session: HandlerSession<'_, B>,
603
10
        command: UnregisterSubscriber,
604
10
    ) -> HandlerResult<UnregisterSubscriber> {
605
10
        session
606
10
            .client
607
10
            .unregister_subscriber_by_id(
608
10
                command.subscriber_id,
609
10
                session.as_client.session().and_then(|session| session.id),
610
10
            )
611
10
            .map_err(HandlerError::from)
612
10
    }
613
}
614

            
615
#[async_trait]
616
impl<B: Backend> Handler<B, ExecuteKeyOperation> for ServerDispatcher {
617
30321
    async fn handle(
618
30321
        session: HandlerSession<'_, B>,
619
30321
        command: ExecuteKeyOperation,
620
30321
    ) -> HandlerResult<ExecuteKeyOperation> {
621
30321
        let database = session
622
30321
            .as_client
623
30321
            .database_without_schema(&command.database)
624
17756
            .await?;
625
30321
        database
626
30321
            .execute_key_operation(command.op)
627
18222
            .await
628
30321
            .map_err(HandlerError::from)
629
60640
    }
630
}
631

            
632
#[async_trait]
633
impl<B: Backend> Handler<B, CompactCollection> for ServerDispatcher {
634
3
    async fn handle(
635
3
        session: HandlerSession<'_, B>,
636
3
        command: CompactCollection,
637
3
    ) -> HandlerResult<CompactCollection> {
638
3
        let database = session
639
3
            .as_client
640
3
            .database_without_schema(&command.database)
641
3
            .await?;
642
3
        database
643
3
            .compact_collection_by_name(command.name)
644
3
            .await
645
3
            .map_err(HandlerError::from)
646
6
    }
647
}
648

            
649
#[async_trait]
650
impl<B: Backend> Handler<B, CompactKeyValueStore> for ServerDispatcher {
651
3
    async fn handle(
652
3
        session: HandlerSession<'_, B>,
653
3
        command: CompactKeyValueStore,
654
3
    ) -> HandlerResult<CompactKeyValueStore> {
655
3
        let database = session
656
3
            .as_client
657
3
            .database_without_schema(&command.database)
658
3
            .await?;
659
3
        database
660
3
            .compact_key_value_store()
661
3
            .await
662
3
            .map_err(HandlerError::from)
663
6
    }
664
}
665

            
666
#[async_trait]
667
impl<B: Backend> Handler<B, Compact> for ServerDispatcher {
668
3
    async fn handle(client: HandlerSession<'_, B>, command: Compact) -> HandlerResult<Compact> {
669
3
        let database = client
670
3
            .as_client
671
3
            .database_without_schema(&command.database)
672
3
            .await?;
673
3
        database.compact().await.map_err(HandlerError::from)
674
6
    }
675
}