1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::arc_bytes::serde::Bytes;
5
use bonsaidb_core::networking::{
6
    CreateSubscriber, Publish, PublishToAll, SubscribeTo, UnsubscribeFrom,
7
};
8
use bonsaidb_core::pubsub::{AsyncPubSub, AsyncSubscriber, Receiver};
9

            
10
use crate::AsyncClient;
11

            
12
#[async_trait]
13
impl AsyncPubSub for super::AsyncRemoteDatabase {
14
    type Subscriber = AsyncRemoteSubscriber;
15

            
16
272
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
17
272
        let subscriber_id = self
18
272
            .client
19
272
            .send_api_request(&CreateSubscriber {
20
272
                database: self.name.to_string(),
21
272
            })
22
272
            .await?;
23

            
24
272
        let (sender, receiver) = flume::unbounded();
25
272
        self.client.register_subscriber(subscriber_id, sender);
26
272
        Ok(AsyncRemoteSubscriber {
27
272
            client: self.client.clone(),
28
272
            database: self.name.clone(),
29
272
            id: subscriber_id,
30
272
            receiver: Receiver::new(receiver),
31
272
            #[cfg(not(target_arch = "wasm32"))]
32
272
            tokio: tokio::runtime::Handle::try_current().ok().map(Arc::new),
33
272
        })
34
544
    }
35

            
36
340
    async fn publish_bytes(
37
340
        &self,
38
340
        topic: Vec<u8>,
39
340
        payload: Vec<u8>,
40
340
    ) -> Result<(), bonsaidb_core::Error> {
41
340
        self.client
42
340
            .send_api_request(&Publish {
43
340
                database: self.name.to_string(),
44
340
                topic: Bytes::from(topic),
45
340
                payload: Bytes::from(payload),
46
340
            })
47
340
            .await?;
48
340
        Ok(())
49
680
    }
50

            
51
2
    async fn publish_bytes_to_all(
52
2
        &self,
53
2
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
54
2
        payload: Vec<u8>,
55
2
    ) -> Result<(), bonsaidb_core::Error> {
56
2
        let topics = topics.into_iter().map(Bytes::from).collect();
57
2
        self.client
58
2
            .send_api_request(&PublishToAll {
59
2
                database: self.name.to_string(),
60
2
                topics,
61
2
                payload: Bytes::from(payload),
62
2
            })
63
2
            .await?;
64
2
        Ok(())
65
4
    }
66
}
67

            
68
/// A `PubSub` subscriber from a remote server.
69
#[derive(Debug)]
70
pub struct AsyncRemoteSubscriber {
71
    pub(crate) client: AsyncClient,
72
    pub(crate) database: Arc<String>,
73
    pub(crate) id: u64,
74
    pub(crate) receiver: Receiver,
75
    #[cfg(not(target_arch = "wasm32"))]
76
    pub(crate) tokio: Option<Arc<tokio::runtime::Handle>>,
77
}
78

            
79
#[async_trait]
80
impl AsyncSubscriber for AsyncRemoteSubscriber {
81
442
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
82
442
        self.client
83
442
            .send_api_request(&SubscribeTo {
84
442
                database: self.database.to_string(),
85
442
                subscriber_id: self.id,
86
442
                topic: Bytes::from(topic),
87
442
            })
88
442
            .await?;
89
442
        Ok(())
90
884
    }
91

            
92
34
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
93
34
        self.client
94
34
            .send_api_request(&UnsubscribeFrom {
95
34
                database: self.database.to_string(),
96
34
                subscriber_id: self.id,
97
34
                topic: Bytes::from(topic),
98
34
            })
99
34
            .await?;
100
34
        Ok(())
101
68
    }
102

            
103
918
    fn receiver(&self) -> &Receiver {
104
918
        &self.receiver
105
918
    }
106
}
107

            
108
/// On `wasm32`, dropping the subscriber schedules its unregistration as a local future.
#[cfg(target_arch = "wasm32")]
impl Drop for AsyncRemoteSubscriber {
    fn drop(&mut self) {
        // `drop` cannot await, so move owned copies of everything the unregistration needs
        // into a future and hand it to `spawn_local`.
        let client = self.client.clone();
        let database = self.database.to_string();
        let subscriber_id = self.id;
        let drop_future = async move {
            client
                .unregister_subscriber_async(database, subscriber_id)
                .await;
        };
        wasm_bindgen_futures::spawn_local(drop_future);
    }
}
122

            
123
#[cfg(not(target_arch = "wasm32"))]
124
impl Drop for AsyncRemoteSubscriber {
125
    fn drop(&mut self) {
126
408
        if let Some(tokio) = &self.tokio {
127
272
            let client = self.client.clone();
128
272
            let database = self.database.to_string();
129
272
            let subscriber_id = self.id;
130
272
            tokio.spawn(async move {
131
34
                client
132
34
                    .unregister_subscriber_async(database, subscriber_id)
133
34
                    .await;
134
272
            });
135
272
        } else {
136
136
            self.client
137
136
                .unregister_subscriber(self.database.to_string(), self.id);
138
136
        }
139
408
    }
140
}