1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    arc_bytes::serde::Bytes,
6
    connection::{
7
        AccessPolicy, AsyncConnection, AsyncLowLevelConnection, HasSession, QueryKey, Range,
8
        Session, Sort,
9
    },
10
    document::{DocumentId, Header, OwnedDocument},
11
    networking::{
12
        ApplyTransaction, Compact, CompactCollection, CompactKeyValueStore, Count, DeleteDocs, Get,
13
        GetMultiple, LastTransactionId, List, ListExecutedTransactions, ListHeaders, Query,
14
        QueryWithDocs, Reduce, ReduceGrouped,
15
    },
16
    schema::{self, view::map::MappedSerializedValue, CollectionName, Schematic, ViewName},
17
    transaction::{Executed, OperationResult, Transaction},
18
};
19

            
20
use crate::Client;
21

            
22
mod pubsub;
23
pub use pubsub::*;
24

            
25
mod keyvalue;
26

            
27
/// A database on a remote server.
28
392
#[derive(Debug, Clone)]
29
pub struct RemoteDatabase {
30
    pub(crate) client: Client,
31
    pub(crate) name: Arc<String>,
32
    pub(crate) schema: Arc<Schematic>,
33
}
34
impl RemoteDatabase {
35
    /// Returns the name of the database.
36
    #[must_use]
37
    pub fn name(&self) -> &str {
38
        self.name.as_ref()
39
    }
40
}
41

            
42
impl Deref for RemoteDatabase {
43
    type Target = Client;
44

            
45
171
    fn deref(&self) -> &Self::Target {
46
171
        &self.client
47
171
    }
48
}
49

            
50
impl RemoteDatabase {
51
23769
    pub(crate) fn new(client: Client, name: String, schema: Arc<Schematic>) -> Self {
52
23769
        Self {
53
23769
            client,
54
23769
            name: Arc::new(name),
55
23769
            schema,
56
23769
        }
57
23769
    }
58
}
59

            
60
impl HasSession for RemoteDatabase {
61
    fn session(&self) -> Option<&Session> {
62
        Some(&self.session)
63
    }
64
}
65

            
66
#[async_trait]
67
impl AsyncConnection for RemoteDatabase {
68
    type Storage = Client;
69

            
70
    fn storage(&self) -> Self::Storage {
71
        self.client.clone()
72
    }
73

            
74
19684
    async fn list_executed_transactions(
75
19684
        &self,
76
19684
        starting_id: Option<u64>,
77
19684
        result_limit: Option<u32>,
78
19684
    ) -> Result<Vec<Executed>, bonsaidb_core::Error> {
79
19684
        Ok(self
80
19684
            .client
81
19684
            .send_api_request_async(&ListExecutedTransactions {
82
19684
                database: self.name.to_string(),
83
19684
                starting_id,
84
19684
                result_limit,
85
57475
            })
86
57475
            .await?)
87
39368
    }
88

            
89
38
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
90
38
        Ok(self
91
38
            .client
92
38
            .send_api_request_async(&LastTransactionId {
93
38
                database: self.name.to_string(),
94
38
            })
95
38
            .await?)
96
76
    }
97

            
98
38
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
99
38
        self.send_api_request_async(&Compact {
100
38
            database: self.name.to_string(),
101
38
        })
102
38
        .await?;
103
38
        Ok(())
104
76
    }
105

            
106
38
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
107
38
        self.send_api_request_async(&CompactKeyValueStore {
108
38
            database: self.name.to_string(),
109
38
        })
110
38
        .await?;
111
38
        Ok(())
112
76
    }
113
}
114

            
115
#[async_trait]
116
impl AsyncLowLevelConnection for RemoteDatabase {
117
256405
    fn schematic(&self) -> &Schematic {
118
256405
        &self.schema
119
256405
    }
120

            
121
199557
    async fn apply_transaction(
122
199557
        &self,
123
199557
        transaction: Transaction,
124
199557
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
125
199557
        Ok(self
126
199557
            .client
127
199557
            .send_api_request_async(&ApplyTransaction {
128
199557
                database: self.name.to_string(),
129
199557
                transaction,
130
400216
            })
131
400216
            .await?)
132
399114
    }
133

            
134
168872
    async fn get_from_collection(
135
168872
        &self,
136
168872
        id: DocumentId,
137
168872
        collection: &CollectionName,
138
168872
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
139
168872
        Ok(self
140
168872
            .client
141
168872
            .send_api_request_async(&Get {
142
168872
                database: self.name.to_string(),
143
168872
                collection: collection.clone(),
144
168872
                id,
145
240597
            })
146
240597
            .await?)
147
337744
    }
148

            
149
84550
    async fn get_multiple_from_collection(
150
84550
        &self,
151
84550
        ids: &[DocumentId],
152
84550
        collection: &CollectionName,
153
84550
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
154
84550
        Ok(self
155
84550
            .client
156
84550
            .send_api_request_async(&GetMultiple {
157
84550
                database: self.name.to_string(),
158
84550
                collection: collection.clone(),
159
84550
                ids: ids.to_vec(),
160
84550
            })
161
84550
            .await?)
162
169100
    }
163

            
164
152
    async fn list_from_collection(
165
152
        &self,
166
152
        ids: Range<DocumentId>,
167
152
        order: Sort,
168
152
        limit: Option<u32>,
169
152
        collection: &CollectionName,
170
152
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
171
152
        Ok(self
172
152
            .client
173
152
            .send_api_request_async(&List {
174
152
                database: self.name.to_string(),
175
152
                collection: collection.clone(),
176
152
                ids,
177
152
                order,
178
152
                limit,
179
152
            })
180
152
            .await?)
181
304
    }
182

            
183
38
    async fn list_headers_from_collection(
184
38
        &self,
185
38
        ids: Range<DocumentId>,
186
38
        order: Sort,
187
38
        limit: Option<u32>,
188
38
        collection: &CollectionName,
189
38
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
190
38
        Ok(self
191
38
            .client
192
38
            .send_api_request_async(&ListHeaders(List {
193
38
                database: self.name.to_string(),
194
38
                collection: collection.clone(),
195
38
                ids,
196
38
                order,
197
38
                limit,
198
38
            }))
199
38
            .await?)
200
76
    }
201

            
202
76
    async fn count_from_collection(
203
76
        &self,
204
76
        ids: Range<DocumentId>,
205
76
        collection: &CollectionName,
206
76
    ) -> Result<u64, bonsaidb_core::Error> {
207
76
        Ok(self
208
76
            .client
209
76
            .send_api_request_async(&Count {
210
76
                database: self.name.to_string(),
211
76
                collection: collection.clone(),
212
76
                ids,
213
76
            })
214
76
            .await?)
215
152
    }
216

            
217
38
    async fn compact_collection_by_name(
218
38
        &self,
219
38
        collection: CollectionName,
220
38
    ) -> Result<(), bonsaidb_core::Error> {
221
38
        self.send_api_request_async(&CompactCollection {
222
38
            database: self.name.to_string(),
223
38
            name: collection,
224
38
        })
225
38
        .await?;
226
38
        Ok(())
227
76
    }
228

            
229
85462
    async fn query_by_name(
230
85462
        &self,
231
85462
        view: &ViewName,
232
85462
        key: Option<QueryKey<Bytes>>,
233
85462
        order: Sort,
234
85462
        limit: Option<u32>,
235
85462
        access_policy: AccessPolicy,
236
85462
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
237
85462
        Ok(self
238
85462
            .client
239
85462
            .send_api_request_async(&Query {
240
85462
                database: self.name.to_string(),
241
85462
                view: view.clone(),
242
85462
                key,
243
85462
                order,
244
85462
                limit,
245
85462
                access_policy,
246
85462
            })
247
85462
            .await?)
248
170924
    }
249

            
250
    async fn query_by_name_with_docs(
251
        &self,
252
        view: &ViewName,
253
        key: Option<QueryKey<Bytes>>,
254
        order: Sort,
255
        limit: Option<u32>,
256
        access_policy: AccessPolicy,
257
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
258
        Ok(self
259
            .client
260
            .send_api_request_async(&QueryWithDocs(Query {
261
                database: self.name.to_string(),
262
                view: view.clone(),
263
                key,
264
                order,
265
                limit,
266
                access_policy,
267
            }))
268
            .await?)
269
    }
270

            
271
170753
    async fn reduce_by_name(
272
170753
        &self,
273
170753
        view: &ViewName,
274
170753
        key: Option<QueryKey<Bytes>>,
275
170753
        access_policy: AccessPolicy,
276
170753
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
277
170753
        Ok(self
278
170753
            .client
279
170753
            .send_api_request_async(&Reduce {
280
170753
                database: self.name.to_string(),
281
170753
                view: view.clone(),
282
170753
                key,
283
170753
                access_policy,
284
170753
            })
285
170753
            .await?
286
170715
            .into_vec())
287
341506
    }
288

            
289
114
    async fn reduce_grouped_by_name(
290
114
        &self,
291
114
        view: &ViewName,
292
114
        key: Option<QueryKey<Bytes>>,
293
114
        access_policy: AccessPolicy,
294
114
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
295
114
        Ok(self
296
114
            .client
297
114
            .send_api_request_async(&ReduceGrouped(Reduce {
298
114
                database: self.name.to_string(),
299
114
                view: view.clone(),
300
114
                key,
301
114
                access_policy,
302
152
            }))
303
152
            .await?)
304
228
    }
305

            
306
76
    async fn delete_docs_by_name(
307
76
        &self,
308
76
        view: &ViewName,
309
76
        key: Option<QueryKey<Bytes>>,
310
76
        access_policy: AccessPolicy,
311
76
    ) -> Result<u64, bonsaidb_core::Error> {
312
76
        Ok(self
313
76
            .client
314
76
            .send_api_request_async(&DeleteDocs {
315
76
                database: self.name.to_string(),
316
76
                view: view.clone(),
317
76
                key,
318
76
                access_policy,
319
76
            })
320
76
            .await?)
321
152
    }
322
}