1
use std::ops::Deref;
2
use std::sync::Arc;
3

            
4
use async_trait::async_trait;
5
use bonsaidb_core::connection::{
6
    AccessPolicy, AsyncConnection, AsyncLowLevelConnection, HasSchema, HasSession, Range,
7
    SerializedQueryKey, Session, Sort,
8
};
9
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
10
use bonsaidb_core::networking::{
11
    ApplyTransaction, Compact, CompactCollection, CompactKeyValueStore, Count, DeleteDocs, Get,
12
    GetMultiple, LastTransactionId, List, ListExecutedTransactions, ListHeaders, Query,
13
    QueryWithDocs, Reduce, ReduceGrouped,
14
};
15
use bonsaidb_core::schema::view::map::MappedSerializedValue;
16
use bonsaidb_core::schema::{self, CollectionName, Schematic, ViewName};
17
use bonsaidb_core::transaction::{Executed, OperationResult, Transaction};
18

            
19
use crate::AsyncClient;
20

            
21
mod pubsub;
22
pub use pubsub::*;
23

            
24
mod keyvalue;
25

            
26
/// A database on a remote server.
27
405
#[derive(Debug, Clone)]
28
pub struct AsyncRemoteDatabase {
29
    pub(crate) client: AsyncClient,
30
    pub(crate) name: Arc<String>,
31
    pub(crate) schema: Arc<Schematic>,
32
}
33
impl AsyncRemoteDatabase {
34
    /// Returns the name of the database.
35
    #[must_use]
36
    pub fn name(&self) -> &str {
37
        self.name.as_ref()
38
    }
39
}
40

            
41
impl Deref for AsyncRemoteDatabase {
42
    type Target = AsyncClient;
43

            
44
153
    fn deref(&self) -> &Self::Target {
45
153
        &self.client
46
153
    }
47
}
48

            
49
impl AsyncRemoteDatabase {
50
21794
    pub(crate) fn new(client: AsyncClient, name: String, schema: Arc<Schematic>) -> Self {
51
21794
        Self {
52
21794
            client,
53
21794
            name: Arc::new(name),
54
21794
            schema,
55
21794
        }
56
21794
    }
57
}
58

            
59
impl HasSession for AsyncRemoteDatabase {
60
    fn session(&self) -> Option<&Session> {
61
        self.client.session()
62
    }
63
}
64

            
65
#[async_trait]
66
impl AsyncConnection for AsyncRemoteDatabase {
67
    type Storage = AsyncClient;
68

            
69
    fn storage(&self) -> Self::Storage {
70
        self.client.clone()
71
    }
72

            
73
17612
    async fn list_executed_transactions(
74
17612
        &self,
75
17612
        starting_id: Option<u64>,
76
17612
        result_limit: Option<u32>,
77
17612
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
78
17612
        Ok(self
79
17612
            .client
80
17612
            .send_api_request(&ListExecutedTransactions {
81
17612
                database: self.name.to_string(),
82
17612
                starting_id,
83
17612
                result_limit,
84
17612
            })
85
50320
            .await?)
86
35224
    }
87

            
88
34
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
89
34
        Ok(self
90
34
            .client
91
34
            .send_api_request(&LastTransactionId {
92
34
                database: self.name.to_string(),
93
34
            })
94
34
            .await?)
95
68
    }
96

            
97
34
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
98
34
        self.send_api_request(&Compact {
99
34
            database: self.name.to_string(),
100
34
        })
101
34
        .await?;
102
34
        Ok(())
103
68
    }
104

            
105
34
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
106
34
        self.send_api_request(&CompactKeyValueStore {
107
34
            database: self.name.to_string(),
108
34
        })
109
34
        .await?;
110
34
        Ok(())
111
68
    }
112
}
113

            
114
#[async_trait]
115
impl AsyncLowLevelConnection for AsyncRemoteDatabase {
116
182461
    async fn apply_transaction(
117
182461
        &self,
118
182461
        transaction: Transaction,
119
182461
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
120
182461
        Ok(self
121
182461
            .client
122
182461
            .send_api_request(&ApplyTransaction {
123
182461
                database: self.name.to_string(),
124
182461
                transaction,
125
182461
            })
126
625311
            .await?)
127
364922
    }
128

            
129
157386
    async fn get_from_collection(
130
157386
        &self,
131
157386
        id: DocumentId,
132
157386
        collection: &CollectionName,
133
157386
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
134
157386
        Ok(self
135
157386
            .client
136
157386
            .send_api_request(&Get {
137
157386
                database: self.name.to_string(),
138
157386
                collection: collection.clone(),
139
157386
                id,
140
157386
            })
141
234209
            .await?)
142
314772
    }
143

            
144
80614
    async fn get_multiple_from_collection(
145
80614
        &self,
146
80614
        ids: &[DocumentId],
147
80614
        collection: &CollectionName,
148
80614
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
149
80614
        Ok(self
150
80614
            .client
151
80614
            .send_api_request(&GetMultiple {
152
80614
                database: self.name.to_string(),
153
80614
                collection: collection.clone(),
154
80614
                ids: ids.to_vec(),
155
80614
            })
156
80614
            .await?)
157
161228
    }
158

            
159
136
    async fn list_from_collection(
160
136
        &self,
161
136
        ids: Range<DocumentId>,
162
136
        order: Sort,
163
136
        limit: Option<u32>,
164
136
        collection: &CollectionName,
165
136
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
166
136
        Ok(self
167
136
            .client
168
136
            .send_api_request(&List {
169
136
                database: self.name.to_string(),
170
136
                collection: collection.clone(),
171
136
                ids,
172
136
                order,
173
136
                limit,
174
136
            })
175
136
            .await?)
176
272
    }
177

            
178
34
    async fn list_headers_from_collection(
179
34
        &self,
180
34
        ids: Range<DocumentId>,
181
34
        order: Sort,
182
34
        limit: Option<u32>,
183
34
        collection: &CollectionName,
184
34
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
185
34
        Ok(self
186
34
            .client
187
34
            .send_api_request(&ListHeaders(List {
188
34
                database: self.name.to_string(),
189
34
                collection: collection.clone(),
190
34
                ids,
191
34
                order,
192
34
                limit,
193
34
            }))
194
34
            .await?)
195
68
    }
196

            
197
68
    async fn count_from_collection(
198
68
        &self,
199
68
        ids: Range<DocumentId>,
200
68
        collection: &CollectionName,
201
68
    ) -> Result<u64, bonsaidb_core::Error> {
202
68
        Ok(self
203
68
            .client
204
68
            .send_api_request(&Count {
205
68
                database: self.name.to_string(),
206
68
                collection: collection.clone(),
207
68
                ids,
208
68
            })
209
68
            .await?)
210
136
    }
211

            
212
34
    async fn compact_collection_by_name(
213
34
        &self,
214
34
        collection: CollectionName,
215
34
    ) -> Result<(), bonsaidb_core::Error> {
216
34
        self.send_api_request(&CompactCollection {
217
34
            database: self.name.to_string(),
218
34
            name: collection,
219
34
        })
220
34
        .await?;
221
34
        Ok(())
222
68
    }
223

            
224
81634
    async fn query_by_name(
225
81634
        &self,
226
81634
        view: &ViewName,
227
81634
        key: Option<SerializedQueryKey>,
228
81634
        order: Sort,
229
81634
        limit: Option<u32>,
230
81634
        access_policy: AccessPolicy,
231
81634
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
232
81634
        Ok(self
233
81634
            .client
234
81634
            .send_api_request(&Query {
235
81634
                database: self.name.to_string(),
236
81634
                view: view.clone(),
237
81634
                key,
238
81634
                order,
239
81634
                limit,
240
81634
                access_policy,
241
81634
            })
242
81634
            .await?)
243
163268
    }
244

            
245
    async fn query_by_name_with_docs(
246
        &self,
247
        view: &ViewName,
248
        key: Option<SerializedQueryKey>,
249
        order: Sort,
250
        limit: Option<u32>,
251
        access_policy: AccessPolicy,
252
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
253
        Ok(self
254
            .client
255
            .send_api_request(&QueryWithDocs(Query {
256
                database: self.name.to_string(),
257
                view: view.clone(),
258
                key,
259
                order,
260
                limit,
261
                access_policy,
262
            }))
263
            .await?)
264
    }
265

            
266
162129
    async fn reduce_by_name(
267
162129
        &self,
268
162129
        view: &ViewName,
269
162129
        key: Option<SerializedQueryKey>,
270
162129
        access_policy: AccessPolicy,
271
162129
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
272
162129
        Ok(self
273
162129
            .client
274
162129
            .send_api_request(&Reduce {
275
162129
                database: self.name.to_string(),
276
162129
                view: view.clone(),
277
162129
                key,
278
162129
                access_policy,
279
162129
            })
280
162129
            .await?
281
162095
            .into_vec())
282
324258
    }
283

            
284
102
    async fn reduce_grouped_by_name(
285
102
        &self,
286
102
        view: &ViewName,
287
102
        key: Option<SerializedQueryKey>,
288
102
        access_policy: AccessPolicy,
289
102
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
290
102
        Ok(self
291
102
            .client
292
102
            .send_api_request(&ReduceGrouped(Reduce {
293
102
                database: self.name.to_string(),
294
102
                view: view.clone(),
295
102
                key,
296
102
                access_policy,
297
102
            }))
298
136
            .await?)
299
204
    }
300

            
301
68
    async fn delete_docs_by_name(
302
68
        &self,
303
68
        view: &ViewName,
304
68
        key: Option<SerializedQueryKey>,
305
68
        access_policy: AccessPolicy,
306
68
    ) -> Result<u64, bonsaidb_core::Error> {
307
68
        Ok(self
308
68
            .client
309
68
            .send_api_request(&DeleteDocs {
310
68
                database: self.name.to_string(),
311
68
                view: view.clone(),
312
68
                key,
313
68
                access_policy,
314
68
            })
315
68
            .await?)
316
136
    }
317
}
318

            
319
impl HasSchema for AsyncRemoteDatabase {
320
244783
    fn schematic(&self) -> &Schematic {
321
244783
        &self.schema
322
244783
    }
323
}