1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    circulate::Message,
6
    connection::{AccessPolicy, QueryKey, Range, Sort},
7
    document::{AnyDocumentId, OwnedDocument},
8
    keyvalue::KeyValue,
9
    pubsub::{PubSub, Subscriber},
10
    schema::{self, view::map::MappedDocuments, Map, MappedValue, SerializedView},
11
    transaction::Transaction,
12
};
13
use bonsaidb_local::Database;
14

            
15
use crate::{Backend, CustomServer, NoBackend};
16

            
17
/// A database belonging to a [`CustomServer`].
18
pub struct ServerDatabase<B: Backend = NoBackend> {
19
    pub(crate) server: CustomServer<B>,
20
    pub(crate) db: Database,
21
}
22

            
23
impl<B: Backend> Deref for ServerDatabase<B> {
24
    type Target = Database;
25

            
26
100
    fn deref(&self) -> &Self::Target {
27
100
        &self.db
28
100
    }
29
}
30

            
31
/// Uses `CustomServer`'s `PubSub` relay.
32
#[async_trait]
33
impl<B: Backend> PubSub for ServerDatabase<B> {
34
    type Subscriber = ServerSubscriber<B>;
35

            
36
7
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
37
7
        Ok(self
38
7
            .server
39
7
            .create_subscriber(self.db.name().to_string())
40
            .await)
41
14
    }
42

            
43
8
    async fn publish<S: Into<String> + Send, P: serde::Serialize + Sync>(
44
8
        &self,
45
8
        topic: S,
46
8
        payload: &P,
47
8
    ) -> Result<(), bonsaidb_core::Error> {
48
        self.server
49
8
            .publish_message(self.db.name(), &topic.into(), pot::to_vec(payload)?)
50
            .await;
51
8
        Ok(())
52
16
    }
53

            
54
1
    async fn publish_to_all<P: serde::Serialize + Sync>(
55
1
        &self,
56
1
        topics: Vec<String>,
57
1
        payload: &P,
58
1
    ) -> Result<(), bonsaidb_core::Error> {
59
        self.server
60
1
            .publish_serialized_to_all(self.db.name(), &topics, pot::to_vec(payload)?)
61
            .await;
62
1
        Ok(())
63
2
    }
64
}
65

            
66
/// A `PubSub` subscriber for a [`CustomServer`].
67
pub struct ServerSubscriber<B: Backend> {
68
    /// The unique ID of this subscriber.
69
    pub id: u64,
70
    pub(crate) database: String,
71
    pub(crate) server: CustomServer<B>,
72
    pub(crate) receiver: flume::Receiver<Arc<Message>>,
73
}
74

            
75
#[async_trait]
76
impl<B: Backend> Subscriber for ServerSubscriber<B> {
77
12
    async fn subscribe_to<S: Into<String> + Send>(
78
12
        &self,
79
12
        topic: S,
80
12
    ) -> Result<(), bonsaidb_core::Error> {
81
12
        self.server
82
12
            .subscribe_to(self.id, &self.database, topic)
83
            .await
84
24
    }
85

            
86
1
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), bonsaidb_core::Error> {
87
1
        self.server
88
1
            .unsubscribe_from(self.id, &self.database, topic)
89
            .await
90
2
    }
91

            
92
17
    fn receiver(&self) -> &'_ flume::Receiver<Arc<Message>> {
93
17
        &self.receiver
94
17
    }
95
}
96

            
97
/// Pass-through implementation
98
#[async_trait]
99
impl<B: Backend> bonsaidb_core::connection::Connection for ServerDatabase<B> {
100
6140
    async fn get<C, PrimaryKey>(
101
6140
        &self,
102
6140
        id: PrimaryKey,
103
6140
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error>
104
6140
    where
105
6140
        C: schema::Collection,
106
6140
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
107
6140
    {
108
6140
        self.db.get::<C, PrimaryKey>(id).await
109
12280
    }
110

            
111
2
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
112
2
        &self,
113
2
        ids: DocumentIds,
114
2
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
115
2
    where
116
2
        C: schema::Collection,
117
2
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
118
2
        I: Iterator<Item = PrimaryKey> + Send + Sync,
119
2
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync,
120
2
    {
121
2
        self.db.get_multiple::<C, _, _, _>(ids).await
122
4
    }
123

            
124
4
    async fn list<C, R, PrimaryKey>(
125
4
        &self,
126
4
        ids: R,
127
4
        order: Sort,
128
4
        limit: Option<usize>,
129
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
130
4
    where
131
4
        C: schema::Collection,
132
4
        R: Into<Range<PrimaryKey>> + Send,
133
4
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
134
4
    {
135
4
        self.db.list::<C, R, PrimaryKey>(ids, order, limit).await
136
8
    }
137

            
138
2
    async fn count<C, R, PrimaryKey>(&self, ids: R) -> Result<u64, bonsaidb_core::Error>
139
2
    where
140
2
        C: schema::Collection,
141
2
        R: Into<Range<PrimaryKey>> + Send,
142
2
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
143
2
    {
144
2
        self.db.count::<C, R, PrimaryKey>(ids).await
145
4
    }
146

            
147
146
    async fn query<V: SerializedView>(
148
146
        &self,
149
146
        key: Option<QueryKey<V::Key>>,
150
146
        order: Sort,
151
146
        limit: Option<usize>,
152
146
        access_policy: AccessPolicy,
153
146
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
154
146
    where
155
146
        Self: Sized,
156
146
    {
157
189
        self.db.query::<V>(key, order, limit, access_policy).await
158
292
    }
159

            
160
4042
    async fn query_with_docs<V: SerializedView>(
161
4042
        &self,
162
4042
        key: Option<QueryKey<V::Key>>,
163
4042
        order: Sort,
164
4042
        limit: Option<usize>,
165
4042
        access_policy: AccessPolicy,
166
4042
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
167
4042
    where
168
4042
        Self: Sized,
169
4042
    {
170
4042
        self.db
171
4715
            .query_with_docs::<V>(key, order, limit, access_policy)
172
4715
            .await
173
8084
    }
174

            
175
7963
    async fn reduce<V: SerializedView>(
176
7963
        &self,
177
7963
        key: Option<QueryKey<V::Key>>,
178
7963
        access_policy: AccessPolicy,
179
7963
    ) -> Result<V::Value, bonsaidb_core::Error>
180
7963
    where
181
7963
        Self: Sized,
182
7964
    {
183
7965
        self.db.reduce::<V>(key, access_policy).await
184
15928
    }
185

            
186
2
    async fn reduce_grouped<V: SerializedView>(
187
2
        &self,
188
2
        key: Option<QueryKey<V::Key>>,
189
2
        access_policy: AccessPolicy,
190
2
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
191
2
    where
192
2
        Self: Sized,
193
2
    {
194
2
        self.db.reduce_grouped::<V>(key, access_policy).await
195
4
    }
196

            
197
1
    async fn delete_docs<V: SerializedView>(
198
1
        &self,
199
1
        key: Option<QueryKey<V::Key>>,
200
1
        access_policy: AccessPolicy,
201
1
    ) -> Result<u64, bonsaidb_core::Error>
202
1
    where
203
1
        Self: Sized,
204
1
    {
205
2
        self.db.delete_docs::<V>(key, access_policy).await
206
2
    }
207

            
208
4439
    async fn apply_transaction(
209
4439
        &self,
210
4439
        transaction: Transaction,
211
4439
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
212
4446
        self.db.apply_transaction(transaction).await
213
8877
    }
214

            
215
20
    async fn list_executed_transactions(
216
20
        &self,
217
20
        starting_id: Option<u64>,
218
20
        result_limit: Option<usize>,
219
20
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
220
20
        self.db
221
20
            .list_executed_transactions(starting_id, result_limit)
222
19
            .await
223
40
    }
224

            
225
1
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
226
1
        self.db.last_transaction_id().await
227
2
    }
228

            
229
1
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
230
1
        self.db.compact_collection::<C>().await
231
2
    }
232

            
233
1
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
234
1
        self.db.compact().await
235
2
    }
236

            
237
1
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
238
1
        self.db.compact_key_value_store().await
239
2
    }
240
}
241

            
242
/// Pass-through implementation
243
#[async_trait]
244
impl<B: Backend> KeyValue for ServerDatabase<B> {
245
114
    async fn execute_key_operation(
246
114
        &self,
247
114
        op: bonsaidb_core::keyvalue::KeyOperation,
248
114
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
249
114
        self.db.execute_key_operation(op).await
250
228
    }
251
}