1
use bonsaidb_core::api::ApiName;
2
use bonsaidb_core::arc_bytes::serde::Bytes;
3
use bonsaidb_core::async_trait::async_trait;
4
use bonsaidb_core::connection::{
5
    AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection, HasSession,
6
};
7
use bonsaidb_core::keyvalue::AsyncKeyValue;
8
use bonsaidb_core::networking::{
9
    AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
10
    Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
11
    CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
12
    LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
13
    ListHeaders, LogOutSession, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped,
14
    SubscribeTo, UnregisterSubscriber, UnsubscribeFrom,
15
};
16
#[cfg(feature = "password-hashing")]
17
use bonsaidb_core::networking::{Authenticate, SetUserPassword};
18
use bonsaidb_core::pubsub::AsyncPubSub;
19

            
20
use crate::api::{Handler, HandlerError, HandlerResult, HandlerSession};
21
use crate::{Backend, Error, ServerConfiguration};
22

            
23
#[cfg_attr(not(feature = "password-hashing"), allow(unused_mut))]
24
92
pub fn register_api_handlers<B: Backend>(
25
92
    config: ServerConfiguration<B>,
26
92
) -> Result<ServerConfiguration<B>, Error> {
27
92
    let mut config = config
28
92
        .with_api::<ServerDispatcher, AlterUserPermissionGroupMembership>()?
29
92
        .with_api::<ServerDispatcher, AlterUserRoleMembership>()?
30
92
        .with_api::<ServerDispatcher, ApplyTransaction>()?
31
92
        .with_api::<ServerDispatcher, AssumeIdentity>()?
32
92
        .with_api::<ServerDispatcher, Compact>()?
33
92
        .with_api::<ServerDispatcher, CompactCollection>()?
34
92
        .with_api::<ServerDispatcher, CompactKeyValueStore>()?
35
92
        .with_api::<ServerDispatcher, Count>()?
36
92
        .with_api::<ServerDispatcher, CreateDatabase>()?
37
92
        .with_api::<ServerDispatcher, CreateSubscriber>()?
38
92
        .with_api::<ServerDispatcher, CreateUser>()?
39
92
        .with_api::<ServerDispatcher, DeleteDatabase>()?
40
92
        .with_api::<ServerDispatcher, DeleteDocs>()?
41
92
        .with_api::<ServerDispatcher, DeleteUser>()?
42
92
        .with_api::<ServerDispatcher, ExecuteKeyOperation>()?
43
92
        .with_api::<ServerDispatcher, Get>()?
44
92
        .with_api::<ServerDispatcher, GetMultiple>()?
45
92
        .with_api::<ServerDispatcher, LastTransactionId>()?
46
92
        .with_api::<ServerDispatcher, List>()?
47
92
        .with_api::<ServerDispatcher, ListHeaders>()?
48
92
        .with_api::<ServerDispatcher, ListAvailableSchemas>()?
49
92
        .with_api::<ServerDispatcher, ListDatabases>()?
50
92
        .with_api::<ServerDispatcher, ListExecutedTransactions>()?
51
92
        .with_api::<ServerDispatcher, LogOutSession>()?
52
92
        .with_api::<ServerDispatcher, Publish>()?
53
92
        .with_api::<ServerDispatcher, PublishToAll>()?
54
92
        .with_api::<ServerDispatcher, Query>()?
55
92
        .with_api::<ServerDispatcher, QueryWithDocs>()?
56
92
        .with_api::<ServerDispatcher, Reduce>()?
57
92
        .with_api::<ServerDispatcher, ReduceGrouped>()?
58
92
        .with_api::<ServerDispatcher, SubscribeTo>()?
59
92
        .with_api::<ServerDispatcher, UnregisterSubscriber>()?
60
92
        .with_api::<ServerDispatcher, UnsubscribeFrom>()?;
61

            
62
    #[cfg(feature = "password-hashing")]
63
92
    {
64
92
        config = config
65
92
            .with_api::<ServerDispatcher, Authenticate>()?
66
92
            .with_api::<ServerDispatcher, SetUserPassword>()?;
67
    }
68

            
69
92
    Ok(config)
70
92
}
71

            
72
#[derive(Debug)]
73
pub struct ServerDispatcher;
74
impl ServerDispatcher {
75
72049
    pub async fn dispatch_api_request<B: Backend>(
76
72049
        session: HandlerSession<'_, B>,
77
72049
        name: &ApiName,
78
72049
        request: Bytes,
79
72049
    ) -> Result<Bytes, Error> {
80
72049
        if let Some(dispatcher) = session.server.custom_api_dispatcher(name) {
81
132364
            dispatcher.handle(session, &request).await
82
        } else {
83
            Err(Error::from(bonsaidb_core::Error::ApiNotFound(name.clone())))
84
        }
85
72045
    }
86
}
87

            
88
#[async_trait]
89
impl<B: Backend> Handler<CreateDatabase, B> for ServerDispatcher {
90
623
    async fn handle(
91
623
        session: HandlerSession<'_, B>,
92
623
        request: CreateDatabase,
93
623
    ) -> HandlerResult<CreateDatabase> {
94
623
        session
95
623
            .as_client
96
623
            .create_database_with_schema(
97
623
                &request.database.name,
98
623
                request.database.schema,
99
623
                request.only_if_needed,
100
623
            )
101
622
            .await?;
102
614
        Ok(())
103
1869
    }
104
}
105

            
106
#[async_trait]
107
impl<B: Backend> Handler<DeleteDatabase, B> for ServerDispatcher {
108
506
    async fn handle(
109
506
        session: HandlerSession<'_, B>,
110
506
        command: DeleteDatabase,
111
506
    ) -> HandlerResult<DeleteDatabase> {
112
506
        session.as_client.delete_database(&command.name).await?;
113
503
        Ok(())
114
1518
    }
115
}
116

            
117
#[async_trait]
118
impl<B: Backend> Handler<ListDatabases, B> for ServerDispatcher {
119
3
    async fn handle(
120
3
        session: HandlerSession<'_, B>,
121
3
        _command: ListDatabases,
122
3
    ) -> HandlerResult<ListDatabases> {
123
3
        session
124
3
            .as_client
125
3
            .list_databases()
126
3
            .await
127
3
            .map_err(HandlerError::from)
128
9
    }
129
}
130

            
131
#[async_trait]
132
impl<B: Backend> Handler<ListAvailableSchemas, B> for ServerDispatcher {
133
3
    async fn handle(
134
3
        session: HandlerSession<'_, B>,
135
3
        _command: ListAvailableSchemas,
136
3
    ) -> HandlerResult<ListAvailableSchemas> {
137
3
        session
138
3
            .as_client
139
3
            .list_available_schemas()
140
3
            .await
141
3
            .map_err(HandlerError::from)
142
9
    }
143
}
144

            
145
#[async_trait]
146
impl<B: Backend> Handler<CreateUser, B> for ServerDispatcher {
147
9
    async fn handle(
148
9
        session: HandlerSession<'_, B>,
149
9
        command: CreateUser,
150
9
    ) -> HandlerResult<CreateUser> {
151
9
        session
152
9
            .as_client
153
9
            .create_user(&command.username)
154
8
            .await
155
9
            .map_err(HandlerError::from)
156
27
    }
157
}
158

            
159
#[async_trait]
160
impl<B: Backend> Handler<DeleteUser, B> for ServerDispatcher {
161
3
    async fn handle(
162
3
        session: HandlerSession<'_, B>,
163
3
        command: DeleteUser,
164
3
    ) -> HandlerResult<DeleteUser> {
165
3
        session
166
3
            .as_client
167
3
            .delete_user(command.user)
168
3
            .await
169
3
            .map_err(HandlerError::from)
170
9
    }
171
}
172

            
173
/// Handles [`SetUserPassword`] requests.
///
/// Only compiled when the `password-hashing` feature is enabled.
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<B: Backend> Handler<SetUserPassword, B> for ServerDispatcher {
    /// Returns the result of `set_user_password` on `session.as_client` for
    /// `command.user` and `command.password`.
    ///
    /// # Errors
    ///
    /// Errors from `set_user_password` are converted with
    /// `HandlerError::from`.
    async fn handle(
        session: HandlerSession<'_, B>,
        command: SetUserPassword,
    ) -> HandlerResult<SetUserPassword> {
        session
            .as_client
            .set_user_password(command.user, command.password)
            .await
            .map_err(HandlerError::from)
    }
}
187

            
188
/// Handles [`Authenticate`] requests.
///
/// Only compiled when the `password-hashing` feature is enabled.
#[cfg(feature = "password-hashing")]
#[async_trait]
impl<B: Backend> Handler<Authenticate, B> for ServerDispatcher {
    /// Authenticates through `session.as_client.authenticate`, records the
    /// resulting session on `session.client` via `logged_in_as`, invokes the
    /// backend's `client_authenticated` callback, and returns the new session.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `authenticate`. An error from
    /// `client_authenticated` is logged and does not fail the request.
    ///
    /// # Panics
    ///
    /// Panics if the value returned by `authenticate` reports no session.
    async fn handle(
        session: HandlerSession<'_, B>,
        command: Authenticate,
    ) -> HandlerResult<Authenticate> {
        let authenticated = session
            .as_client
            .authenticate(command.authentication)
            .await?;
        // A missing session here is treated as an invariant violation.
        let new_session = authenticated
            .session()
            .cloned()
            .expect("authenticate succeeded but returned no session");

        session.client.logged_in_as(new_session.clone());

        // The backend callback is best-effort: failures are only logged.
        if let Err(err) = session
            .server
            .backend()
            .client_authenticated(session.client.clone(), &new_session, session.server)
            .await
        {
            log::error!("[server] Error in `client_authenticated`: {err:?}");
        }

        Ok(new_session)
    }
}
215

            
216
#[async_trait]
217
impl<B: Backend> Handler<AssumeIdentity, B> for ServerDispatcher {
218
2
    async fn handle(
219
2
        session: HandlerSession<'_, B>,
220
2
        command: AssumeIdentity,
221
2
    ) -> HandlerResult<AssumeIdentity> {
222
2
        let authenticated = session.as_client.assume_identity(command.0).await?;
223
2
        let new_session = authenticated.session().cloned().unwrap();
224
2

            
225
2
        session.client.logged_in_as(new_session.clone());
226

            
227
2
        if let Err(err) = session
228
2
            .server
229
2
            .backend()
230
2
            .client_authenticated(session.client.clone(), &new_session, session.server)
231
            .await
232
        {
233
            log::error!("[server] Error in `client_authenticated`: {err:?}");
234
2
        }
235

            
236
2
        Ok(new_session)
237
6
    }
238
}
239

            
240
#[async_trait]
241
impl<B: Backend> Handler<LogOutSession, B> for ServerDispatcher {
242
3
    async fn handle(
243
3
        session: HandlerSession<'_, B>,
244
3
        command: LogOutSession,
245
3
    ) -> HandlerResult<LogOutSession> {
246
3
        if let Some(logged_out) = session.client.log_out(command.0) {
247
3
            if let Err(err) = session
248
3
                .server
249
3
                .backend()
250
3
                .client_session_ended(logged_out, session.client, false, session.server)
251
                .await
252
            {
253
                log::error!("[server] Error in `client_session_ended`: {err:?}");
254
3
            }
255
        }
256

            
257
3
        Ok(())
258
9
    }
259
}
260

            
261
#[async_trait]
262
impl<B: Backend> Handler<AlterUserPermissionGroupMembership, B> for ServerDispatcher {
263
12
    async fn handle(
264
12
        session: HandlerSession<'_, B>,
265
12
        command: AlterUserPermissionGroupMembership,
266
12
    ) -> HandlerResult<AlterUserPermissionGroupMembership> {
267
12
        if command.should_be_member {
268
6
            session
269
6
                .as_client
270
6
                .add_permission_group_to_user(command.user, command.group)
271
6
                .await
272
6
                .map_err(HandlerError::from)
273
        } else {
274
6
            session
275
6
                .as_client
276
6
                .remove_permission_group_from_user(command.user, command.group)
277
5
                .await
278
6
                .map_err(HandlerError::from)
279
        }
280
36
    }
281
}
282

            
283
#[async_trait]
284
impl<B: Backend> Handler<AlterUserRoleMembership, B> for ServerDispatcher {
285
12
    async fn handle(
286
12
        session: HandlerSession<'_, B>,
287
12
        command: AlterUserRoleMembership,
288
12
    ) -> HandlerResult<AlterUserRoleMembership> {
289
12
        if command.should_be_member {
290
6
            session
291
6
                .as_client
292
6
                .add_role_to_user(command.user, command.role)
293
6
                .await
294
6
                .map_err(HandlerError::from)
295
        } else {
296
6
            session
297
6
                .as_client
298
6
                .remove_role_from_user(command.user, command.role)
299
6
                .await
300
6
                .map_err(HandlerError::from)
301
        }
302
36
    }
303
}
304

            
305
#[async_trait]
306
impl<B: Backend> Handler<Get, B> for ServerDispatcher {
307
9010
    async fn handle(session: HandlerSession<'_, B>, command: Get) -> HandlerResult<Get> {
308
9010
        let database = session
309
9010
            .as_client
310
9010
            .database_without_schema(&command.database)
311
8414
            .await?;
312
9010
        database
313
9010
            .get_from_collection(command.id, &command.collection)
314
8239
            .await
315
9010
            .map_err(HandlerError::from)
316
27030
    }
317
}
318

            
319
#[async_trait]
320
impl<B: Backend> Handler<GetMultiple, B> for ServerDispatcher {
321
4628
    async fn handle(
322
4628
        session: HandlerSession<'_, B>,
323
4628
        command: GetMultiple,
324
4628
    ) -> HandlerResult<GetMultiple> {
325
4628
        let database = session
326
4628
            .as_client
327
4628
            .database_without_schema(&command.database)
328
4260
            .await?;
329
4628
        database
330
4628
            .get_multiple_from_collection(&command.ids, &command.collection)
331
4252
            .await
332
4628
            .map_err(HandlerError::from)
333
13884
    }
334
}
335

            
336
#[async_trait]
337
impl<B: Backend> Handler<List, B> for ServerDispatcher {
338
12
    async fn handle(session: HandlerSession<'_, B>, command: List) -> HandlerResult<List> {
339
12
        let database = session
340
12
            .as_client
341
12
            .database_without_schema(&command.database)
342
12
            .await?;
343
12
        database
344
12
            .list_from_collection(
345
12
                command.ids,
346
12
                command.order,
347
12
                command.limit,
348
12
                &command.collection,
349
12
            )
350
12
            .await
351
12
            .map_err(HandlerError::from)
352
36
    }
353
}
354

            
355
#[async_trait]
356
impl<B: Backend> Handler<ListHeaders, B> for ServerDispatcher {
357
3
    async fn handle(
358
3
        session: HandlerSession<'_, B>,
359
3
        command: ListHeaders,
360
3
    ) -> HandlerResult<ListHeaders> {
361
3
        let database = session
362
3
            .as_client
363
3
            .database_without_schema(&command.0.database)
364
3
            .await?;
365
3
        database
366
3
            .list_headers_from_collection(
367
3
                command.0.ids,
368
3
                command.0.order,
369
3
                command.0.limit,
370
3
                &command.0.collection,
371
3
            )
372
3
            .await
373
3
            .map_err(HandlerError::from)
374
9
    }
375
}
376

            
377
#[async_trait]
378
impl<B: Backend> Handler<Count, B> for ServerDispatcher {
379
6
    async fn handle(session: HandlerSession<'_, B>, command: Count) -> HandlerResult<Count> {
380
6
        let database = session
381
6
            .as_client
382
6
            .database_without_schema(&command.database)
383
6
            .await?;
384
6
        database
385
6
            .count_from_collection(command.ids, &command.collection)
386
6
            .await
387
6
            .map_err(HandlerError::from)
388
18
    }
389
}
390

            
391
#[async_trait]
392
impl<B: Backend> Handler<Query, B> for ServerDispatcher {
393
4719
    async fn handle(session: HandlerSession<'_, B>, command: Query) -> HandlerResult<Query> {
394
4719
        let database = session
395
4719
            .as_client
396
4719
            .database_without_schema(&command.database)
397
4383
            .await?;
398
4719
        database
399
4719
            .query_by_name(
400
4719
                &command.view,
401
4719
                command.key,
402
4719
                command.order,
403
4719
                command.limit,
404
4719
                command.access_policy,
405
4719
            )
406
4411
            .await
407
4719
            .map_err(HandlerError::from)
408
14157
    }
409
}
410

            
411
#[async_trait]
412
impl<B: Backend> Handler<QueryWithDocs, B> for ServerDispatcher {
413
    async fn handle(
414
        session: HandlerSession<'_, B>,
415
        command: QueryWithDocs,
416
    ) -> HandlerResult<QueryWithDocs> {
417
        let database = session
418
            .as_client
419
            .database_without_schema(&command.0.database)
420
            .await?;
421
        database
422
            .query_by_name_with_docs(
423
                &command.0.view,
424
                command.0.key,
425
                command.0.order,
426
                command.0.limit,
427
                command.0.access_policy,
428
            )
429
            .await
430
            .map_err(HandlerError::from)
431
    }
432
}
433

            
434
#[async_trait]
435
impl<B: Backend> Handler<Reduce, B> for ServerDispatcher {
436
9210
    async fn handle(session: HandlerSession<'_, B>, command: Reduce) -> HandlerResult<Reduce> {
437
9210
        let database = session
438
9210
            .as_client
439
9210
            .database_without_schema(&command.database)
440
8525
            .await?;
441
9210
        database
442
9210
            .reduce_by_name(&command.view, command.key, command.access_policy)
443
8435
            .await
444
9210
            .map(Bytes::from)
445
9210
            .map_err(HandlerError::from)
446
27630
    }
447
}
448

            
449
#[async_trait]
450
impl<B: Backend> Handler<ReduceGrouped, B> for ServerDispatcher {
451
8
    async fn handle(
452
8
        session: HandlerSession<'_, B>,
453
8
        command: ReduceGrouped,
454
8
    ) -> HandlerResult<ReduceGrouped> {
455
8
        let database = session
456
8
            .as_client
457
8
            .database_without_schema(&command.0.database)
458
8
            .await?;
459
8
        database
460
8
            .reduce_grouped_by_name(&command.0.view, command.0.key, command.0.access_policy)
461
8
            .await
462
8
            .map_err(HandlerError::from)
463
24
    }
464
}
465

            
466
#[async_trait]
467
impl<B: Backend> Handler<ApplyTransaction, B> for ServerDispatcher {
468
11740
    async fn handle(
469
11740
        session: HandlerSession<'_, B>,
470
11740
        command: ApplyTransaction,
471
11740
    ) -> HandlerResult<ApplyTransaction> {
472
11740
        let database = session
473
11740
            .as_client
474
11740
            .database_without_schema(&command.database)
475
11145
            .await?;
476
11740
        database
477
11740
            .apply_transaction(command.transaction)
478
11723
            .await
479
11740
            .map_err(HandlerError::from)
480
35220
    }
481
}
482

            
483
#[async_trait]
484
impl<B: Backend> Handler<DeleteDocs, B> for ServerDispatcher {
485
6
    async fn handle(
486
6
        session: HandlerSession<'_, B>,
487
6
        command: DeleteDocs,
488
6
    ) -> HandlerResult<DeleteDocs> {
489
6
        let database = session
490
6
            .as_client
491
6
            .database_without_schema(&command.database)
492
6
            .await?;
493
6
        database
494
6
            .delete_docs_by_name(&command.view, command.key, command.access_policy)
495
6
            .await
496
6
            .map_err(HandlerError::from)
497
18
    }
498
}
499

            
500
#[async_trait]
501
impl<B: Backend> Handler<ListExecutedTransactions, B> for ServerDispatcher {
502
1054
    async fn handle(
503
1054
        session: HandlerSession<'_, B>,
504
1054
        command: ListExecutedTransactions,
505
1054
    ) -> HandlerResult<ListExecutedTransactions> {
506
1054
        let database = session
507
1054
            .as_client
508
1054
            .database_without_schema(&command.database)
509
1030
            .await?;
510
1054
        database
511
1054
            .list_executed_transactions(command.starting_id, command.result_limit)
512
968
            .await
513
1054
            .map_err(HandlerError::from)
514
3162
    }
515
}
516

            
517
#[async_trait]
518
impl<B: Backend> Handler<LastTransactionId, B> for ServerDispatcher {
519
3
    async fn handle(
520
3
        session: HandlerSession<'_, B>,
521
3
        command: LastTransactionId,
522
3
    ) -> HandlerResult<LastTransactionId> {
523
3
        let database = session
524
3
            .as_client
525
3
            .database_without_schema(&command.database)
526
3
            .await?;
527
3
        database
528
3
            .last_transaction_id()
529
            .await
530
3
            .map_err(HandlerError::from)
531
9
    }
532
}
533

            
534
#[async_trait]
535
impl<B: Backend> Handler<CreateSubscriber, B> for ServerDispatcher {
536
24
    async fn handle(
537
24
        session: HandlerSession<'_, B>,
538
24
        command: CreateSubscriber,
539
24
    ) -> HandlerResult<CreateSubscriber> {
540
24
        let database = session
541
24
            .as_client
542
24
            .database_without_schema(&command.database)
543
24
            .await?;
544
24
        let subscriber = database.create_subscriber().await?;
545
24
        let subscriber_id = subscriber.id();
546
24

            
547
24
        session.client.register_subscriber(
548
24
            subscriber,
549
24
            session.as_client.session().and_then(|session| session.id),
550
24
        );
551
24

            
552
24
        Ok(subscriber_id)
553
72
    }
554
}
555

            
556
#[async_trait]
557
impl<B: Backend> Handler<Publish, B> for ServerDispatcher {
558
30
    async fn handle(session: HandlerSession<'_, B>, command: Publish) -> HandlerResult<Publish> {
559
30
        let database = session
560
30
            .as_client
561
30
            .database_without_schema(&command.database)
562
29
            .await?;
563
30
        database
564
30
            .publish_bytes(command.topic.into_vec(), command.payload.into_vec())
565
            .await
566
30
            .map_err(HandlerError::from)
567
90
    }
568
}
569

            
570
#[async_trait]
571
impl<B: Backend> Handler<PublishToAll, B> for ServerDispatcher {
572
3
    async fn handle(
573
3
        session: HandlerSession<'_, B>,
574
3
        command: PublishToAll,
575
3
    ) -> HandlerResult<PublishToAll> {
576
3
        let database = session
577
3
            .as_client
578
3
            .database_without_schema(&command.database)
579
2
            .await?;
580
3
        database
581
3
            .publish_bytes_to_all(
582
3
                command.topics.into_iter().map(Bytes::into_vec),
583
3
                command.payload.into_vec(),
584
3
            )
585
            .await
586
3
            .map_err(HandlerError::from)
587
9
    }
588
}
589

            
590
#[async_trait]
591
impl<B: Backend> Handler<SubscribeTo, B> for ServerDispatcher {
592
39
    async fn handle(
593
39
        session: HandlerSession<'_, B>,
594
39
        command: SubscribeTo,
595
39
    ) -> HandlerResult<SubscribeTo> {
596
39
        session
597
39
            .client
598
39
            .subscribe_by_id(
599
39
                command.subscriber_id,
600
39
                command.topic,
601
39
                session.as_client.session().and_then(|session| session.id),
602
39
            )
603
39
            .map_err(HandlerError::from)
604
78
    }
605
}
606

            
607
#[async_trait]
608
impl<B: Backend> Handler<UnsubscribeFrom, B> for ServerDispatcher {
609
3
    async fn handle(
610
3
        session: HandlerSession<'_, B>,
611
3
        command: UnsubscribeFrom,
612
3
    ) -> HandlerResult<UnsubscribeFrom> {
613
3
        session
614
3
            .client
615
3
            .unsubscribe_by_id(
616
3
                command.subscriber_id,
617
3
                &command.topic,
618
3
                session.as_client.session().and_then(|session| session.id),
619
3
            )
620
3
            .map_err(HandlerError::from)
621
6
    }
622
}
623

            
624
#[async_trait]
625
impl<B: Backend> Handler<UnregisterSubscriber, B> for ServerDispatcher {
626
10
    async fn handle(
627
10
        session: HandlerSession<'_, B>,
628
10
        command: UnregisterSubscriber,
629
10
    ) -> HandlerResult<UnregisterSubscriber> {
630
10
        session
631
10
            .client
632
10
            .unregister_subscriber_by_id(
633
10
                command.subscriber_id,
634
10
                session.as_client.session().and_then(|session| session.id),
635
10
            )
636
10
            .map_err(HandlerError::from)
637
20
    }
638
}
639

            
640
#[async_trait]
641
impl<B: Backend> Handler<ExecuteKeyOperation, B> for ServerDispatcher {
642
30321
    async fn handle(
643
30321
        session: HandlerSession<'_, B>,
644
30321
        command: ExecuteKeyOperation,
645
30321
    ) -> HandlerResult<ExecuteKeyOperation> {
646
30321
        let database = session
647
30321
            .as_client
648
30321
            .database_without_schema(&command.database)
649
28283
            .await?;
650
30321
        database
651
30321
            .execute_key_operation(command.op)
652
26948
            .await
653
30321
            .map_err(HandlerError::from)
654
90963
    }
655
}
656

            
657
#[async_trait]
658
impl<B: Backend> Handler<CompactCollection, B> for ServerDispatcher {
659
3
    async fn handle(
660
3
        session: HandlerSession<'_, B>,
661
3
        command: CompactCollection,
662
3
    ) -> HandlerResult<CompactCollection> {
663
3
        let database = session
664
3
            .as_client
665
3
            .database_without_schema(&command.database)
666
3
            .await?;
667
3
        database
668
3
            .compact_collection_by_name(command.name)
669
3
            .await
670
3
            .map_err(HandlerError::from)
671
9
    }
672
}
673

            
674
#[async_trait]
675
impl<B: Backend> Handler<CompactKeyValueStore, B> for ServerDispatcher {
676
3
    async fn handle(
677
3
        session: HandlerSession<'_, B>,
678
3
        command: CompactKeyValueStore,
679
3
    ) -> HandlerResult<CompactKeyValueStore> {
680
3
        let database = session
681
3
            .as_client
682
3
            .database_without_schema(&command.database)
683
2
            .await?;
684
3
        database
685
3
            .compact_key_value_store()
686
3
            .await
687
3
            .map_err(HandlerError::from)
688
9
    }
689
}
690

            
691
#[async_trait]
692
impl<B: Backend> Handler<Compact, B> for ServerDispatcher {
693
3
    async fn handle(client: HandlerSession<'_, B>, command: Compact) -> HandlerResult<Compact> {
694
3
        let database = client
695
3
            .as_client
696
3
            .database_without_schema(&command.database)
697
3
            .await?;
698
3
        database.compact().await.map_err(HandlerError::from)
699
9
    }
700
}