1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    arc_bytes::serde::Bytes,
6
    connection::{
7
        AccessPolicy, AsyncConnection, AsyncLowLevelConnection, HasSession, QueryKey, Range,
8
        Session, Sort,
9
    },
10
    document::{DocumentId, Header, OwnedDocument},
11
    networking::{
12
        ApplyTransaction, Compact, CompactCollection, CompactKeyValueStore, Count, DeleteDocs, Get,
13
        GetMultiple, LastTransactionId, List, ListExecutedTransactions, ListHeaders, Query,
14
        QueryWithDocs, Reduce, ReduceGrouped,
15
    },
16
    schema::{self, view::map::MappedSerializedValue, CollectionName, Schematic, ViewName},
17
    transaction::{Executed, OperationResult, Transaction},
18
};
19

            
20
use crate::Client;
21

            
22
mod pubsub;
23
pub use pubsub::*;
24

            
25
mod keyvalue;
26

            
27
/// A database on a remote server.
28
392
#[derive(Debug, Clone)]
29
pub struct RemoteDatabase {
30
    pub(crate) client: Client,
31
    pub(crate) name: Arc<String>,
32
    pub(crate) schema: Arc<Schematic>,
33
}
34
impl RemoteDatabase {
35
    /// Returns the name of the database.
36
    #[must_use]
37
    pub fn name(&self) -> &str {
38
        self.name.as_ref()
39
    }
40
}
41

            
42
impl Deref for RemoteDatabase {
43
    type Target = Client;
44

            
45
180
    fn deref(&self) -> &Self::Target {
46
180
        &self.client
47
180
    }
48
}
49

            
50
impl RemoteDatabase {
51
25020
    pub(crate) fn new(client: Client, name: String, schema: Arc<Schematic>) -> Self {
52
25020
        Self {
53
25020
            client,
54
25020
            name: Arc::new(name),
55
25020
            schema,
56
25020
        }
57
25020
    }
58
}
59

            
60
impl HasSession for RemoteDatabase {
61
    fn session(&self) -> Option<&Session> {
62
        Some(&self.session)
63
    }
64
}
65

            
66
#[async_trait]
67
impl AsyncConnection for RemoteDatabase {
68
    type Storage = Client;
69

            
70
    fn storage(&self) -> Self::Storage {
71
        self.client.clone()
72
    }
73

            
74
20720
    async fn list_executed_transactions(
75
20720
        &self,
76
20720
        starting_id: Option<u64>,
77
20720
        result_limit: Option<u32>,
78
20720
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
79
20720
        Ok(self
80
20720
            .client
81
20720
            .send_api_request_async(&ListExecutedTransactions {
82
20720
                database: self.name.to_string(),
83
20720
                starting_id,
84
20720
                result_limit,
85
57120
            })
86
57120
            .await?)
87
41440
    }
88

            
89
40
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
90
40
        Ok(self
91
40
            .client
92
40
            .send_api_request_async(&LastTransactionId {
93
40
                database: self.name.to_string(),
94
40
            })
95
40
            .await?)
96
80
    }
97

            
98
40
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
99
40
        self.send_api_request_async(&Compact {
100
40
            database: self.name.to_string(),
101
40
        })
102
40
        .await?;
103
40
        Ok(())
104
80
    }
105

            
106
40
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
107
40
        self.send_api_request_async(&CompactKeyValueStore {
108
40
            database: self.name.to_string(),
109
40
        })
110
40
        .await?;
111
40
        Ok(())
112
80
    }
113
}
114

            
115
#[async_trait]
116
impl AsyncLowLevelConnection for RemoteDatabase {
117
281380
    fn schematic(&self) -> &Schematic {
118
281380
        &self.schema
119
281380
    }
120

            
121
212860
    async fn apply_transaction(
122
212860
        &self,
123
212860
        transaction: Transaction,
124
212860
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
125
212860
        Ok(self
126
212860
            .client
127
212860
            .send_api_request_async(&ApplyTransaction {
128
212860
                database: self.name.to_string(),
129
212860
                transaction,
130
406740
            })
131
406740
            .await?)
132
425720
    }
133

            
134
186000
    async fn get_from_collection(
135
186000
        &self,
136
186000
        id: DocumentId,
137
186000
        collection: &CollectionName,
138
186000
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
139
186000
        Ok(self
140
186000
            .client
141
186000
            .send_api_request_async(&Get {
142
186000
                database: self.name.to_string(),
143
186000
                collection: collection.clone(),
144
186000
                id,
145
256900
            })
146
256900
            .await?)
147
372000
    }
148

            
149
91880
    async fn get_multiple_from_collection(
150
91880
        &self,
151
91880
        ids: &[DocumentId],
152
91880
        collection: &CollectionName,
153
91880
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
154
91860
        Ok(self
155
91860
            .client
156
91860
            .send_api_request_async(&GetMultiple {
157
91860
                database: self.name.to_string(),
158
91860
                collection: collection.clone(),
159
91860
                ids: ids.to_vec(),
160
91880
            })
161
91880
            .await?)
162
183760
    }
163

            
164
160
    async fn list_from_collection(
165
160
        &self,
166
160
        ids: Range<DocumentId>,
167
160
        order: Sort,
168
160
        limit: Option<u32>,
169
160
        collection: &CollectionName,
170
160
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
171
160
        Ok(self
172
160
            .client
173
160
            .send_api_request_async(&List {
174
160
                database: self.name.to_string(),
175
160
                collection: collection.clone(),
176
160
                ids,
177
160
                order,
178
160
                limit,
179
160
            })
180
160
            .await?)
181
320
    }
182

            
183
40
    async fn list_headers_from_collection(
184
40
        &self,
185
40
        ids: Range<DocumentId>,
186
40
        order: Sort,
187
40
        limit: Option<u32>,
188
40
        collection: &CollectionName,
189
40
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
190
40
        Ok(self
191
40
            .client
192
40
            .send_api_request_async(&ListHeaders(List {
193
40
                database: self.name.to_string(),
194
40
                collection: collection.clone(),
195
40
                ids,
196
40
                order,
197
40
                limit,
198
40
            }))
199
40
            .await?)
200
80
    }
201

            
202
80
    async fn count_from_collection(
203
80
        &self,
204
80
        ids: Range<DocumentId>,
205
80
        collection: &CollectionName,
206
80
    ) -> Result<u64, bonsaidb_core::Error> {
207
80
        Ok(self
208
80
            .client
209
80
            .send_api_request_async(&Count {
210
80
                database: self.name.to_string(),
211
80
                collection: collection.clone(),
212
80
                ids,
213
80
            })
214
80
            .await?)
215
160
    }
216

            
217
40
    async fn compact_collection_by_name(
218
40
        &self,
219
40
        collection: CollectionName,
220
40
    ) -> Result<(), bonsaidb_core::Error> {
221
40
        self.send_api_request_async(&CompactCollection {
222
40
            database: self.name.to_string(),
223
40
            name: collection,
224
40
        })
225
40
        .await?;
226
40
        Ok(())
227
80
    }
228

            
229
92800
    async fn query_by_name(
230
92800
        &self,
231
92800
        view: &ViewName,
232
92800
        key: Option<QueryKey<Bytes>>,
233
92800
        order: Sort,
234
92800
        limit: Option<u32>,
235
92800
        access_policy: AccessPolicy,
236
92800
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
237
92800
        Ok(self
238
92800
            .client
239
92800
            .send_api_request_async(&Query {
240
92800
                database: self.name.to_string(),
241
92800
                view: view.clone(),
242
92800
                key,
243
92800
                order,
244
92800
                limit,
245
92800
                access_policy,
246
92800
            })
247
92800
            .await?)
248
185600
    }
249

            
250
    async fn query_by_name_with_docs(
251
        &self,
252
        view: &ViewName,
253
        key: Option<QueryKey<Bytes>>,
254
        order: Sort,
255
        limit: Option<u32>,
256
        access_policy: AccessPolicy,
257
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
258
        Ok(self
259
            .client
260
            .send_api_request_async(&QueryWithDocs(Query {
261
                database: self.name.to_string(),
262
                view: view.clone(),
263
                key,
264
                order,
265
                limit,
266
                access_policy,
267
            }))
268
            .await?)
269
    }
270

            
271
188440
    async fn reduce_by_name(
272
188440
        &self,
273
188440
        view: &ViewName,
274
188440
        key: Option<QueryKey<Bytes>>,
275
188440
        access_policy: AccessPolicy,
276
188460
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
277
188460
        Ok(self
278
188460
            .client
279
188460
            .send_api_request_async(&Reduce {
280
188460
                database: self.name.to_string(),
281
188460
                view: view.clone(),
282
188460
                key,
283
188460
                access_policy,
284
188460
            })
285
188460
            .await?
286
188400
            .into_vec())
287
376900
    }
288

            
289
120
    async fn reduce_grouped_by_name(
290
120
        &self,
291
120
        view: &ViewName,
292
120
        key: Option<QueryKey<Bytes>>,
293
120
        access_policy: AccessPolicy,
294
120
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
295
120
        Ok(self
296
120
            .client
297
120
            .send_api_request_async(&ReduceGrouped(Reduce {
298
120
                database: self.name.to_string(),
299
120
                view: view.clone(),
300
120
                key,
301
120
                access_policy,
302
140
            }))
303
140
            .await?)
304
240
    }
305

            
306
40
    async fn delete_docs_by_name(
307
40
        &self,
308
40
        view: &ViewName,
309
40
        key: Option<QueryKey<Bytes>>,
310
40
        access_policy: AccessPolicy,
311
40
    ) -> Result<u64, bonsaidb_core::Error> {
312
40
        Ok(self
313
40
            .client
314
40
            .send_api_request_async(&DeleteDocs {
315
40
                database: self.name.to_string(),
316
40
                view: view.clone(),
317
40
                key,
318
40
                access_policy,
319
40
            })
320
40
            .await?)
321
80
    }
322
}