1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    connection::{AccessPolicy, Connection, QueryKey, Range, Sort},
6
    custom_api::CustomApi,
7
    document::OwnedDocument,
8
    networking::{DatabaseRequest, DatabaseResponse, Request, Response},
9
    schema::{
10
        view::{
11
            self,
12
            map::{self, MappedDocuments},
13
            SerializedView,
14
        },
15
        Collection, Key, Map, MappedValue, Schematic,
16
    },
17
    transaction::{Executed, OperationResult, Transaction},
18
};
19
use derive_where::derive_where;
20

            
21
use crate::Client;
22

            
23
mod pubsub;
24
pub use pubsub::*;
25

            
26
mod keyvalue;
27

            
28
/// A database on a remote server.
29
#[derive(Debug)]
30
258
#[derive_where(Clone)]
31
pub struct RemoteDatabase<A: CustomApi = ()> {
32
    client: Client<A>,
33
    name: Arc<String>,
34
    schema: Arc<Schematic>,
35
}
36
impl<A: CustomApi> RemoteDatabase<A> {
    /// Returns the name this database handle was created with.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
43

            
44
impl<A: CustomApi> Deref for RemoteDatabase<A> {
45
    type Target = Client<A>;
46

            
47
6
    fn deref(&self) -> &Self::Target {
48
6
        &self.client
49
6
    }
50
}
51

            
52
impl<A: CustomApi> RemoteDatabase<A> {
53
616
    pub(crate) fn new(client: Client<A>, name: String, schema: Arc<Schematic>) -> Self {
54
616
        Self {
55
616
            client,
56
616
            name: Arc::new(name),
57
616
            schema,
58
616
        }
59
616
    }
60
}
61

            
62
#[async_trait]
63
impl<A: CustomApi> Connection for RemoteDatabase<A> {
64
8066
    async fn get<C: Collection>(
65
8066
        &self,
66
8066
        id: u64,
67
8066
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
68
8066
        match self
69
8066
            .client
70
8066
            .send_request(Request::Database {
71
8066
                database: self.name.to_string(),
72
8066
                request: DatabaseRequest::Get {
73
8066
                    collection: C::collection_name(),
74
8066
                    id,
75
8066
                },
76
12098
            })
77
12098
            .await?
78
        {
79
7562
            Response::Database(DatabaseResponse::Documents(documents)) => {
80
7562
                Ok(documents.into_iter().next())
81
            }
82
504
            Response::Error(bonsaidb_core::Error::DocumentNotFound(_, _)) => Ok(None),
83
            Response::Error(err) => Err(err),
84
            other => Err(bonsaidb_core::Error::Networking(
85
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
86
            )),
87
        }
88
16132
    }
89

            
90
4
    async fn get_multiple<C: Collection>(
91
4
        &self,
92
4
        ids: &[u64],
93
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
94
4
        match self
95
4
            .client
96
4
            .send_request(Request::Database {
97
4
                database: self.name.to_string(),
98
4
                request: DatabaseRequest::GetMultiple {
99
4
                    collection: C::collection_name(),
100
4
                    ids: ids.to_vec(),
101
4
                },
102
4
            })
103
4
            .await?
104
        {
105
4
            Response::Database(DatabaseResponse::Documents(documents)) => Ok(documents),
106
            Response::Error(err) => Err(err),
107
            other => Err(bonsaidb_core::Error::Networking(
108
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
109
            )),
110
        }
111
8
    }
112

            
113
8
    async fn list<C: Collection, R: Into<Range<u64>> + Send>(
114
8
        &self,
115
8
        ids: R,
116
8
        order: Sort,
117
8
        limit: Option<usize>,
118
8
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
119
8
        match self
120
8
            .client
121
8
            .send_request(Request::Database {
122
8
                database: self.name.to_string(),
123
8
                request: DatabaseRequest::List {
124
8
                    collection: C::collection_name(),
125
8
                    ids: ids.into(),
126
8
                    order,
127
8
                    limit,
128
8
                },
129
8
            })
130
8
            .await?
131
        {
132
8
            Response::Database(DatabaseResponse::Documents(documents)) => Ok(documents),
133
            Response::Error(err) => Err(err),
134
            other => Err(bonsaidb_core::Error::Networking(
135
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
136
            )),
137
        }
138
16
    }
139

            
140
50
    async fn query<V: SerializedView>(
141
50
        &self,
142
50
        key: Option<QueryKey<V::Key>>,
143
50
        order: Sort,
144
50
        limit: Option<usize>,
145
50
        access_policy: AccessPolicy,
146
50
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
147
50
    where
148
50
        Self: Sized,
149
50
    {
150
        match self
151
            .client
152
            .send_request(Request::Database {
153
50
                database: self.name.to_string(),
154
50
                request: DatabaseRequest::Query {
155
50
                    view: self
156
50
                        .schema
157
50
                        .view::<V>()
158
50
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
159
50
                        .view_name(),
160
50
                    key: key.map(|key| key.serialized()).transpose()?,
161
50
                    order,
162
50
                    limit,
163
50
                    access_policy,
164
                    with_docs: false,
165
                },
166
50
            })
167
50
            .await?
168
        {
169
50
            Response::Database(DatabaseResponse::ViewMappings(mappings)) => Ok(mappings
170
50
                .iter()
171
50
                .map(map::Serialized::deserialized::<V>)
172
50
                .collect::<Result<Vec<_>, _>>()
173
50
                .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?),
174
            Response::Error(err) => Err(err),
175
            other => Err(bonsaidb_core::Error::Networking(
176
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
177
            )),
178
        }
179
100
    }
180

            
181
4032
    async fn query_with_docs<V: SerializedView>(
182
4032
        &self,
183
4032
        key: Option<QueryKey<V::Key>>,
184
4032
        order: Sort,
185
4032
        limit: Option<usize>,
186
4032
        access_policy: AccessPolicy,
187
4032
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
188
4032
    where
189
4032
        Self: Sized,
190
4032
    {
191
        match self
192
            .client
193
            .send_request(Request::Database {
194
4032
                database: self.name.to_string(),
195
4032
                request: DatabaseRequest::Query {
196
4032
                    view: self
197
4032
                        .schema
198
4032
                        .view::<V>()
199
4032
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
200
4032
                        .view_name(),
201
4032
                    key: key.map(|key| key.serialized()).transpose()?,
202
4032
                    order,
203
4032
                    limit,
204
4032
                    access_policy,
205
                    with_docs: true,
206
                },
207
4032
            })
208
4032
            .await?
209
        {
210
4032
            Response::Database(DatabaseResponse::ViewMappingsWithDocs(mappings)) => Ok(mappings
211
4032
                .deserialized::<V>()
212
4032
                .map_err(|err| bonsaidb_core::Error::Database(err.to_string()))?),
213
            Response::Error(err) => Err(err),
214
            other => Err(bonsaidb_core::Error::Networking(
215
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
216
            )),
217
        }
218
8064
    }
219

            
220
8003
    async fn reduce<V: SerializedView>(
221
8003
        &self,
222
8003
        key: Option<QueryKey<V::Key>>,
223
8003
        access_policy: AccessPolicy,
224
8003
    ) -> Result<V::Value, bonsaidb_core::Error>
225
8003
    where
226
8003
        Self: Sized,
227
8003
    {
228
        match self
229
            .client
230
            .send_request(Request::Database {
231
8003
                database: self.name.to_string(),
232
8003
                request: DatabaseRequest::Reduce {
233
8003
                    view: self
234
8003
                        .schema
235
8003
                        .view::<V>()
236
8003
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
237
8003
                        .view_name(),
238
8003
                    key: key.map(|key| key.serialized()).transpose()?,
239
8003
                    access_policy,
240
                    grouped: false,
241
                },
242
8003
            })
243
8003
            .await?
244
        {
245
8001
            Response::Database(DatabaseResponse::ViewReduction(value)) => {
246
8001
                let value = V::deserialize(&value)?;
247
8001
                Ok(value)
248
            }
249
2
            Response::Error(err) => Err(err),
250
            other => Err(bonsaidb_core::Error::Networking(
251
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
252
            )),
253
        }
254
16006
    }
255

            
256
6
    async fn reduce_grouped<V: SerializedView>(
257
6
        &self,
258
6
        key: Option<QueryKey<V::Key>>,
259
6
        access_policy: AccessPolicy,
260
6
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
261
6
    where
262
6
        Self: Sized,
263
6
    {
264
        match self
265
            .client
266
            .send_request(Request::Database {
267
6
                database: self.name.to_string(),
268
6
                request: DatabaseRequest::Reduce {
269
6
                    view: self
270
6
                        .schema
271
6
                        .view::<V>()
272
6
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
273
6
                        .view_name(),
274
6
                    key: key.map(|key| key.serialized()).transpose()?,
275
6
                    access_policy,
276
                    grouped: true,
277
                },
278
8
            })
279
8
            .await?
280
        {
281
6
            Response::Database(DatabaseResponse::ViewGroupedReduction(values)) => values
282
6
                .into_iter()
283
22
                .map(|map| {
284
22
                    Ok(MappedValue::new(
285
22
                        V::Key::from_big_endian_bytes(&map.key).map_err(|err| {
286
                            bonsaidb_core::Error::Database(
287
                                view::Error::key_serialization(err).to_string(),
288
                            )
289
22
                        })?,
290
22
                        V::deserialize(&map.value)?,
291
                    ))
292
22
                })
293
6
                .collect::<Result<Vec<_>, bonsaidb_core::Error>>(),
294
            Response::Error(err) => Err(err),
295
            other => Err(bonsaidb_core::Error::Networking(
296
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
297
            )),
298
        }
299
12
    }
300

            
301
2
    async fn delete_docs<V: SerializedView>(
302
2
        &self,
303
2
        key: Option<QueryKey<V::Key>>,
304
2
        access_policy: AccessPolicy,
305
2
    ) -> Result<u64, bonsaidb_core::Error>
306
2
    where
307
2
        Self: Sized,
308
2
    {
309
        match self
310
            .client
311
            .send_request(Request::Database {
312
2
                database: self.name.to_string(),
313
2
                request: DatabaseRequest::DeleteDocs {
314
2
                    view: self
315
2
                        .schema
316
2
                        .view::<V>()
317
2
                        .ok_or(bonsaidb_core::Error::CollectionNotFound)?
318
2
                        .view_name(),
319
2
                    key: key.map(|key| key.serialized()).transpose()?,
320
2
                    access_policy,
321
                },
322
2
            })
323
2
            .await?
324
        {
325
2
            Response::Database(DatabaseResponse::DocumentsDeleted(count)) => Ok(count),
326
            Response::Error(err) => Err(err),
327
            other => Err(bonsaidb_core::Error::Networking(
328
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
329
            )),
330
        }
331
4
    }
332

            
333
9551
    async fn apply_transaction(
334
9551
        &self,
335
9551
        transaction: Transaction,
336
9551
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
337
9551
        match self
338
9551
            .client
339
9551
            .send_request(Request::Database {
340
9551
                database: self.name.to_string(),
341
9551
                request: DatabaseRequest::ApplyTransaction { transaction },
342
19599
            })
343
19599
            .await?
344
        {
345
9033
            Response::Database(DatabaseResponse::TransactionResults(results)) => Ok(results),
346
518
            Response::Error(err) => Err(err),
347
            other => Err(bonsaidb_core::Error::Networking(
348
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
349
            )),
350
        }
351
19102
    }
352

            
353
1036
    async fn list_executed_transactions(
354
1036
        &self,
355
1036
        starting_id: Option<u64>,
356
1036
        result_limit: Option<usize>,
357
1036
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
358
1036
        match self
359
1036
            .client
360
1036
            .send_request(Request::Database {
361
1036
                database: self.name.to_string(),
362
1036
                request: DatabaseRequest::ListExecutedTransactions {
363
1036
                    starting_id,
364
1036
                    result_limit,
365
1036
                },
366
3030
            })
367
3030
            .await?
368
        {
369
1036
            Response::Database(DatabaseResponse::ExecutedTransactions(results)) => Ok(results),
370
            Response::Error(err) => Err(err),
371
            other => Err(bonsaidb_core::Error::Networking(
372
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
373
            )),
374
        }
375
2072
    }
376

            
377
2
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
378
2
        match self
379
2
            .client
380
2
            .send_request(Request::Database {
381
2
                database: self.name.to_string(),
382
2
                request: DatabaseRequest::LastTransactionId,
383
2
            })
384
2
            .await?
385
        {
386
2
            Response::Database(DatabaseResponse::LastTransactionId(result)) => Ok(result),
387
            Response::Error(err) => Err(err),
388
            other => Err(bonsaidb_core::Error::Networking(
389
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
390
            )),
391
        }
392
4
    }
393

            
394
2
    async fn compact_collection<C: Collection>(&self) -> Result<(), bonsaidb_core::Error> {
395
2
        match self
396
2
            .send_request(Request::Database {
397
2
                database: self.name.to_string(),
398
2
                request: DatabaseRequest::CompactCollection {
399
2
                    name: C::collection_name(),
400
2
                },
401
2
            })
402
2
            .await?
403
        {
404
2
            Response::Ok => Ok(()),
405
            Response::Error(err) => Err(err),
406
            other => Err(bonsaidb_core::Error::Networking(
407
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
408
            )),
409
        }
410
4
    }
411

            
412
2
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
413
2
        match self
414
2
            .send_request(Request::Database {
415
2
                database: self.name.to_string(),
416
2
                request: DatabaseRequest::Compact,
417
2
            })
418
2
            .await?
419
        {
420
2
            Response::Ok => Ok(()),
421
            Response::Error(err) => Err(err),
422
            other => Err(bonsaidb_core::Error::Networking(
423
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
424
            )),
425
        }
426
4
    }
427

            
428
2
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
429
2
        match self
430
2
            .send_request(Request::Database {
431
2
                database: self.name.to_string(),
432
2
                request: DatabaseRequest::CompactKeyValueStore,
433
2
            })
434
2
            .await?
435
        {
436
2
            Response::Ok => Ok(()),
437
            Response::Error(err) => Err(err),
438
            other => Err(bonsaidb_core::Error::Networking(
439
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
440
            )),
441
        }
442
4
    }
443
}