1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    connection::{AccessPolicy, Connection, QueryKey, Range, Sort},
6
    custom_api::CustomApi,
7
    document::{AnyDocumentId, OwnedDocument},
8
    key::Key,
9
    networking::{DatabaseRequest, DatabaseResponse, Request, Response},
10
    schema::{
11
        view::{
12
            self,
13
            map::{self, MappedDocuments},
14
            SerializedView,
15
        },
16
        Collection, Map, MappedValue, Schematic,
17
    },
18
    transaction::{Executed, OperationResult, Transaction},
19
};
20
use derive_where::derive_where;
21

            
22
use crate::Client;
23

            
24
mod pubsub;
25
pub use pubsub::*;
26

            
27
mod keyvalue;
28

            
29
/// A database on a remote server.
30
#[derive(Debug)]
31
258
#[derive_where(Clone)]
32
pub struct RemoteDatabase<A: CustomApi = ()> {
33
    client: Client<A>,
34
    name: Arc<String>,
35
    schema: Arc<Schematic>,
36
}
37
impl<A: CustomApi> RemoteDatabase<A> {
    /// Returns the name of the database.
    #[must_use]
    pub fn name(&self) -> &str {
        // `as_str` states the target type directly instead of relying on
        // `AsRef` plus a deref coercion at the return site.
        self.name.as_str()
    }
}
44

            
45
impl<A: CustomApi> Deref for RemoteDatabase<A> {
46
    type Target = Client<A>;
47

            
48
6
    fn deref(&self) -> &Self::Target {
49
6
        &self.client
50
6
    }
51
}
52

            
53
impl<A: CustomApi> RemoteDatabase<A> {
54
616
    pub(crate) fn new(client: Client<A>, name: String, schema: Arc<Schematic>) -> Self {
55
616
        Self {
56
616
            client,
57
616
            name: Arc::new(name),
58
616
            schema,
59
616
        }
60
616
    }
61
}
62

            
63
#[async_trait]
64
impl<A: CustomApi> Connection for RemoteDatabase<A> {
65
8154
    async fn get<C, PrimaryKey>(
66
8154
        &self,
67
8154
        id: PrimaryKey,
68
8154
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error>
69
8154
    where
70
8154
        C: Collection,
71
8154
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
72
8154
    {
73
        match self
74
            .client
75
            .send_request(Request::Database {
76
8154
                database: self.name.to_string(),
77
8154
                request: DatabaseRequest::Get {
78
8154
                    collection: C::collection_name(),
79
8154
                    id: id.into().to_document_id()?,
80
                },
81
12261
            })
82
12261
            .await?
83
        {
84
7648
            Response::Database(DatabaseResponse::Documents(documents)) => {
85
7648
                Ok(documents.into_iter().next())
86
            }
87
506
            Response::Error(bonsaidb_core::Error::DocumentNotFound(_, _)) => Ok(None),
88
            Response::Error(err) => Err(err),
89
            other => Err(bonsaidb_core::Error::Networking(
90
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
91
            )),
92
        }
93
16308
    }
94

            
95
4
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
96
4
        &self,
97
4
        ids: DocumentIds,
98
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
99
4
    where
100
4
        C: Collection,
101
4
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
102
4
        I: Iterator<Item = PrimaryKey> + Send + Sync,
103
4
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync,
104
4
    {
105
        match self
106
            .client
107
            .send_request(Request::Database {
108
4
                database: self.name.to_string(),
109
4
                request: DatabaseRequest::GetMultiple {
110
4
                    collection: C::collection_name(),
111
4
                    ids: ids
112
4
                        .into_iter()
113
8
                        .map(|id| id.into().to_document_id())
114
4
                        .collect::<Result<Vec<_>, _>>()?,
115
                },
116
4
            })
117
4
            .await?
118
        {
119
4
            Response::Database(DatabaseResponse::Documents(documents)) => Ok(documents),
120
            Response::Error(err) => Err(err),
121
            other => Err(bonsaidb_core::Error::Networking(
122
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
123
            )),
124
        }
125
8
    }
126

            
127
8
    async fn list<C, R, PrimaryKey>(
128
8
        &self,
129
8
        ids: R,
130
8
        order: Sort,
131
8
        limit: Option<usize>,
132
8
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
133
8
    where
134
8
        C: Collection,
135
8
        R: Into<Range<PrimaryKey>> + Send,
136
8
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
137
8
    {
138
        match self
139
            .client
140
            .send_request(Request::Database {
141
8
                database: self.name.to_string(),
142
8
                request: DatabaseRequest::List {
143
8
                    collection: C::collection_name(),
144
12
                    ids: ids.into().map_result(|id| id.into().to_document_id())?,
145
8
                    order,
146
8
                    limit,
147
                },
148
8
            })
149
8
            .await?
150
        {
151
8
            Response::Database(DatabaseResponse::Documents(documents)) => Ok(documents),
152
            Response::Error(err) => Err(err),
153
            other => Err(bonsaidb_core::Error::Networking(
154
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
155
            )),
156
        }
157
16
    }
158

            
159
4
    async fn count<C, R, PrimaryKey>(&self, ids: R) -> Result<u64, bonsaidb_core::Error>
160
4
    where
161
4
        C: Collection,
162
4
        R: Into<Range<PrimaryKey>> + Send,
163
4
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
164
4
    {
165
        match self
166
            .client
167
            .send_request(Request::Database {
168
4
                database: self.name.to_string(),
169
4
                request: DatabaseRequest::Count {
170
4
                    collection: C::collection_name(),
171
4
                    ids: ids.into().map_result(|id| id.into().to_document_id())?,
172
                },
173
4
            })
174
4
            .await?
175
        {
176
4
            Response::Database(DatabaseResponse::Count(count)) => Ok(count),
177
            Response::Error(err) => Err(err),
178
            other => Err(bonsaidb_core::Error::Networking(
179
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
180
            )),
181
        }
182
8
    }
183

            
184
51
    async fn query<V: SerializedView>(
185
51
        &self,
186
51
        key: Option<QueryKey<V::Key>>,
187
51
        order: Sort,
188
51
        limit: Option<usize>,
189
51
        access_policy: AccessPolicy,
190
51
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
191
51
    where
192
51
        Self: Sized,
193
51
    {
194
        match self
195
            .client
196
            .send_request(Request::Database {
197
51
                database: self.name.to_string(),
198
51
                request: DatabaseRequest::Query {
199
51
                    view: self
200
51
                        .schema
201
51
                        .view::<V>()
202
51
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
203
51
                        .view_name(),
204
51
                    key: key.map(|key| key.serialized()).transpose()?,
205
51
                    order,
206
51
                    limit,
207
51
                    access_policy,
208
                    with_docs: false,
209
                },
210
51
            })
211
51
            .await?
212
        {
213
51
            Response::Database(DatabaseResponse::ViewMappings(mappings)) => Ok(mappings
214
51
                .iter()
215
51
                .map(map::Serialized::deserialized::<V>)
216
51
                .collect::<Result<Vec<_>, _>>()
217
51
                .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?),
218
            Response::Error(err) => Err(err),
219
            other => Err(bonsaidb_core::Error::Networking(
220
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
221
            )),
222
        }
223
102
    }
224

            
225
3878
    async fn query_with_docs<V: SerializedView>(
226
3878
        &self,
227
3878
        key: Option<QueryKey<V::Key>>,
228
3878
        order: Sort,
229
3878
        limit: Option<usize>,
230
3878
        access_policy: AccessPolicy,
231
3878
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
232
3878
    where
233
3878
        Self: Sized,
234
3878
    {
235
        match self
236
            .client
237
            .send_request(Request::Database {
238
3878
                database: self.name.to_string(),
239
3878
                request: DatabaseRequest::Query {
240
3878
                    view: self
241
3878
                        .schema
242
3878
                        .view::<V>()
243
3878
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
244
3878
                        .view_name(),
245
3878
                    key: key.map(|key| key.serialized()).transpose()?,
246
3878
                    order,
247
3878
                    limit,
248
3878
                    access_policy,
249
                    with_docs: true,
250
                },
251
3878
            })
252
3878
            .await?
253
        {
254
3878
            Response::Database(DatabaseResponse::ViewMappingsWithDocs(mappings)) => Ok(mappings
255
3878
                .deserialized::<V>()
256
3878
                .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?),
257
            Response::Error(err) => Err(err),
258
            other => Err(bonsaidb_core::Error::Networking(
259
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
260
            )),
261
        }
262
7756
    }
263

            
264
7969
    async fn reduce<V: SerializedView>(
265
7969
        &self,
266
7969
        key: Option<QueryKey<V::Key>>,
267
7969
        access_policy: AccessPolicy,
268
7969
    ) -> Result<V::Value, bonsaidb_core::Error>
269
7969
    where
270
7969
        Self: Sized,
271
7969
    {
272
        match self
273
            .client
274
            .send_request(Request::Database {
275
7969
                database: self.name.to_string(),
276
7969
                request: DatabaseRequest::Reduce {
277
7969
                    view: self
278
7969
                        .schema
279
7969
                        .view::<V>()
280
7969
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
281
7969
                        .view_name(),
282
7969
                    key: key.map(|key| key.serialized()).transpose()?,
283
7969
                    access_policy,
284
                    grouped: false,
285
                },
286
7969
            })
287
7969
            .await?
288
        {
289
7967
            Response::Database(DatabaseResponse::ViewReduction(value)) => {
290
7967
                let value = V::deserialize(&value)?;
291
7967
                Ok(value)
292
            }
293
2
            Response::Error(err) => Err(err),
294
            other => Err(bonsaidb_core::Error::Networking(
295
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
296
            )),
297
        }
298
15938
    }
299

            
300
6
    async fn reduce_grouped<V: SerializedView>(
301
6
        &self,
302
6
        key: Option<QueryKey<V::Key>>,
303
6
        access_policy: AccessPolicy,
304
6
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
305
6
    where
306
6
        Self: Sized,
307
6
    {
308
        match self
309
            .client
310
            .send_request(Request::Database {
311
6
                database: self.name.to_string(),
312
6
                request: DatabaseRequest::Reduce {
313
6
                    view: self
314
6
                        .schema
315
6
                        .view::<V>()
316
6
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
317
6
                        .view_name(),
318
6
                    key: key.map(|key| key.serialized()).transpose()?,
319
6
                    access_policy,
320
                    grouped: true,
321
                },
322
8
            })
323
8
            .await?
324
        {
325
6
            Response::Database(DatabaseResponse::ViewGroupedReduction(values)) => values
326
6
                .into_iter()
327
22
                .map(|map| {
328
22
                    Ok(MappedValue::new(
329
22
                        V::Key::from_ord_bytes(&map.key).map_err(|err| {
330
                            bonsaidb_core::Error::Database(
331
                                view::Error::key_serialization(err).to_string(),
332
                            )
333
22
                        })?,
334
22
                        V::deserialize(&map.value)?,
335
                    ))
336
22
                })
337
6
                .collect::<Result<Vec<_>, bonsaidb_core::Error>>(),
338
            Response::Error(err) => Err(err),
339
            other => Err(bonsaidb_core::Error::Networking(
340
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
341
            )),
342
        }
343
12
    }
344

            
345
2
    async fn delete_docs<V: SerializedView>(
346
2
        &self,
347
2
        key: Option<QueryKey<V::Key>>,
348
2
        access_policy: AccessPolicy,
349
2
    ) -> Result<u64, bonsaidb_core::Error>
350
2
    where
351
2
        Self: Sized,
352
2
    {
353
        match self
354
            .client
355
            .send_request(Request::Database {
356
2
                database: self.name.to_string(),
357
2
                request: DatabaseRequest::DeleteDocs {
358
2
                    view: self
359
2
                        .schema
360
2
                        .view::<V>()
361
2
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
362
2
                        .view_name(),
363
2
                    key: key.map(|key| key.serialized()).transpose()?,
364
2
                    access_policy,
365
                },
366
2
            })
367
2
            .await?
368
        {
369
2
            Response::Database(DatabaseResponse::Count(count)) => Ok(count),
370
            Response::Error(err) => Err(err),
371
            other => Err(bonsaidb_core::Error::Networking(
372
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
373
            )),
374
        }
375
4
    }
376

            
377
10009
    async fn apply_transaction(
378
10009
        &self,
379
10009
        transaction: Transaction,
380
10009
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
381
10009
        match self
382
10009
            .client
383
10009
            .send_request(Request::Database {
384
10009
                database: self.name.to_string(),
385
10009
                request: DatabaseRequest::ApplyTransaction { transaction },
386
21344
            })
387
21344
            .await?
388
        {
389
9493
            Response::Database(DatabaseResponse::TransactionResults(results)) => Ok(results),
390
516
            Response::Error(err) => Err(err),
391
            other => Err(bonsaidb_core::Error::Networking(
392
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
393
            )),
394
        }
395
20018
    }
396

            
397
1036
    async fn list_executed_transactions(
398
1036
        &self,
399
1036
        starting_id: Option<u64>,
400
1036
        result_limit: Option<usize>,
401
1036
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
402
1036
        match self
403
1036
            .client
404
1036
            .send_request(Request::Database {
405
1036
                database: self.name.to_string(),
406
1036
                request: DatabaseRequest::ListExecutedTransactions {
407
1036
                    starting_id,
408
1036
                    result_limit,
409
1036
                },
410
3063
            })
411
3063
            .await?
412
        {
413
1036
            Response::Database(DatabaseResponse::ExecutedTransactions(results)) => Ok(results),
414
            Response::Error(err) => Err(err),
415
            other => Err(bonsaidb_core::Error::Networking(
416
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
417
            )),
418
        }
419
2072
    }
420

            
421
2
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
422
2
        match self
423
2
            .client
424
2
            .send_request(Request::Database {
425
2
                database: self.name.to_string(),
426
2
                request: DatabaseRequest::LastTransactionId,
427
2
            })
428
2
            .await?
429
        {
430
2
            Response::Database(DatabaseResponse::LastTransactionId(result)) => Ok(result),
431
            Response::Error(err) => Err(err),
432
            other => Err(bonsaidb_core::Error::Networking(
433
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
434
            )),
435
        }
436
4
    }
437

            
438
2
    async fn compact_collection<C: Collection>(&self) -> Result<(), bonsaidb_core::Error> {
439
2
        match self
440
2
            .send_request(Request::Database {
441
2
                database: self.name.to_string(),
442
2
                request: DatabaseRequest::CompactCollection {
443
2
                    name: C::collection_name(),
444
2
                },
445
2
            })
446
2
            .await?
447
        {
448
2
            Response::Ok => Ok(()),
449
            Response::Error(err) => Err(err),
450
            other => Err(bonsaidb_core::Error::Networking(
451
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
452
            )),
453
        }
454
4
    }
455

            
456
2
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
457
2
        match self
458
2
            .send_request(Request::Database {
459
2
                database: self.name.to_string(),
460
2
                request: DatabaseRequest::Compact,
461
2
            })
462
2
            .await?
463
        {
464
2
            Response::Ok => Ok(()),
465
            Response::Error(err) => Err(err),
466
            other => Err(bonsaidb_core::Error::Networking(
467
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
468
            )),
469
        }
470
4
    }
471

            
472
2
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
473
2
        match self
474
2
            .send_request(Request::Database {
475
2
                database: self.name.to_string(),
476
2
                request: DatabaseRequest::CompactKeyValueStore,
477
2
            })
478
2
            .await?
479
        {
480
2
            Response::Ok => Ok(()),
481
            Response::Error(err) => Err(err),
482
            other => Err(bonsaidb_core::Error::Networking(
483
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
484
            )),
485
        }
486
4
    }
487
}