1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::{
5
    arc_bytes::serde::Bytes,
6
    circulate::Message,
7
    custom_api::CustomApi,
8
    networking::{DatabaseRequest, DatabaseResponse, Request, Response},
9
    pubsub::{PubSub, Subscriber},
10
};
11
use serde::Serialize;
12

            
13
use crate::Client;
14

            
15
#[async_trait]
16
impl<A> PubSub for super::RemoteDatabase<A>
17
where
18
    A: CustomApi,
19
{
20
    type Subscriber = RemoteSubscriber<A>;
21

            
22
14
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
23
14
        match self
24
14
            .client
25
14
            .send_request(Request::Database {
26
14
                database: self.name.to_string(),
27
14
                request: DatabaseRequest::CreateSubscriber,
28
14
            })
29
14
            .await?
30
        {
31
14
            Response::Database(DatabaseResponse::SubscriberCreated { subscriber_id }) => {
32
14
                let (sender, receiver) = flume::unbounded();
33
14
                self.client.register_subscriber(subscriber_id, sender).await;
34
14
                Ok(RemoteSubscriber {
35
14
                    client: self.client.clone(),
36
14
                    database: self.name.clone(),
37
14
                    id: subscriber_id,
38
14
                    receiver,
39
14
                })
40
            }
41
            Response::Error(err) => Err(err),
42
            other => Err(bonsaidb_core::Error::Networking(
43
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
44
            )),
45
        }
46
28
    }
47

            
48
16
    async fn publish<S: Into<String> + Send, P: Serialize + Sync>(
49
16
        &self,
50
16
        topic: S,
51
16
        payload: &P,
52
16
    ) -> Result<(), bonsaidb_core::Error> {
53
16
        let payload = pot::to_vec(&payload)?;
54
16
        match self
55
16
            .client
56
16
            .send_request(Request::Database {
57
16
                database: self.name.to_string(),
58
16
                request: DatabaseRequest::Publish {
59
16
                    topic: topic.into(),
60
16
                    payload: Bytes::from(payload),
61
16
                },
62
16
            })
63
16
            .await?
64
        {
65
16
            Response::Ok => Ok(()),
66
            Response::Error(err) => Err(err),
67
            other => Err(bonsaidb_core::Error::Networking(
68
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
69
            )),
70
        }
71
32
    }
72

            
73
2
    async fn publish_to_all<P: Serialize + Sync>(
74
2
        &self,
75
2
        topics: Vec<String>,
76
2
        payload: &P,
77
2
    ) -> Result<(), bonsaidb_core::Error> {
78
2
        let payload = pot::to_vec(&payload)?;
79
2
        match self
80
2
            .client
81
2
            .send_request(Request::Database {
82
2
                database: self.name.to_string(),
83
2
                request: DatabaseRequest::PublishToAll {
84
2
                    topics,
85
2
                    payload: Bytes::from(payload),
86
2
                },
87
2
            })
88
2
            .await?
89
        {
90
2
            Response::Ok => Ok(()),
91
            Response::Error(err) => Err(err),
92
            other => Err(bonsaidb_core::Error::Networking(
93
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
94
            )),
95
        }
96
4
    }
97
}
98

            
99
/// A `PubSub` subscriber from a remote server.
100
#[derive(Debug)]
101
pub struct RemoteSubscriber<A: CustomApi> {
102
    client: Client<A>,
103
    database: Arc<String>,
104
    id: u64,
105
    receiver: flume::Receiver<Arc<Message>>,
106
}
107

            
108
#[async_trait]
109
impl<A: CustomApi> Subscriber for RemoteSubscriber<A> {
110
24
    async fn subscribe_to<S: Into<String> + Send>(
111
24
        &self,
112
24
        topic: S,
113
24
    ) -> Result<(), bonsaidb_core::Error> {
114
24
        match self
115
24
            .client
116
24
            .send_request(Request::Database {
117
24
                database: self.database.to_string(),
118
24
                request: DatabaseRequest::SubscribeTo {
119
24
                    subscriber_id: self.id,
120
24
                    topic: topic.into(),
121
24
                },
122
24
            })
123
24
            .await?
124
        {
125
24
            Response::Ok => Ok(()),
126
            Response::Error(err) => Err(err),
127
            other => Err(bonsaidb_core::Error::Networking(
128
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
129
            )),
130
        }
131
48
    }
132

            
133
2
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), bonsaidb_core::Error> {
134
2
        match self
135
2
            .client
136
2
            .send_request(Request::Database {
137
2
                database: self.database.to_string(),
138
2
                request: DatabaseRequest::UnsubscribeFrom {
139
2
                    subscriber_id: self.id,
140
2
                    topic: topic.to_string(),
141
2
                },
142
2
            })
143
2
            .await?
144
        {
145
2
            Response::Ok => Ok(()),
146
            Response::Error(err) => Err(err),
147
            other => Err(bonsaidb_core::Error::Networking(
148
                bonsaidb_core::networking::Error::UnexpectedResponse(format!("{:?}", other)),
149
            )),
150
        }
151
4
    }
152

            
153
34
    fn receiver(&self) -> &'_ flume::Receiver<Arc<bonsaidb_core::circulate::Message>> {
154
34
        &self.receiver
155
34
    }
156
}
157

            
158
impl<A: CustomApi> Drop for RemoteSubscriber<A> {
159
14
    fn drop(&mut self) {
160
14
        let client = self.client.clone();
161
14
        let database = self.database.to_string();
162
14
        let subscriber_id = self.id;
163
14
        let drop_future = async move {
164
            client.unregister_subscriber(database, subscriber_id).await;
165
14
        };
166
14
        #[cfg(target_arch = "wasm32")]
167
14
        wasm_bindgen_futures::spawn_local(drop_future);
168
14
        #[cfg(not(target_arch = "wasm32"))]
169
14
        tokio::spawn(drop_future);
170
14
    }
171
}