1
use std::ops::Deref;
2
use std::sync::Arc;
3

            
4
use async_trait::async_trait;
5
use bonsaidb_core::connection::{
6
    AccessPolicy, AsyncConnection, AsyncLowLevelConnection, HasSchema, HasSession, Range,
7
    SerializedQueryKey, Session, Sort,
8
};
9
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10
use bonsaidb_core::networking::{
11
    ApplyTransaction, Compact, CompactCollection, CompactKeyValueStore, Count, DeleteDocs, Get,
12
    GetMultiple, LastTransactionId, List, ListExecutedTransactions, ListHeaders, Query,
13
    QueryWithDocs, Reduce, ReduceGrouped,
14
};
15
use bonsaidb_core::schema::view::map::MappedSerializedValue;
16
use bonsaidb_core::schema::{self, CollectionName, Schematic, ViewName};
17
use bonsaidb_core::transaction::{Executed, OperationResult, Transaction};
18

            
19
use crate::AsyncClient;
20

            
21
mod pubsub;
22
pub use pubsub::*;
23

            
24
mod keyvalue;
25

            
26
/// A database on a remote server.
27
405
#[derive(Debug, Clone)]
28
pub struct AsyncRemoteDatabase {
29
    pub(crate) client: AsyncClient,
30
    pub(crate) name: Arc<String>,
31
    pub(crate) schema: Arc<Schematic>,
32
}
33
impl AsyncRemoteDatabase {
34
    /// Returns the name of the database.
35
    #[must_use]
36
    pub fn name(&self) -> &str {
37
        self.name.as_ref()
38
    }
39
}
40

            
41
impl Deref for AsyncRemoteDatabase {
42
    type Target = AsyncClient;
43

            
44
162
    fn deref(&self) -> &Self::Target {
45
162
        &self.client
46
162
    }
47
}
48

            
49
impl AsyncRemoteDatabase {
50
23076
    pub(crate) fn new(client: AsyncClient, name: String, schema: Arc<Schematic>) -> Self {
51
23076
        Self {
52
23076
            client,
53
23076
            name: Arc::new(name),
54
23076
            schema,
55
23076
        }
56
23076
    }
57
}
58

            
59
impl HasSession for AsyncRemoteDatabase {
60
    fn session(&self) -> Option<&Session> {
61
        self.client.session()
62
    }
63
}
64

            
65
#[async_trait]
66
impl AsyncConnection for AsyncRemoteDatabase {
67
    type Storage = AsyncClient;
68

            
69
    fn storage(&self) -> Self::Storage {
70
        self.client.clone()
71
    }
72

            
73
18648
    async fn list_executed_transactions(
74
18648
        &self,
75
18648
        starting_id: Option<u64>,
76
18648
        result_limit: Option<u32>,
77
18648
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
78
18648
        Ok(self
79
18648
            .client
80
18648
            .send_api_request(&ListExecutedTransactions {
81
18648
                database: self.name.to_string(),
82
18648
                starting_id,
83
18648
                result_limit,
84
18648
            })
85
60498
            .await?)
86
37296
    }
87

            
88
36
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
89
36
        Ok(self
90
36
            .client
91
36
            .send_api_request(&LastTransactionId {
92
36
                database: self.name.to_string(),
93
36
            })
94
36
            .await?)
95
72
    }
96

            
97
36
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
98
36
        self.send_api_request(&Compact {
99
36
            database: self.name.to_string(),
100
36
        })
101
36
        .await?;
102
36
        Ok(())
103
72
    }
104

            
105
36
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
106
36
        self.send_api_request(&CompactKeyValueStore {
107
36
            database: self.name.to_string(),
108
36
        })
109
36
        .await?;
110
36
        Ok(())
111
72
    }
112
}
113

            
114
#[async_trait]
115
impl AsyncLowLevelConnection for AsyncRemoteDatabase {
116
194130
    async fn apply_transaction(
117
194130
        &self,
118
194130
        transaction: Transaction,
119
194130
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
120
194130
        Ok(self
121
194130
            .client
122
194130
            .send_api_request(&ApplyTransaction {
123
194130
                database: self.name.to_string(),
124
194130
                transaction,
125
194130
            })
126
719838
            .await?)
127
388260
    }
128

            
129
162252
    async fn get_from_collection(
130
162252
        &self,
131
162252
        id: DocumentId,
132
162252
        collection: &CollectionName,
133
162252
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
134
162252
        Ok(self
135
162252
            .client
136
162252
            .send_api_request(&Get {
137
162252
                database: self.name.to_string(),
138
162252
                collection: collection.clone(),
139
162252
                id,
140
162252
            })
141
256698
            .await?)
142
324504
    }
143

            
144
84636
    async fn get_multiple_from_collection(
145
84636
        &self,
146
84636
        ids: &[DocumentId],
147
84636
        collection: &CollectionName,
148
84636
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
149
84636
        Ok(self
150
84636
            .client
151
84636
            .send_api_request(&GetMultiple {
152
84636
                database: self.name.to_string(),
153
84636
                collection: collection.clone(),
154
84636
                ids: ids.to_vec(),
155
84636
            })
156
84636
            .await?)
157
169272
    }
158

            
159
144
    async fn list_from_collection(
160
144
        &self,
161
144
        ids: Range<DocumentId>,
162
144
        order: Sort,
163
144
        limit: Option<u32>,
164
144
        collection: &CollectionName,
165
144
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
166
144
        Ok(self
167
144
            .client
168
144
            .send_api_request(&List {
169
144
                database: self.name.to_string(),
170
144
                collection: collection.clone(),
171
144
                ids,
172
144
                order,
173
144
                limit,
174
144
            })
175
144
            .await?)
176
288
    }
177

            
178
36
    async fn list_headers_from_collection(
179
36
        &self,
180
36
        ids: Range<DocumentId>,
181
36
        order: Sort,
182
36
        limit: Option<u32>,
183
36
        collection: &CollectionName,
184
36
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
185
36
        Ok(self
186
36
            .client
187
36
            .send_api_request(&ListHeaders(List {
188
36
                database: self.name.to_string(),
189
36
                collection: collection.clone(),
190
36
                ids,
191
36
                order,
192
36
                limit,
193
36
            }))
194
36
            .await?)
195
72
    }
196

            
197
72
    async fn count_from_collection(
198
72
        &self,
199
72
        ids: Range<DocumentId>,
200
72
        collection: &CollectionName,
201
72
    ) -> Result<u64, bonsaidb_core::Error> {
202
72
        Ok(self
203
72
            .client
204
72
            .send_api_request(&Count {
205
72
                database: self.name.to_string(),
206
72
                collection: collection.clone(),
207
72
                ids,
208
72
            })
209
72
            .await?)
210
144
    }
211

            
212
36
    async fn compact_collection_by_name(
213
36
        &self,
214
36
        collection: CollectionName,
215
36
    ) -> Result<(), bonsaidb_core::Error> {
216
36
        self.send_api_request(&CompactCollection {
217
36
            database: self.name.to_string(),
218
36
            name: collection,
219
36
        })
220
36
        .await?;
221
36
        Ok(())
222
72
    }
223

            
224
85716
    async fn query_by_name(
225
85716
        &self,
226
85716
        view: &ViewName,
227
85716
        key: Option<SerializedQueryKey>,
228
85716
        order: Sort,
229
85716
        limit: Option<u32>,
230
85716
        access_policy: AccessPolicy,
231
85716
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
232
85716
        Ok(self
233
85716
            .client
234
85716
            .send_api_request(&Query {
235
85716
                database: self.name.to_string(),
236
85716
                view: view.clone(),
237
85716
                key,
238
85716
                order,
239
85716
                limit,
240
85716
                access_policy,
241
85716
            })
242
85716
            .await?)
243
171432
    }
244

            
245
    async fn query_by_name_with_docs(
246
        &self,
247
        view: &ViewName,
248
        key: Option<SerializedQueryKey>,
249
        order: Sort,
250
        limit: Option<u32>,
251
        access_policy: AccessPolicy,
252
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
253
        Ok(self
254
            .client
255
            .send_api_request(&QueryWithDocs(Query {
256
                database: self.name.to_string(),
257
                view: view.clone(),
258
                key,
259
                order,
260
                limit,
261
                access_policy,
262
            }))
263
            .await?)
264
    }
265

            
266
166554
    async fn reduce_by_name(
267
166554
        &self,
268
166554
        view: &ViewName,
269
166554
        key: Option<SerializedQueryKey>,
270
166554
        access_policy: AccessPolicy,
271
166554
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
272
166554
        Ok(self
273
166554
            .client
274
166554
            .send_api_request(&Reduce {
275
166554
                database: self.name.to_string(),
276
166554
                view: view.clone(),
277
166554
                key,
278
166554
                access_policy,
279
166554
            })
280
166554
            .await?
281
166518
            .into_vec())
282
333108
    }
283

            
284
108
    async fn reduce_grouped_by_name(
285
108
        &self,
286
108
        view: &ViewName,
287
108
        key: Option<SerializedQueryKey>,
288
108
        access_policy: AccessPolicy,
289
108
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
290
108
        Ok(self
291
108
            .client
292
108
            .send_api_request(&ReduceGrouped(Reduce {
293
108
                database: self.name.to_string(),
294
108
                view: view.clone(),
295
108
                key,
296
108
                access_policy,
297
108
            }))
298
144
            .await?)
299
216
    }
300

            
301
72
    async fn delete_docs_by_name(
302
72
        &self,
303
72
        view: &ViewName,
304
72
        key: Option<SerializedQueryKey>,
305
72
        access_policy: AccessPolicy,
306
72
    ) -> Result<u64, bonsaidb_core::Error> {
307
72
        Ok(self
308
72
            .client
309
72
            .send_api_request(&DeleteDocs {
310
72
                database: self.name.to_string(),
311
72
                view: view.clone(),
312
72
                key,
313
72
                access_policy,
314
72
            })
315
72
            .await?)
316
144
    }
317
}
318

            
319
impl HasSchema for AsyncRemoteDatabase {
320
253350
    fn schematic(&self) -> &Schematic {
321
253350
        &self.schema
322
253350
    }
323
}