1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
use bonsaidb_core::arc_bytes::serde::Bytes;
5
use bonsaidb_core::networking::{
6
    CreateSubscriber, Publish, PublishToAll, SubscribeTo, UnsubscribeFrom,
7
};
8
use bonsaidb_core::pubsub::{AsyncPubSub, AsyncSubscriber, Receiver};
9

            
10
use crate::AsyncClient;
11

            
12
#[async_trait]
13
impl AsyncPubSub for super::AsyncRemoteDatabase {
14
    type Subscriber = AsyncRemoteSubscriber;
15

            
16
480
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
17
480
        let subscriber_id = self
18
480
            .client
19
480
            .send_api_request(&CreateSubscriber {
20
480
                database: self.name.to_string(),
21
480
            })
22
480
            .await?;
23

            
24
480
        let (sender, receiver) = flume::unbounded();
25
480
        self.client.register_subscriber(subscriber_id, sender);
26
480
        Ok(AsyncRemoteSubscriber {
27
480
            client: self.client.clone(),
28
480
            database: self.name.clone(),
29
480
            id: subscriber_id,
30
480
            receiver: Receiver::new(receiver),
31
480
            #[cfg(not(target_arch = "wasm32"))]
32
480
            tokio: tokio::runtime::Handle::try_current().ok().map(Arc::new),
33
480
        })
34
1440
    }
35

            
36
600
    async fn publish_bytes(
37
600
        &self,
38
600
        topic: Vec<u8>,
39
600
        payload: Vec<u8>,
40
600
    ) -> Result<(), bonsaidb_core::Error> {
41
600
        self.client
42
600
            .send_api_request(&Publish {
43
600
                database: self.name.to_string(),
44
600
                topic: Bytes::from(topic),
45
600
                payload: Bytes::from(payload),
46
600
            })
47
600
            .await?;
48
600
        Ok(())
49
1800
    }
50

            
51
2
    async fn publish_bytes_to_all(
52
2
        &self,
53
2
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
54
2
        payload: Vec<u8>,
55
2
    ) -> Result<(), bonsaidb_core::Error> {
56
2
        let topics = topics.into_iter().map(Bytes::from).collect();
57
2
        self.client
58
2
            .send_api_request(&PublishToAll {
59
2
                database: self.name.to_string(),
60
2
                topics,
61
2
                payload: Bytes::from(payload),
62
2
            })
63
2
            .await?;
64
2
        Ok(())
65
6
    }
66
}
67

            
68
/// A `PubSub` subscriber from a remote server.
69
#[derive(Debug)]
70
pub struct AsyncRemoteSubscriber {
71
    pub(crate) client: AsyncClient,
72
    pub(crate) database: Arc<String>,
73
    pub(crate) id: u64,
74
    pub(crate) receiver: Receiver,
75
    #[cfg(not(target_arch = "wasm32"))]
76
    pub(crate) tokio: Option<Arc<tokio::runtime::Handle>>,
77
}
78

            
79
#[async_trait]
80
impl AsyncSubscriber for AsyncRemoteSubscriber {
81
780
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
82
780
        self.client
83
780
            .send_api_request(&SubscribeTo {
84
780
                database: self.database.to_string(),
85
780
                subscriber_id: self.id,
86
780
                topic: Bytes::from(topic),
87
780
            })
88
780
            .await?;
89
780
        Ok(())
90
2340
    }
91

            
92
60
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
93
60
        self.client
94
60
            .send_api_request(&UnsubscribeFrom {
95
60
                database: self.database.to_string(),
96
60
                subscriber_id: self.id,
97
60
                topic: Bytes::from(topic),
98
60
            })
99
60
            .await?;
100
60
        Ok(())
101
180
    }
102

            
103
1620
    fn receiver(&self) -> &Receiver {
104
1620
        &self.receiver
105
1620
    }
106
}
107

            
108
#[cfg(target_arch = "wasm32")]
109
impl Drop for AsyncRemoteSubscriber {
110
    fn drop(&mut self) {
111
        let client = self.client.clone();
112
        let database = self.database.to_string();
113
        let subscriber_id = self.id;
114
        let drop_future = async move {
115
            client
116
                .unregister_subscriber_async(database, subscriber_id)
117
                .await;
118
        };
119
        wasm_bindgen_futures::spawn_local(drop_future);
120
    }
121
}
122

            
123
#[cfg(not(target_arch = "wasm32"))]
124
impl Drop for AsyncRemoteSubscriber {
125
720
    fn drop(&mut self) {
126
720
        if let Some(tokio) = &self.tokio {
127
480
            let client = self.client.clone();
128
480
            let database = self.database.to_string();
129
480
            let subscriber_id = self.id;
130
480
            tokio.spawn(async move {
131
60
                client
132
60
                    .unregister_subscriber_async(database, subscriber_id)
133
60
                    .await;
134
480
            });
135
480
        } else {
136
240
            self.client
137
240
                .unregister_subscriber(self.database.to_string(), self.id);
138
240
        }
139
720
    }
140
}