1
use std::ops::Deref;
2
use std::sync::Arc;
3

            
4
use async_trait::async_trait;
5
use bonsaidb_core::connection::{
6
    AccessPolicy, AsyncConnection, AsyncLowLevelConnection, HasSchema, HasSession, Range,
7
    SerializedQueryKey, Session, Sort,
8
};
9
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10
use bonsaidb_core::networking::{
11
    ApplyTransaction, Compact, CompactCollection, CompactKeyValueStore, Count, DeleteDocs, Get,
12
    GetMultiple, LastTransactionId, List, ListExecutedTransactions, ListHeaders, Query,
13
    QueryWithDocs, Reduce, ReduceGrouped,
14
};
15
use bonsaidb_core::schema::view::map::MappedSerializedValue;
16
use bonsaidb_core::schema::{self, CollectionName, Schematic, ViewName};
17
use bonsaidb_core::transaction::{Executed, OperationResult, Transaction};
18

            
19
use crate::AsyncClient;
20

            
21
mod pubsub;
22
pub use pubsub::*;
23

            
24
mod keyvalue;
25

            
26
/// A database on a remote server.
27
407
#[derive(Debug, Clone)]
28
pub struct AsyncRemoteDatabase {
29
    pub(crate) client: AsyncClient,
30
    pub(crate) name: Arc<String>,
31
    pub(crate) schema: Arc<Schematic>,
32
}
33
impl AsyncRemoteDatabase {
34
    /// Returns the name of the database.
35
    #[must_use]
36
    pub fn name(&self) -> &str {
37
        self.name.as_ref()
38
    }
39
}
40

            
41
impl Deref for AsyncRemoteDatabase {
42
    type Target = AsyncClient;
43

            
44
270
    fn deref(&self) -> &Self::Target {
45
270
        &self.client
46
270
    }
47
}
48

            
49
impl AsyncRemoteDatabase {
50
38820
    pub(crate) fn new(client: AsyncClient, name: String, schema: Arc<Schematic>) -> Self {
51
38820
        Self {
52
38820
            client,
53
38820
            name: Arc::new(name),
54
38820
            schema,
55
38820
        }
56
38820
    }
57
}
58

            
59
impl HasSession for AsyncRemoteDatabase {
60
    fn session(&self) -> Option<&Session> {
61
        self.client.session()
62
    }
63
}
64

            
65
#[async_trait]
66
impl AsyncConnection for AsyncRemoteDatabase {
67
    type Storage = AsyncClient;
68

            
69
    fn storage(&self) -> Self::Storage {
70
        self.client.clone()
71
    }
72

            
73
31080
    async fn list_executed_transactions(
74
31080
        &self,
75
31080
        starting_id: Option<u64>,
76
31080
        result_limit: Option<u32>,
77
31080
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
78
31080
        Ok(self
79
31080
            .client
80
31080
            .send_api_request(&ListExecutedTransactions {
81
31080
                database: self.name.to_string(),
82
31080
                starting_id,
83
31080
                result_limit,
84
31080
            })
85
117660
            .await?)
86
93240
    }
87

            
88
60
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
89
60
        Ok(self
90
60
            .client
91
60
            .send_api_request(&LastTransactionId {
92
60
                database: self.name.to_string(),
93
60
            })
94
60
            .await?)
95
180
    }
96

            
97
60
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
98
60
        self.send_api_request(&Compact {
99
60
            database: self.name.to_string(),
100
60
        })
101
60
        .await?;
102
60
        Ok(())
103
180
    }
104

            
105
60
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
106
60
        self.send_api_request(&CompactKeyValueStore {
107
60
            database: self.name.to_string(),
108
60
        })
109
60
        .await?;
110
60
        Ok(())
111
180
    }
112
}
113

            
114
#[async_trait]
115
impl AsyncLowLevelConnection for AsyncRemoteDatabase {
116
320130
    async fn apply_transaction(
117
320130
        &self,
118
320130
        transaction: Transaction,
119
320130
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
120
320130
        Ok(self
121
320130
            .client
122
320130
            .send_api_request(&ApplyTransaction {
123
320130
                database: self.name.to_string(),
124
320130
                transaction,
125
320130
            })
126
1040670
            .await?)
127
960390
    }
128

            
129
269820
    async fn get_from_collection(
130
269820
        &self,
131
269820
        id: DocumentId,
132
269820
        collection: &CollectionName,
133
269820
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
134
269820
        Ok(self
135
269820
            .client
136
269820
            .send_api_request(&Get {
137
269820
                database: self.name.to_string(),
138
269820
                collection: collection.clone(),
139
269820
                id,
140
269820
            })
141
437400
            .await?)
142
809460
    }
143

            
144
138600
    async fn get_multiple_from_collection(
145
138600
        &self,
146
138600
        ids: &[DocumentId],
147
138600
        collection: &CollectionName,
148
138600
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
149
138600
        Ok(self
150
138600
            .client
151
138600
            .send_api_request(&GetMultiple {
152
138600
                database: self.name.to_string(),
153
138600
                collection: collection.clone(),
154
138600
                ids: ids.to_vec(),
155
138600
            })
156
138600
            .await?)
157
415800
    }
158

            
159
240
    async fn list_from_collection(
160
240
        &self,
161
240
        ids: Range<DocumentId>,
162
240
        order: Sort,
163
240
        limit: Option<u32>,
164
240
        collection: &CollectionName,
165
240
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
166
240
        Ok(self
167
240
            .client
168
240
            .send_api_request(&List {
169
240
                database: self.name.to_string(),
170
240
                collection: collection.clone(),
171
240
                ids,
172
240
                order,
173
240
                limit,
174
240
            })
175
240
            .await?)
176
720
    }
177

            
178
60
    async fn list_headers_from_collection(
179
60
        &self,
180
60
        ids: Range<DocumentId>,
181
60
        order: Sort,
182
60
        limit: Option<u32>,
183
60
        collection: &CollectionName,
184
60
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
185
60
        Ok(self
186
60
            .client
187
60
            .send_api_request(&ListHeaders(List {
188
60
                database: self.name.to_string(),
189
60
                collection: collection.clone(),
190
60
                ids,
191
60
                order,
192
60
                limit,
193
60
            }))
194
60
            .await?)
195
180
    }
196

            
197
120
    async fn count_from_collection(
198
120
        &self,
199
120
        ids: Range<DocumentId>,
200
120
        collection: &CollectionName,
201
120
    ) -> Result<u64, bonsaidb_core::Error> {
202
120
        Ok(self
203
120
            .client
204
120
            .send_api_request(&Count {
205
120
                database: self.name.to_string(),
206
120
                collection: collection.clone(),
207
120
                ids,
208
120
            })
209
120
            .await?)
210
360
    }
211

            
212
60
    async fn compact_collection_by_name(
213
60
        &self,
214
60
        collection: CollectionName,
215
60
    ) -> Result<(), bonsaidb_core::Error> {
216
60
        self.send_api_request(&CompactCollection {
217
60
            database: self.name.to_string(),
218
60
            name: collection,
219
60
        })
220
60
        .await?;
221
60
        Ok(())
222
180
    }
223

            
224
140400
    async fn query_by_name(
225
140400
        &self,
226
140400
        view: &ViewName,
227
140400
        key: Option<SerializedQueryKey>,
228
140400
        order: Sort,
229
140400
        limit: Option<u32>,
230
140400
        access_policy: AccessPolicy,
231
140400
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
232
140400
        Ok(self
233
140400
            .client
234
140400
            .send_api_request(&Query {
235
140400
                database: self.name.to_string(),
236
140400
                view: view.clone(),
237
140400
                key,
238
140400
                order,
239
140400
                limit,
240
140400
                access_policy,
241
140400
            })
242
140400
            .await?)
243
421200
    }
244

            
245
    async fn query_by_name_with_docs(
246
        &self,
247
        view: &ViewName,
248
        key: Option<SerializedQueryKey>,
249
        order: Sort,
250
        limit: Option<u32>,
251
        access_policy: AccessPolicy,
252
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
253
        Ok(self
254
            .client
255
            .send_api_request(&QueryWithDocs(Query {
256
                database: self.name.to_string(),
257
                view: view.clone(),
258
                key,
259
                order,
260
                limit,
261
                access_policy,
262
            }))
263
            .await?)
264
    }
265

            
266
276090
    async fn reduce_by_name(
267
276090
        &self,
268
276090
        view: &ViewName,
269
276090
        key: Option<SerializedQueryKey>,
270
276090
        access_policy: AccessPolicy,
271
276090
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
272
276090
        Ok(self
273
276090
            .client
274
276090
            .send_api_request(&Reduce {
275
276090
                database: self.name.to_string(),
276
276090
                view: view.clone(),
277
276090
                key,
278
276090
                access_policy,
279
276090
            })
280
276090
            .await?
281
276030
            .into_vec())
282
828270
    }
283

            
284
180
    async fn reduce_grouped_by_name(
285
180
        &self,
286
180
        view: &ViewName,
287
180
        key: Option<SerializedQueryKey>,
288
180
        access_policy: AccessPolicy,
289
180
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
290
180
        Ok(self
291
180
            .client
292
180
            .send_api_request(&ReduceGrouped(Reduce {
293
180
                database: self.name.to_string(),
294
180
                view: view.clone(),
295
180
                key,
296
180
                access_policy,
297
180
            }))
298
270
            .await?)
299
540
    }
300

            
301
120
    async fn delete_docs_by_name(
302
120
        &self,
303
120
        view: &ViewName,
304
120
        key: Option<SerializedQueryKey>,
305
120
        access_policy: AccessPolicy,
306
120
    ) -> Result<u64, bonsaidb_core::Error> {
307
120
        Ok(self
308
120
            .client
309
120
            .send_api_request(&DeleteDocs {
310
120
                database: self.name.to_string(),
311
120
                view: view.clone(),
312
120
                key,
313
120
                access_policy,
314
120
            })
315
120
            .await?)
316
360
    }
317
}
318

            
319
impl HasSchema for AsyncRemoteDatabase {
320
418290
    fn schematic(&self) -> &Schematic {
321
418290
        &self.schema
322
418290
    }
323
}