1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    circulate::Message,
6
    connection::{AccessPolicy, QueryKey, Range, Sort},
7
    document::{AnyDocumentId, OwnedDocument},
8
    keyvalue::KeyValue,
9
    pubsub::{PubSub, Subscriber},
10
    schema::{self, view::map::MappedDocuments, Map, MappedValue, SerializedView},
11
    transaction::Transaction,
12
};
13
use bonsaidb_local::Database;
14

            
15
use crate::{Backend, CustomServer, NoBackend};
16

            
17
/// A database belonging to a [`CustomServer`].
18
pub struct ServerDatabase<B: Backend = NoBackend> {
19
    pub(crate) server: CustomServer<B>,
20
    pub(crate) db: Database,
21
}
22

            
23
impl<B: Backend> Deref for ServerDatabase<B> {
24
    type Target = Database;
25

            
26
100
    fn deref(&self) -> &Self::Target {
27
100
        &self.db
28
100
    }
29
}
30

            
31
/// Uses `CustomServer`'s `PubSub` relay.
32
#[async_trait]
33
impl<B: Backend> PubSub for ServerDatabase<B> {
34
    type Subscriber = ServerSubscriber<B>;
35

            
36
7
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
37
7
        Ok(self
38
7
            .server
39
7
            .create_subscriber(self.db.name().to_string())
40
            .await)
41
14
    }
42

            
43
8
    async fn publish<S: Into<String> + Send, P: serde::Serialize + Sync>(
44
8
        &self,
45
8
        topic: S,
46
8
        payload: &P,
47
8
    ) -> Result<(), bonsaidb_core::Error> {
48
        self.server
49
8
            .publish_message(self.db.name(), &topic.into(), pot::to_vec(payload)?)
50
            .await;
51
8
        Ok(())
52
16
    }
53

            
54
1
    async fn publish_to_all<P: serde::Serialize + Sync>(
55
1
        &self,
56
1
        topics: Vec<String>,
57
1
        payload: &P,
58
1
    ) -> Result<(), bonsaidb_core::Error> {
59
        self.server
60
1
            .publish_serialized_to_all(self.db.name(), &topics, pot::to_vec(payload)?)
61
            .await;
62
1
        Ok(())
63
2
    }
64
}
65

            
66
/// A `PubSub` subscriber for a [`CustomServer`].
67
pub struct ServerSubscriber<B: Backend> {
68
    /// The unique ID of this subscriber.
69
    pub id: u64,
70
    pub(crate) database: String,
71
    pub(crate) server: CustomServer<B>,
72
    pub(crate) receiver: flume::Receiver<Arc<Message>>,
73
}
74

            
75
#[async_trait]
76
impl<B: Backend> Subscriber for ServerSubscriber<B> {
77
12
    async fn subscribe_to<S: Into<String> + Send>(
78
12
        &self,
79
12
        topic: S,
80
12
    ) -> Result<(), bonsaidb_core::Error> {
81
12
        self.server
82
12
            .subscribe_to(self.id, &self.database, topic)
83
            .await
84
24
    }
85

            
86
1
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), bonsaidb_core::Error> {
87
1
        self.server
88
1
            .unsubscribe_from(self.id, &self.database, topic)
89
            .await
90
2
    }
91

            
92
17
    fn receiver(&self) -> &'_ flume::Receiver<Arc<Message>> {
93
17
        &self.receiver
94
17
    }
95
}
96

            
97
/// Pass-through implementation
98
#[async_trait]
99
impl<B: Backend> bonsaidb_core::connection::Connection for ServerDatabase<B> {
100
5882
    async fn get<C, PrimaryKey>(
101
5882
        &self,
102
5882
        id: PrimaryKey,
103
5882
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error>
104
5882
    where
105
5882
        C: schema::Collection,
106
5882
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
107
5882
    {
108
5882
        self.db.get::<C, PrimaryKey>(id).await
109
11764
    }
110

            
111
2
    async fn get_multiple<C, PrimaryKey, DocumentIds, I>(
112
2
        &self,
113
2
        ids: DocumentIds,
114
2
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
115
2
    where
116
2
        C: schema::Collection,
117
2
        DocumentIds: IntoIterator<Item = PrimaryKey, IntoIter = I> + Send + Sync,
118
2
        I: Iterator<Item = PrimaryKey> + Send + Sync,
119
2
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send + Sync,
120
2
    {
121
2
        self.db.get_multiple::<C, _, _, _>(ids).await
122
4
    }
123

            
124
4
    async fn list<C, R, PrimaryKey>(
125
4
        &self,
126
4
        ids: R,
127
4
        order: Sort,
128
4
        limit: Option<usize>,
129
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error>
130
4
    where
131
4
        C: schema::Collection,
132
4
        R: Into<Range<PrimaryKey>> + Send,
133
4
        PrimaryKey: Into<AnyDocumentId<C::PrimaryKey>> + Send,
134
4
    {
135
4
        self.db.list::<C, R, PrimaryKey>(ids, order, limit).await
136
8
    }
137

            
138
146
    async fn query<V: SerializedView>(
139
146
        &self,
140
146
        key: Option<QueryKey<V::Key>>,
141
146
        order: Sort,
142
146
        limit: Option<usize>,
143
146
        access_policy: AccessPolicy,
144
146
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
145
146
    where
146
146
        Self: Sized,
147
146
    {
148
147
        self.db.query::<V>(key, order, limit, access_policy).await
149
292
    }
150

            
151
4004
    async fn query_with_docs<V: SerializedView>(
152
4004
        &self,
153
4004
        key: Option<QueryKey<V::Key>>,
154
4004
        order: Sort,
155
4004
        limit: Option<usize>,
156
4004
        access_policy: AccessPolicy,
157
4004
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
158
4004
    where
159
4004
        Self: Sized,
160
4004
    {
161
4004
        self.db
162
4368
            .query_with_docs::<V>(key, order, limit, access_policy)
163
4368
            .await
164
8008
    }
165

            
166
7704
    async fn reduce<V: SerializedView>(
167
7704
        &self,
168
7704
        key: Option<QueryKey<V::Key>>,
169
7704
        access_policy: AccessPolicy,
170
7704
    ) -> Result<V::Value, bonsaidb_core::Error>
171
7704
    where
172
7704
        Self: Sized,
173
7704
    {
174
7705
        self.db.reduce::<V>(key, access_policy).await
175
15408
    }
176

            
177
2
    async fn reduce_grouped<V: SerializedView>(
178
2
        &self,
179
2
        key: Option<QueryKey<V::Key>>,
180
2
        access_policy: AccessPolicy,
181
2
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
182
2
    where
183
2
        Self: Sized,
184
2
    {
185
2
        self.db.reduce_grouped::<V>(key, access_policy).await
186
4
    }
187

            
188
1
    async fn delete_docs<V: SerializedView>(
189
1
        &self,
190
1
        key: Option<QueryKey<V::Key>>,
191
1
        access_policy: AccessPolicy,
192
1
    ) -> Result<u64, bonsaidb_core::Error>
193
1
    where
194
1
        Self: Sized,
195
1
    {
196
2
        self.db.delete_docs::<V>(key, access_policy).await
197
2
    }
198

            
199
4407
    async fn apply_transaction(
200
4407
        &self,
201
4407
        transaction: Transaction,
202
4407
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
203
4408
        self.db.apply_transaction(transaction).await
204
8813
    }
205

            
206
20
    async fn list_executed_transactions(
207
20
        &self,
208
20
        starting_id: Option<u64>,
209
20
        result_limit: Option<usize>,
210
20
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
211
20
        self.db
212
20
            .list_executed_transactions(starting_id, result_limit)
213
19
            .await
214
40
    }
215

            
216
1
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
217
1
        self.db.last_transaction_id().await
218
2
    }
219

            
220
1
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
221
1
        self.db.compact_collection::<C>().await
222
2
    }
223

            
224
1
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
225
1
        self.db.compact().await
226
2
    }
227

            
228
1
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
229
1
        self.db.compact_key_value_store().await
230
2
    }
231
}
232

            
233
/// Pass-through implementation
234
#[async_trait]
235
impl<B: Backend> KeyValue for ServerDatabase<B> {
236
114
    async fn execute_key_operation(
237
114
        &self,
238
114
        op: bonsaidb_core::keyvalue::KeyOperation,
239
114
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
240
114
        self.db.execute_key_operation(op).await
241
228
    }
242
}