1
use std::ops::Deref;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::connection::{
5
    AccessPolicy, AsyncLowLevelConnection, HasSchema, HasSession, Range, SerializedQueryKey, Sort,
6
};
7
use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
8
use bonsaidb_core::keyvalue::AsyncKeyValue;
9
use bonsaidb_core::permissions::Permissions;
10
use bonsaidb_core::pubsub::AsyncPubSub;
11
use bonsaidb_core::schema::view::map::MappedSerializedValue;
12
use bonsaidb_core::schema::{self, CollectionName, Schematic, ViewName};
13
use bonsaidb_core::transaction::{OperationResult, Transaction};
14
use bonsaidb_local::{AsyncDatabase, Database};
15
use derive_where::derive_where;
16

            
17
use crate::{Backend, CustomServer, NoBackend};
18

            
19
/// A database belonging to a [`CustomServer`].
20
100
#[derive_where(Debug, Clone)]
21
pub struct ServerDatabase<B: Backend = NoBackend> {
22
    pub(crate) server: CustomServer<B>,
23
    pub(crate) db: AsyncDatabase,
24
}
25

            
26
impl<B: Backend> From<ServerDatabase<B>> for Database {
27
    fn from(server: ServerDatabase<B>) -> Self {
28
        Self::from(server.db)
29
    }
30
}
31

            
32
impl<'a, B: Backend> From<&'a ServerDatabase<B>> for Database {
33
    fn from(server: &'a ServerDatabase<B>) -> Self {
34
        Self::from(server.db.clone())
35
    }
36
}
37

            
38
impl<B: Backend> ServerDatabase<B> {
39
    /// Restricts an unauthenticated instance to having `effective_permissions`.
40
    /// Returns `None` if a session has already been established.
41
    #[must_use]
42
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
43
        self.db
44
            .with_effective_permissions(effective_permissions)
45
            .map(|db| Self {
46
                db,
47
                server: self.server.clone(),
48
            })
49
    }
50
}
51

            
52
impl<B: Backend> Deref for ServerDatabase<B> {
53
    type Target = AsyncDatabase;
54

            
55
    fn deref(&self) -> &Self::Target {
56
        &self.db
57
    }
58
}
59

            
60
/// Uses `CustomServer`'s `PubSub` relay.
61
#[async_trait]
62
impl<B: Backend> AsyncPubSub for ServerDatabase<B> {
63
    type Subscriber = bonsaidb_local::Subscriber;
64

            
65
8
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
66
8
        let subscriber = self.db.create_subscriber().await?;
67
8
        Ok(subscriber)
68
24
    }
69

            
70
10
    async fn publish_bytes(
71
10
        &self,
72
10
        topic: Vec<u8>,
73
10
        payload: Vec<u8>,
74
10
    ) -> Result<(), bonsaidb_core::Error> {
75
10
        self.db.publish_bytes(topic, payload).await
76
30
    }
77

            
78
1
    async fn publish_bytes_to_all(
79
1
        &self,
80
1
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
81
1
        payload: Vec<u8>,
82
1
    ) -> Result<(), bonsaidb_core::Error> {
83
1
        self.db.publish_bytes_to_all(topics, payload).await
84
3
    }
85
}
86

            
87
impl<B: Backend> HasSession for ServerDatabase<B> {
88
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
89
        self.server.session()
90
    }
91
}
92

            
93
/// Pass-through implementation
94
#[async_trait]
95
impl<B: Backend> bonsaidb_core::connection::AsyncConnection for ServerDatabase<B> {
96
    type Storage = CustomServer<B>;
97

            
98
    fn storage(&self) -> Self::Storage {
99
        self.server.clone()
100
    }
101

            
102
20
    async fn list_executed_transactions(
103
20
        &self,
104
20
        starting_id: Option<u64>,
105
20
        result_limit: Option<u32>,
106
20
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
107
20
        self.db
108
20
            .list_executed_transactions(starting_id, result_limit)
109
20
            .await
110
60
    }
111

            
112
1
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
113
1
        self.db.last_transaction_id().await
114
3
    }
115

            
116
1
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
117
1
        self.db.compact_collection::<C>().await
118
3
    }
119

            
120
1
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
121
1
        self.db.compact().await
122
3
    }
123

            
124
1
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
125
1
        self.db.compact_key_value_store().await
126
3
    }
127
}
128

            
129
/// Pass-through implementation
130
#[async_trait]
131
impl<B: Backend> AsyncKeyValue for ServerDatabase<B> {
132
10118
    async fn execute_key_operation(
133
10118
        &self,
134
10118
        op: bonsaidb_core::keyvalue::KeyOperation,
135
10118
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
136
10118
        self.db.execute_key_operation(op).await
137
30354
    }
138
}
139

            
140
#[async_trait]
141
impl<B: Backend> AsyncLowLevelConnection for ServerDatabase<B> {
142
6976
    async fn get_from_collection(
143
6976
        &self,
144
6976
        id: DocumentId,
145
6976
        collection: &CollectionName,
146
6976
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
147
6976
        self.db.get_from_collection(id, collection).await
148
20928
    }
149

            
150
4
    async fn list_from_collection(
151
4
        &self,
152
4
        ids: Range<DocumentId>,
153
4
        order: Sort,
154
4
        limit: Option<u32>,
155
4
        collection: &CollectionName,
156
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
157
4
        self.db
158
4
            .list_from_collection(ids, order, limit, collection)
159
4
            .await
160
12
    }
161

            
162
1
    async fn list_headers_from_collection(
163
1
        &self,
164
1
        ids: Range<DocumentId>,
165
1
        order: Sort,
166
1
        limit: Option<u32>,
167
1
        collection: &CollectionName,
168
1
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
169
1
        self.db
170
1
            .list_headers_from_collection(ids, order, limit, collection)
171
1
            .await
172
3
    }
173

            
174
2
    async fn count_from_collection(
175
2
        &self,
176
2
        ids: Range<DocumentId>,
177
2
        collection: &CollectionName,
178
2
    ) -> Result<u64, bonsaidb_core::Error> {
179
2
        self.db.count_from_collection(ids, collection).await
180
6
    }
181

            
182
4807
    async fn get_multiple_from_collection(
183
4807
        &self,
184
4807
        ids: &[DocumentId],
185
4807
        collection: &CollectionName,
186
4807
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
187
4807
        self.db.get_multiple_from_collection(ids, collection).await
188
14421
    }
189

            
190
    async fn compact_collection_by_name(
191
        &self,
192
        collection: CollectionName,
193
    ) -> Result<(), bonsaidb_core::Error> {
194
        self.db.compact_collection_by_name(collection).await
195
    }
196

            
197
4968
    async fn query_by_name(
198
4968
        &self,
199
4968
        view: &ViewName,
200
4968
        key: Option<SerializedQueryKey>,
201
4968
        order: Sort,
202
4968
        limit: Option<u32>,
203
4968
        access_policy: AccessPolicy,
204
4968
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
205
4968
        self.db
206
4968
            .query_by_name(view, key, order, limit, access_policy)
207
4933
            .await
208
14904
    }
209

            
210
    async fn query_by_name_with_docs(
211
        &self,
212
        view: &ViewName,
213
        key: Option<SerializedQueryKey>,
214
        order: Sort,
215
        limit: Option<u32>,
216
        access_policy: AccessPolicy,
217
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
218
        self.db
219
            .query_by_name_with_docs(view, key, order, limit, access_policy)
220
            .await
221
    }
222

            
223
9197
    async fn reduce_by_name(
224
9197
        &self,
225
9197
        view: &ViewName,
226
9197
        key: Option<SerializedQueryKey>,
227
9197
        access_policy: AccessPolicy,
228
9197
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
229
9197
        self.db.reduce_by_name(view, key, access_policy).await
230
27591
    }
231

            
232
2
    async fn reduce_grouped_by_name(
233
2
        &self,
234
2
        view: &ViewName,
235
2
        key: Option<SerializedQueryKey>,
236
2
        access_policy: AccessPolicy,
237
2
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
238
2
        self.db
239
2
            .reduce_grouped_by_name(view, key, access_policy)
240
2
            .await
241
6
    }
242

            
243
2
    async fn delete_docs_by_name(
244
2
        &self,
245
2
        view: &ViewName,
246
2
        key: Option<SerializedQueryKey>,
247
2
        access_policy: AccessPolicy,
248
2
    ) -> Result<u64, bonsaidb_core::Error> {
249
2
        self.db.delete_docs_by_name(view, key, access_policy).await
250
6
    }
251

            
252
5096
    async fn apply_transaction(
253
5096
        &self,
254
5096
        transaction: Transaction,
255
5096
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
256
5096
        self.db.apply_transaction(transaction).await
257
15288
    }
258
}
259

            
260
impl<B: Backend> HasSchema for ServerDatabase<B> {
261
14169
    fn schematic(&self) -> &Schematic {
262
14169
        self.db.schematic()
263
14169
    }
264
}