1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
pub use bonsaidb_core::circulate::Relay;
5
use bonsaidb_core::{
6
    circulate,
7
    pubsub::{self, database_topic, PubSub},
8
    Error,
9
};
10

            
11
#[async_trait]
12
impl PubSub for super::Database {
13
    type Subscriber = Subscriber;
14

            
15
64
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
16
        Ok(Subscriber {
17
64
            database_name: self.data.name.to_string(),
18
64
            subscriber: self.data.storage.relay().create_subscriber().await,
19
        })
20
128
    }
21

            
22
26
    async fn publish<S: Into<String> + Send, P: serde::Serialize + Sync>(
23
26
        &self,
24
26
        topic: S,
25
26
        payload: &P,
26
26
    ) -> Result<(), bonsaidb_core::Error> {
27
26
        self.data
28
26
            .storage
29
26
            .relay()
30
26
            .publish(database_topic(&self.data.name, &topic.into()), payload)
31
1
            .await?;
32
26
        Ok(())
33
52
    }
34

            
35
2
    async fn publish_to_all<P: serde::Serialize + Sync>(
36
2
        &self,
37
2
        topics: Vec<String>,
38
2
        payload: &P,
39
2
    ) -> Result<(), bonsaidb_core::Error> {
40
2
        self.data
41
2
            .storage
42
2
            .relay()
43
2
            .publish_to_all(
44
2
                topics
45
2
                    .iter()
46
6
                    .map(|topic| database_topic(&self.data.name, topic))
47
2
                    .collect(),
48
2
                payload,
49
2
            )
50
            .await?;
51
2
        Ok(())
52
4
    }
53
}
54

            
55
/// A subscriber for `PubSub` messages.
56
pub struct Subscriber {
57
    database_name: String,
58
    subscriber: circulate::Subscriber,
59
}
60

            
61
#[async_trait]
62
impl pubsub::Subscriber for Subscriber {
63
26
    async fn subscribe_to<S: Into<String> + Send>(&self, topic: S) -> Result<(), Error> {
64
26
        self.subscriber
65
26
            .subscribe_to(database_topic(&self.database_name, &topic.into()))
66
            .await;
67
26
        Ok(())
68
52
    }
69

            
70
2
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), Error> {
71
2
        let topic = format!("{}\u{0}{}", self.database_name, topic);
72
2
        self.subscriber.unsubscribe_from(&topic).await;
73
2
        Ok(())
74
4
    }
75

            
76
284
    fn receiver(&self) -> &'_ flume::Receiver<Arc<circulate::Message>> {
77
284
        self.subscriber.receiver()
78
284
    }
79
}