1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    connection::{AccessPolicy, Connection, QueryKey, Range, Sort},
6
    custom_api::CustomApi,
7
    document::{AnyDocumentId, OwnedDocument},
8
    key::Key,
9
    networking::{DatabaseRequest, DatabaseResponse, Request, Response},
10
    schema::{
11
        view::{
12
            self,
13
            map::{self, MappedDocuments},
14
            SerializedView,
15
        },
16
        Collection, Map, MappedValue, Schematic,
17
    },
18
    transaction::{Executed, OperationResult, Transaction},
19
};
20
use derive_where::derive_where;
21

            
22
use crate::Client;
23

            
24
mod pubsub;
25
pub use pubsub::*;
26

            
27
mod keyvalue;
28

            
29
/// A database on a remote server.
30
#[derive(Debug)]
31
258
#[derive_where(Clone)]
32
pub struct RemoteDatabase<A: CustomApi = ()> {
33
    client: Client<A>,
34
    name: Arc<String>,
35
    schema: Arc<Schematic>,
36
}
37
impl<A: CustomApi> RemoteDatabase<A> {
38
    /// Returns the name of the database.
39
    #[must_use]
40
    pub fn name(&self) -> &str {
41
        self.name.as_ref()
42
    }
43
}
44

            
45
impl<A: CustomApi> Deref for RemoteDatabase<A> {
46
    type Target = Client<A>;
47

            
48
6
    fn deref(&self) -> &Self::Target {
49
6
        &self.client
50
6
    }
51
}
52

            
53
impl<A: CustomApi> RemoteDatabase<A> {
54
616
    pub(crate) fn new(client: Client<A>, name: String, schema: Arc<Schematic>) -> Self {
55
616
        Self {
56
616
            client,
57
616
            name: Arc::new(name),
58
616
            schema,
59
616
        }
60
616
    }
61
}
62

            
63
#[async_trait]
64
impl<A: CustomApi> Connection for RemoteDatabase<A> {
65
7894
    async fn get<C, PrimaryKey>(
66
7894
        &self,
67
7894
        id: PrimaryKey,
68
7894
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error>
69
7894
    where
70
7894
        C: Collection,
71
7894
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
72
7894
    {
73
        match self
74
            .client
75
            .send_request(Request::Database {
76
7894
                database: self.name.to_string(),
77
7894
                request: DatabaseRequest::Get {
78
7894
                    collection: C::collection_name(),
79
7894
                    id: id.into().to_document_id()?,
80
                },
81
11622
            })
82
11622
            .await?
83
        {
84
7390
            Response::Database(DatabaseResponse::Documents(documents)) => {
85
7390
                Ok(documents.into_iter().next())
86
            }
87
504
            Response::Error(bonsaidb_core::Error::DocumentNotFound(_, _)) => Ok(None),
88
            Response::Error(err) => Err(err),
89
            other => Err(bonsaidb_core::Error::Networking(
90
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
91
            )),
92
        }
93
15788
    }
94

            
95
4
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
96
4
        &self,
97
4
        ids: DocumentIds,
98
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
99
4
    where
100
4
        C: Collection,
101
4
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
102
4
        I: Iterator<Item = PrimaryKey> + Send + Sync,
103
4
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync,
104
4
    {
105
        match self
106
            .client
107
            .send_request(Request::Database {
108
4
                database: self.name.to_string(),
109
4
                request: DatabaseRequest::GetMultiple {
110
4
                    collection: C::collection_name(),
111
4
                    ids: ids
112
4
                        .into_iter()
113
8
                        .map(|id| id.into().to_document_id())
114
4
                        .collect::<Result<Vec<_>, _>>()?,
115
                },
116
4
            })
117
4
            .await?
118
        {
119
4
            Response::Database(DatabaseResponse::Documents(documents)) => Ok(documents),
120
            Response::Error(err) => Err(err),
121
            other => Err(bonsaidb_core::Error::Networking(
122
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
123
            )),
124
        }
125
8
    }
126

            
127
8
    async fn list<C, R, PrimaryKey>(
128
8
        &self,
129
8
        ids: R,
130
8
        order: Sort,
131
8
        limit: Option<usize>,
132
8
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
133
8
    where
134
8
        C: Collection,
135
8
        R: Into<Range<PrimaryKey>> + Send,
136
8
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
137
8
    {
138
        match self
139
            .client
140
            .send_request(Request::Database {
141
8
                database: self.name.to_string(),
142
8
                request: DatabaseRequest::List {
143
8
                    collection: C::collection_name(),
144
12
                    ids: ids.into().map_result(|id| id.into().to_document_id())?,
145
8
                    order,
146
8
                    limit,
147
                },
148
8
            })
149
8
            .await?
150
        {
151
8
            Response::Database(DatabaseResponse::Documents(documents)) => Ok(documents),
152
            Response::Error(err) => Err(err),
153
            other => Err(bonsaidb_core::Error::Networking(
154
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
155
            )),
156
        }
157
16
    }
158

            
159
50
    async fn query<V: SerializedView>(
160
50
        &self,
161
50
        key: Option<QueryKey<V::Key>>,
162
50
        order: Sort,
163
50
        limit: Option<usize>,
164
50
        access_policy: AccessPolicy,
165
50
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
166
50
    where
167
50
        Self: Sized,
168
50
    {
169
        match self
170
            .client
171
            .send_request(Request::Database {
172
50
                database: self.name.to_string(),
173
50
                request: DatabaseRequest::Query {
174
50
                    view: self
175
50
                        .schema
176
50
                        .view::<V>()
177
50
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
178
50
                        .view_name(),
179
50
                    key: key.map(|key| key.serialized()).transpose()?,
180
50
                    order,
181
50
                    limit,
182
50
                    access_policy,
183
                    with_docs: false,
184
                },
185
50
            })
186
50
            .await?
187
        {
188
50
            Response::Database(DatabaseResponse::ViewMappings(mappings)) => Ok(mappings
189
50
                .iter()
190
50
                .map(map::Serialized::deserialized::<V>)
191
50
                .collect::<Result<Vec<_>, _>>()
192
50
                .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?),
193
            Response::Error(err) => Err(err),
194
            other => Err(bonsaidb_core::Error::Networking(
195
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
196
            )),
197
        }
198
100
    }
199

            
200
3840
    async fn query_with_docs<V: SerializedView>(
201
3840
        &self,
202
3840
        key: Option<QueryKey<V::Key>>,
203
3840
        order: Sort,
204
3840
        limit: Option<usize>,
205
3840
        access_policy: AccessPolicy,
206
3840
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
207
3840
    where
208
3840
        Self: Sized,
209
3840
    {
210
        match self
211
            .client
212
            .send_request(Request::Database {
213
3840
                database: self.name.to_string(),
214
3840
                request: DatabaseRequest::Query {
215
3840
                    view: self
216
3840
                        .schema
217
3840
                        .view::<V>()
218
3840
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
219
3840
                        .view_name(),
220
3840
                    key: key.map(|key| key.serialized()).transpose()?,
221
3840
                    order,
222
3840
                    limit,
223
3840
                    access_policy,
224
                    with_docs: true,
225
                },
226
3840
            })
227
3840
            .await?
228
        {
229
3840
            Response::Database(DatabaseResponse::ViewMappingsWithDocs(mappings)) => Ok(mappings
230
3840
                .deserialized::<V>()
231
3840
                .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?),
232
            Response::Error(err) => Err(err),
233
            other => Err(bonsaidb_core::Error::Networking(
234
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
235
            )),
236
        }
237
7680
    }
238

            
239
7709
    async fn reduce<V: SerializedView>(
240
7709
        &self,
241
7709
        key: Option<QueryKey<V::Key>>,
242
7709
        access_policy: AccessPolicy,
243
7709
    ) -> Result<V::Value, bonsaidb_core::Error>
244
7709
    where
245
7709
        Self: Sized,
246
7709
    {
247
        match self
248
            .client
249
            .send_request(Request::Database {
250
7709
                database: self.name.to_string(),
251
7709
                request: DatabaseRequest::Reduce {
252
7709
                    view: self
253
7709
                        .schema
254
7709
                        .view::<V>()
255
7709
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
256
7709
                        .view_name(),
257
7709
                    key: key.map(|key| key.serialized()).transpose()?,
258
7709
                    access_policy,
259
                    grouped: false,
260
                },
261
7709
            })
262
7709
            .await?
263
        {
264
7707
            Response::Database(DatabaseResponse::ViewReduction(value)) => {
265
7707
                let value = V::deserialize(&value)?;
266
7707
                Ok(value)
267
            }
268
2
            Response::Error(err) => Err(err),
269
            other => Err(bonsaidb_core::Error::Networking(
270
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
271
            )),
272
        }
273
15418
    }
274

            
275
6
    async fn reduce_grouped<V: SerializedView>(
276
6
        &self,
277
6
        key: Option<QueryKey<V::Key>>,
278
6
        access_policy: AccessPolicy,
279
6
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
280
6
    where
281
6
        Self: Sized,
282
6
    {
283
        match self
284
            .client
285
            .send_request(Request::Database {
286
6
                database: self.name.to_string(),
287
6
                request: DatabaseRequest::Reduce {
288
6
                    view: self
289
6
                        .schema
290
6
                        .view::<V>()
291
6
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
292
6
                        .view_name(),
293
6
                    key: key.map(|key| key.serialized()).transpose()?,
294
6
                    access_policy,
295
                    grouped: true,
296
                },
297
9
            })
298
9
            .await?
299
        {
300
6
            Response::Database(DatabaseResponse::ViewGroupedReduction(values)) => values
301
6
                .into_iter()
302
22
                .map(|map| {
303
22
                    Ok(MappedValue::new(
304
22
                        V::Key::from_ord_bytes(&map.key).map_err(|err| {
305
                            bonsaidb_core::Error::Database(
306
                                view::Error::key_serialization(err).to_string(),
307
                            )
308
22
                        })?,
309
22
                        V::deserialize(&map.value)?,
310
                    ))
311
22
                })
312
6
                .collect::<Result<Vec<_>, bonsaidb_core::Error>>(),
313
            Response::Error(err) => Err(err),
314
            other => Err(bonsaidb_core::Error::Networking(
315
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
316
            )),
317
        }
318
12
    }
319

            
320
2
    async fn delete_docs<V: SerializedView>(
321
2
        &self,
322
2
        key: Option<QueryKey<V::Key>>,
323
2
        access_policy: AccessPolicy,
324
2
    ) -> Result<u64, bonsaidb_core::Error>
325
2
    where
326
2
        Self: Sized,
327
2
    {
328
        match self
329
            .client
330
            .send_request(Request::Database {
331
2
                database: self.name.to_string(),
332
2
                request: DatabaseRequest::DeleteDocs {
333
2
                    view: self
334
2
                        .schema
335
2
                        .view::<V>()
336
2
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
337
2
                        .view_name(),
338
2
                    key: key.map(|key| key.serialized()).transpose()?,
339
2
                    access_policy,
340
                },
341
2
            })
342
2
            .await?
343
        {
344
2
            Response::Database(DatabaseResponse::DocumentsDeleted(count)) => Ok(count),
345
            Response::Error(err) => Err(err),
346
            other => Err(bonsaidb_core::Error::Networking(
347
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
348
            )),
349
        }
350
4
    }
351

            
352
9977
    async fn apply_transaction(
353
9977
        &self,
354
9977
        transaction: Transaction,
355
9977
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
356
9977
        match self
357
9977
            .client
358
9977
            .send_request(Request::Database {
359
9977
                database: self.name.to_string(),
360
9977
                request: DatabaseRequest::ApplyTransaction { transaction },
361
20103
            })
362
20103
            .await?
363
        {
364
9461
            Response::Database(DatabaseResponse::TransactionResults(results)) => Ok(results),
365
516
            Response::Error(err) => Err(err),
366
            other => Err(bonsaidb_core::Error::Networking(
367
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
368
            )),
369
        }
370
19954
    }
371

            
372
1036
    async fn list_executed_transactions(
373
1036
        &self,
374
1036
        starting_id: Option<u64>,
375
1036
        result_limit: Option<usize>,
376
1036
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
377
1036
        match self
378
1036
            .client
379
1036
            .send_request(Request::Database {
380
1036
                database: self.name.to_string(),
381
1036
                request: DatabaseRequest::ListExecutedTransactions {
382
1036
                    starting_id,
383
1036
                    result_limit,
384
1036
                },
385
2895
            })
386
2895
            .await?
387
        {
388
1036
            Response::Database(DatabaseResponse::ExecutedTransactions(results)) => Ok(results),
389
            Response::Error(err) => Err(err),
390
            other => Err(bonsaidb_core::Error::Networking(
391
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
392
            )),
393
        }
394
2072
    }
395

            
396
2
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
397
2
        match self
398
2
            .client
399
2
            .send_request(Request::Database {
400
2
                database: self.name.to_string(),
401
2
                request: DatabaseRequest::LastTransactionId,
402
2
            })
403
2
            .await?
404
        {
405
2
            Response::Database(DatabaseResponse::LastTransactionId(result)) => Ok(result),
406
            Response::Error(err) => Err(err),
407
            other => Err(bonsaidb_core::Error::Networking(
408
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
409
            )),
410
        }
411
4
    }
412

            
413
2
    async fn compact_collection<C: Collection>(&self) -> Result<(), bonsaidb_core::Error> {
414
2
        match self
415
2
            .send_request(Request::Database {
416
2
                database: self.name.to_string(),
417
2
                request: DatabaseRequest::CompactCollection {
418
2
                    name: C::collection_name(),
419
2
                },
420
2
            })
421
2
            .await?
422
        {
423
2
            Response::Ok => Ok(()),
424
            Response::Error(err) => Err(err),
425
            other => Err(bonsaidb_core::Error::Networking(
426
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
427
            )),
428
        }
429
4
    }
430

            
431
2
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
432
2
        match self
433
2
            .send_request(Request::Database {
434
2
                database: self.name.to_string(),
435
2
                request: DatabaseRequest::Compact,
436
2
            })
437
2
            .await?
438
        {
439
2
            Response::Ok => Ok(()),
440
            Response::Error(err) => Err(err),
441
            other => Err(bonsaidb_core::Error::Networking(
442
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
443
            )),
444
        }
445
4
    }
446

            
447
2
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
448
2
        match self
449
2
            .send_request(Request::Database {
450
2
                database: self.name.to_string(),
451
2
                request: DatabaseRequest::CompactKeyValueStore,
452
2
            })
453
2
            .await?
454
        {
455
2
            Response::Ok => Ok(()),
456
            Response::Error(err) => Err(err),
457
            other => Err(bonsaidb_core::Error::Networking(
458
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
459
            )),
460
        }
461
4
    }
462
}