1
use std::ops::Deref;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    arc_bytes::serde::Bytes,
6
    connection::{AccessPolicy, AsyncLowLevelConnection, HasSession, QueryKey, Range, Sort},
7
    document::{DocumentId, Header, OwnedDocument},
8
    keyvalue::AsyncKeyValue,
9
    permissions::Permissions,
10
    pubsub::AsyncPubSub,
11
    schema::{self, view::map::MappedSerializedValue, CollectionName, Schematic, ViewName},
12
    transaction::{OperationResult, Transaction},
13
};
14
use bonsaidb_local::{AsyncDatabase, Database};
15
use derive_where::derive_where;
16

            
17
use crate::{Backend, CustomServer, NoBackend};
18

            
19
/// A database belonging to a [`CustomServer`].
20
#[derive_where(Debug)]
21
pub struct ServerDatabase<B: Backend = NoBackend> {
22
    pub(crate) server: CustomServer<B>,
23
    pub(crate) db: AsyncDatabase,
24
}
25

            
26
/// Converts an owned [`ServerDatabase`] into a [`Database`] by converting
/// its inner [`AsyncDatabase`]; the server handle is discarded.
impl<B: Backend> From<ServerDatabase<B>> for Database {
    fn from(server: ServerDatabase<B>) -> Self {
        server.db.into()
    }
}
31

            
32
/// Converts a borrowed [`ServerDatabase`] into a [`Database`].
///
/// The inner [`AsyncDatabase`] is cloned first, so the borrowed handle is
/// left untouched.
impl<B: Backend> From<&ServerDatabase<B>> for Database {
    fn from(server: &ServerDatabase<B>) -> Self {
        let db = server.db.clone();
        Self::from(db)
    }
}
37

            
38
impl<B: Backend> ServerDatabase<B> {
39
    /// Restricts an unauthenticated instance to having `effective_permissions`.
40
    /// Returns `None` if a session has already been established.
41
    #[must_use]
42
    pub fn with_effective_permissions(&self, effective_permissions: Permissions) -> Option<Self> {
43
        self.db
44
            .with_effective_permissions(effective_permissions)
45
            .map(|db| Self {
46
                db,
47
                server: self.server.clone(),
48
            })
49
    }
50
}
51

            
52
impl<B: Backend> Deref for ServerDatabase<B> {
53
    type Target = AsyncDatabase;
54

            
55
100
    fn deref(&self) -> &Self::Target {
56
100
        &self.db
57
100
    }
58
}
59

            
60
/// Uses `CustomServer`'s `PubSub` relay.
61
#[async_trait]
62
impl<B: Backend> AsyncPubSub for ServerDatabase<B> {
63
    type Subscriber = bonsaidb_local::Subscriber;
64

            
65
8
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
66
8
        let subscriber = self.db.create_subscriber().await?;
67
8
        Ok(subscriber)
68
16
    }
69

            
70
10
    async fn publish_bytes(
71
10
        &self,
72
10
        topic: Vec<u8>,
73
10
        payload: Vec<u8>,
74
10
    ) -> Result<(), bonsaidb_core::Error> {
75
10
        self.db.publish_bytes(topic, payload).await
76
20
    }
77

            
78
1
    async fn publish_bytes_to_all(
79
1
        &self,
80
1
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
81
1
        payload: Vec<u8>,
82
1
    ) -> Result<(), bonsaidb_core::Error> {
83
1
        self.db.publish_bytes_to_all(topics, payload).await
84
2
    }
85
}
86

            
87
impl<B: Backend> HasSession for ServerDatabase<B> {
88
    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
89
        self.server.session()
90
    }
91
}
92

            
93
/// Pass-through implementation
94
#[async_trait]
95
impl<B: Backend> bonsaidb_core::connection::AsyncConnection for ServerDatabase<B> {
96
    type Storage = CustomServer<B>;
97

            
98
    fn storage(&self) -> Self::Storage {
99
        self.server.clone()
100
    }
101

            
102
20
    async fn list_executed_transactions(
103
20
        &self,
104
20
        starting_id: Option<u64>,
105
20
        result_limit: Option<u32>,
106
20
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
107
20
        self.db
108
20
            .list_executed_transactions(starting_id, result_limit)
109
20
            .await
110
40
    }
111

            
112
1
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
113
1
        self.db.last_transaction_id().await
114
2
    }
115

            
116
1
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
117
1
        self.db.compact_collection::<C>().await
118
2
    }
119

            
120
1
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
121
1
        self.db.compact().await
122
2
    }
123

            
124
1
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
125
1
        self.db.compact_key_value_store().await
126
2
    }
127
}
128

            
129
/// Pass-through implementation
130
#[async_trait]
131
impl<B: Backend> AsyncKeyValue for ServerDatabase<B> {
132
118
    async fn execute_key_operation(
133
118
        &self,
134
118
        op: bonsaidb_core::keyvalue::KeyOperation,
135
118
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
136
118
        self.db.execute_key_operation(op).await
137
236
    }
138
}
139

            
140
#[async_trait]
141
impl<B: Backend> AsyncLowLevelConnection for ServerDatabase<B> {
142
13740
    fn schematic(&self) -> &Schematic {
143
13740
        self.db.schematic()
144
13740
    }
145

            
146
6878
    async fn get_from_collection(
147
6878
        &self,
148
6878
        id: DocumentId,
149
6878
        collection: &CollectionName,
150
6878
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
151
6878
        self.db.get_from_collection(id, collection).await
152
13756
    }
153

            
154
4
    async fn list_from_collection(
155
4
        &self,
156
4
        ids: Range<DocumentId>,
157
4
        order: Sort,
158
4
        limit: Option<u32>,
159
4
        collection: &CollectionName,
160
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
161
4
        self.db
162
4
            .list_from_collection(ids, order, limit, collection)
163
4
            .await
164
8
    }
165

            
166
1
    async fn list_headers_from_collection(
167
1
        &self,
168
1
        ids: Range<DocumentId>,
169
1
        order: Sort,
170
1
        limit: Option<u32>,
171
1
        collection: &CollectionName,
172
1
    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
173
1
        self.db
174
1
            .list_headers_from_collection(ids, order, limit, collection)
175
1
            .await
176
2
    }
177

            
178
2
    async fn count_from_collection(
179
2
        &self,
180
2
        ids: Range<DocumentId>,
181
2
        collection: &CollectionName,
182
2
    ) -> Result<u64, bonsaidb_core::Error> {
183
2
        self.db.count_from_collection(ids, collection).await
184
4
    }
185

            
186
4617
    async fn get_multiple_from_collection(
187
4617
        &self,
188
4617
        ids: &[DocumentId],
189
4617
        collection: &CollectionName,
190
4617
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
191
4617
        self.db.get_multiple_from_collection(ids, collection).await
192
9234
    }
193

            
194
    async fn compact_collection_by_name(
195
        &self,
196
        collection: CollectionName,
197
    ) -> Result<(), bonsaidb_core::Error> {
198
        self.db.compact_collection_by_name(collection).await
199
    }
200

            
201
4754
    async fn query_by_name(
202
4754
        &self,
203
4754
        view: &ViewName,
204
4754
        key: Option<QueryKey<Bytes>>,
205
4754
        order: Sort,
206
4754
        limit: Option<u32>,
207
4754
        access_policy: AccessPolicy,
208
4754
    ) -> Result<Vec<schema::view::map::Serialized>, bonsaidb_core::Error> {
209
4753
        self.db
210
4754
            .query_by_name(view, key, order, limit, access_policy)
211
4148
            .await
212
9507
    }
213

            
214
    async fn query_by_name_with_docs(
215
        &self,
216
        view: &ViewName,
217
        key: Option<QueryKey<Bytes>>,
218
        order: Sort,
219
        limit: Option<u32>,
220
        access_policy: AccessPolicy,
221
    ) -> Result<schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error> {
222
        self.db
223
            .query_by_name_with_docs(view, key, order, limit, access_policy)
224
            .await
225
    }
226

            
227
8982
    async fn reduce_by_name(
228
8982
        &self,
229
8982
        view: &ViewName,
230
8982
        key: Option<QueryKey<Bytes>>,
231
8982
        access_policy: AccessPolicy,
232
8982
    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
233
8982
        self.db.reduce_by_name(view, key, access_policy).await
234
17964
    }
235

            
236
2
    async fn reduce_grouped_by_name(
237
2
        &self,
238
2
        view: &ViewName,
239
2
        key: Option<QueryKey<Bytes>>,
240
2
        access_policy: AccessPolicy,
241
2
    ) -> Result<Vec<MappedSerializedValue>, bonsaidb_core::Error> {
242
2
        self.db
243
2
            .reduce_grouped_by_name(view, key, access_policy)
244
2
            .await
245
4
    }
246

            
247
2
    async fn delete_docs_by_name(
248
2
        &self,
249
2
        view: &ViewName,
250
2
        key: Option<QueryKey<Bytes>>,
251
2
        access_policy: AccessPolicy,
252
2
    ) -> Result<u64, bonsaidb_core::Error> {
253
2
        self.db.delete_docs_by_name(view, key, access_policy).await
254
4
    }
255

            
256
4938
    async fn apply_transaction(
257
4938
        &self,
258
4938
        transaction: Transaction,
259
4938
    ) -> Result<Vec<OperationResult>, bonsaidb_core::Error> {
260
4938
        self.db.apply_transaction(transaction).await
261
9876
    }
262
}