1
use std::{ops::Deref, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    circulate::Message,
6
    connection::{AccessPolicy, QueryKey, Range, Sort},
7
    document::OwnedDocument,
8
    keyvalue::KeyValue,
9
    pubsub::{PubSub, Subscriber},
10
    schema::{self, view::map::MappedDocuments, Collection, Map, MappedValue, SerializedView},
11
    transaction::Transaction,
12
};
13
use bonsaidb_local::Database;
14

            
15
use crate::{Backend, CustomServer, NoBackend};
16

            
17
/// A database belonging to a [`CustomServer`].
18
pub struct ServerDatabase<B: Backend = NoBackend> {
19
    pub(crate) server: CustomServer<B>,
20
    pub(crate) db: Database,
21
}
22

            
23
impl<B: Backend> Deref for ServerDatabase<B> {
24
    type Target = Database;
25

            
26
100
    fn deref(&self) -> &Self::Target {
27
100
        &self.db
28
100
    }
29
}
30

            
31
/// Uses `CustomServer`'s `PubSub` relay.
32
#[async_trait]
33
impl<B: Backend> PubSub for ServerDatabase<B> {
34
    type Subscriber = ServerSubscriber<B>;
35

            
36
7
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
37
7
        Ok(self
38
7
            .server
39
7
            .create_subscriber(self.db.name().to_string())
40
            .await)
41
14
    }
42

            
43
8
    async fn publish<S: Into<String> + Send, P: serde::Serialize + Sync>(
44
8
        &self,
45
8
        topic: S,
46
8
        payload: &P,
47
8
    ) -> Result<(), bonsaidb_core::Error> {
48
        self.server
49
8
            .publish_message(self.db.name(), &topic.into(), pot::to_vec(payload)?)
50
            .await;
51
8
        Ok(())
52
16
    }
53

            
54
1
    async fn publish_to_all<P: serde::Serialize + Sync>(
55
1
        &self,
56
1
        topics: Vec<String>,
57
1
        payload: &P,
58
1
    ) -> Result<(), bonsaidb_core::Error> {
59
        self.server
60
1
            .publish_serialized_to_all(self.db.name(), &topics, pot::to_vec(payload)?)
61
            .await;
62
1
        Ok(())
63
2
    }
64
}
65

            
66
/// A `PubSub` subscriber for a [`CustomServer`].
67
pub struct ServerSubscriber<B: Backend> {
68
    /// The unique ID of this subscriber.
69
    pub id: u64,
70
    pub(crate) database: String,
71
    pub(crate) server: CustomServer<B>,
72
    pub(crate) receiver: flume::Receiver<Arc<Message>>,
73
}
74

            
75
#[async_trait]
76
impl<B: Backend> Subscriber for ServerSubscriber<B> {
77
12
    async fn subscribe_to<S: Into<String> + Send>(
78
12
        &self,
79
12
        topic: S,
80
12
    ) -> Result<(), bonsaidb_core::Error> {
81
12
        self.server
82
12
            .subscribe_to(self.id, &self.database, topic)
83
            .await
84
24
    }
85

            
86
1
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), bonsaidb_core::Error> {
87
1
        self.server
88
1
            .unsubscribe_from(self.id, &self.database, topic)
89
            .await
90
2
    }
91

            
92
17
    fn receiver(&self) -> &'_ flume::Receiver<Arc<Message>> {
93
17
        &self.receiver
94
17
    }
95
}
96

            
97
/// Pass-through implementation
98
#[async_trait]
99
impl<B: Backend> bonsaidb_core::connection::Connection for ServerDatabase<B> {
100
3034
    async fn get<C: Collection>(
101
3034
        &self,
102
3034
        id: u64,
103
3034
    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
104
3034
        self.db.get::<C>(id).await
105
6068
    }
106

            
107
2
    async fn get_multiple<C: Collection>(
108
2
        &self,
109
2
        ids: &[u64],
110
2
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
111
2
        self.db.get_multiple::<C>(ids).await
112
4
    }
113

            
114
4
    async fn list<C: schema::Collection, R: Into<Range<u64>> + Send>(
115
4
        &self,
116
4
        ids: R,
117
4
        order: Sort,
118
4
        limit: Option<usize>,
119
4
    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
120
4
        self.db.list::<C, R>(ids, order, limit).await
121
8
    }
122

            
123
142
    async fn query<V: SerializedView>(
124
142
        &self,
125
142
        key: Option<QueryKey<V::Key>>,
126
142
        order: Sort,
127
142
        limit: Option<usize>,
128
142
        access_policy: AccessPolicy,
129
142
    ) -> Result<Vec<Map<V::Key, V::Value>>, bonsaidb_core::Error>
130
142
    where
131
142
        Self: Sized,
132
142
    {
133
143
        self.db.query::<V>(key, order, limit, access_policy).await
134
284
    }
135

            
136
2178
    async fn query_with_docs<V: SerializedView>(
137
2178
        &self,
138
2178
        key: Option<QueryKey<V::Key>>,
139
2178
        order: Sort,
140
2178
        limit: Option<usize>,
141
2178
        access_policy: AccessPolicy,
142
2178
    ) -> Result<MappedDocuments<OwnedDocument, V>, bonsaidb_core::Error>
143
2178
    where
144
2178
        Self: Sized,
145
2178
    {
146
2178
        self.db
147
2371
            .query_with_docs::<V>(key, order, limit, access_policy)
148
2371
            .await
149
4356
    }
150

            
151
4002
    async fn reduce<V: SerializedView>(
152
4002
        &self,
153
4002
        key: Option<QueryKey<V::Key>>,
154
4002
        access_policy: AccessPolicy,
155
4002
    ) -> Result<V::Value, bonsaidb_core::Error>
156
4002
    where
157
4002
        Self: Sized,
158
4002
    {
159
4003
        self.db.reduce::<V>(key, access_policy).await
160
8004
    }
161

            
162
2
    async fn reduce_grouped<V: SerializedView>(
163
2
        &self,
164
2
        key: Option<QueryKey<V::Key>>,
165
2
        access_policy: AccessPolicy,
166
2
    ) -> Result<Vec<MappedValue<V::Key, V::Value>>, bonsaidb_core::Error>
167
2
    where
168
2
        Self: Sized,
169
2
    {
170
2
        self.db.reduce_grouped::<V>(key, access_policy).await
171
4
    }
172

            
173
1
    async fn delete_docs<V: SerializedView>(
174
1
        &self,
175
1
        key: Option<QueryKey<V::Key>>,
176
1
        access_policy: AccessPolicy,
177
1
    ) -> Result<u64, bonsaidb_core::Error>
178
1
    where
179
1
        Self: Sized,
180
1
    {
181
2
        self.db.delete_docs::<V>(key, access_policy).await
182
2
    }
183

            
184
2802
    async fn apply_transaction(
185
2802
        &self,
186
2802
        transaction: Transaction,
187
2802
    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
188
2803
        self.db.apply_transaction(transaction).await
189
5603
    }
190

            
191
20
    async fn list_executed_transactions(
192
20
        &self,
193
20
        starting_id: Option<u64>,
194
20
        result_limit: Option<usize>,
195
20
    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
196
20
        self.db
197
20
            .list_executed_transactions(starting_id, result_limit)
198
19
            .await
199
40
    }
200

            
201
1
    async fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
202
1
        self.db.last_transaction_id().await
203
2
    }
204

            
205
1
    async fn compact_collection<C: schema::Collection>(&self) -> Result<(), bonsaidb_core::Error> {
206
1
        self.db.compact_collection::<C>().await
207
2
    }
208

            
209
1
    async fn compact(&self) -> Result<(), bonsaidb_core::Error> {
210
1
        self.db.compact().await
211
2
    }
212

            
213
1
    async fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
214
1
        self.db.compact_key_value_store().await
215
2
    }
216
}
217

            
218
/// Pass-through implementation
219
#[async_trait]
220
impl<B: Backend> KeyValue for ServerDatabase<B> {
221
114
    async fn execute_key_operation(
222
114
        &self,
223
114
        op: bonsaidb_core::keyvalue::KeyOperation,
224
114
    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
225
114
        self.db.execute_key_operation(op).await
226
228
    }
227
}