1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::arc_bytes::serde::Bytes;
5
use bonsaidb_core::networking::{
6
    CreateSubscriber, Publish, PublishToAll, SubscribeTo, UnsubscribeFrom,
7
};
8
use bonsaidb_core::pubsub::{AsyncPubSub, AsyncSubscriber, Receiver};
9

            
10
use crate::AsyncClient;
11

            
12
#[async_trait]
13
impl AsyncPubSub for super::AsyncRemoteDatabase {
14
    type Subscriber = AsyncRemoteSubscriber;
15

            
16
288
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
17
288
        let subscriber_id = self
18
288
            .client
19
288
            .send_api_request(&CreateSubscriber {
20
288
                database: self.name.to_string(),
21
288
            })
22
288
            .await?;
23

            
24
288
        let (sender, receiver) = flume::unbounded();
25
288
        self.client.register_subscriber(subscriber_id, sender);
26
288
        Ok(AsyncRemoteSubscriber {
27
288
            client: self.client.clone(),
28
288
            database: self.name.clone(),
29
288
            id: subscriber_id,
30
288
            receiver: Receiver::new(receiver),
31
288
            #[cfg(not(target_arch = "wasm32"))]
32
288
            tokio: tokio::runtime::Handle::try_current().ok().map(Arc::new),
33
288
        })
34
576
    }
35

            
36
360
    async fn publish_bytes(
37
360
        &self,
38
360
        topic: Vec<u8>,
39
360
        payload: Vec<u8>,
40
360
    ) -> Result<(), bonsaidb_core::Error> {
41
360
        self.client
42
360
            .send_api_request(&Publish {
43
360
                database: self.name.to_string(),
44
360
                topic: Bytes::from(topic),
45
360
                payload: Bytes::from(payload),
46
360
            })
47
360
            .await?;
48
360
        Ok(())
49
720
    }
50

            
51
2
    async fn publish_bytes_to_all(
52
2
        &self,
53
2
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
54
2
        payload: Vec<u8>,
55
2
    ) -> Result<(), bonsaidb_core::Error> {
56
2
        let topics = topics.into_iter().map(Bytes::from).collect();
57
2
        self.client
58
2
            .send_api_request(&PublishToAll {
59
2
                database: self.name.to_string(),
60
2
                topics,
61
2
                payload: Bytes::from(payload),
62
2
            })
63
2
            .await?;
64
2
        Ok(())
65
4
    }
66
}
67

            
68
/// A `PubSub` subscriber from a remote server.
69
#[derive(Debug)]
70
pub struct AsyncRemoteSubscriber {
71
    pub(crate) client: AsyncClient,
72
    pub(crate) database: Arc<String>,
73
    pub(crate) id: u64,
74
    pub(crate) receiver: Receiver,
75
    #[cfg(not(target_arch = "wasm32"))]
76
    pub(crate) tokio: Option<Arc<tokio::runtime::Handle>>,
77
}
78

            
79
#[async_trait]
80
impl AsyncSubscriber for AsyncRemoteSubscriber {
81
468
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
82
468
        self.client
83
468
            .send_api_request(&SubscribeTo {
84
468
                database: self.database.to_string(),
85
468
                subscriber_id: self.id,
86
468
                topic: Bytes::from(topic),
87
468
            })
88
468
            .await?;
89
468
        Ok(())
90
936
    }
91

            
92
36
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
93
36
        self.client
94
36
            .send_api_request(&UnsubscribeFrom {
95
36
                database: self.database.to_string(),
96
36
                subscriber_id: self.id,
97
36
                topic: Bytes::from(topic),
98
36
            })
99
36
            .await?;
100
36
        Ok(())
101
72
    }
102

            
103
972
    fn receiver(&self) -> &Receiver {
104
972
        &self.receiver
105
972
    }
106
}
107

            
108
#[cfg(target_arch = "wasm32")]
impl Drop for AsyncRemoteSubscriber {
    /// Unregisters this subscriber from the client when it is dropped.
    ///
    /// `drop` cannot await, so the asynchronous unregistration is moved into
    /// a future and handed to `wasm_bindgen_futures::spawn_local`.
    fn drop(&mut self) {
        // The future must own everything it uses, so copy the values it
        // needs out of `self` first.
        let client = self.client.clone();
        let database = self.database.to_string();
        let subscriber_id = self.id;
        let drop_future = async move {
            client
                .unregister_subscriber_async(database, subscriber_id)
                .await;
        };
        wasm_bindgen_futures::spawn_local(drop_future);
    }
}
122

            
123
#[cfg(not(target_arch = "wasm32"))]
124
impl Drop for AsyncRemoteSubscriber {
125
    fn drop(&mut self) {
126
432
        if let Some(tokio) = &self.tokio {
127
288
            let client = self.client.clone();
128
288
            let database = self.database.to_string();
129
288
            let subscriber_id = self.id;
130
288
            tokio.spawn(async move {
131
36
                client
132
36
                    .unregister_subscriber_async(database, subscriber_id)
133
36
                    .await;
134
288
            });
135
288
        } else {
136
144
            self.client
137
144
                .unregister_subscriber(self.database.to_string(), self.id);
138
144
        }
139
432
    }
140
}