1
use std::sync::Arc;
2

            
3
use async_trait::async_trait;
4
pub use bonsaidb_core::circulate::Relay;
5
use bonsaidb_core::{
6
    circulate,
7
    pubsub::{self, database_topic, PubSub},
8
    Error,
9
};
10

            
11
#[async_trait]
12
impl PubSub for super::Database {
13
    type Subscriber = Subscriber;
14

            
15
51
    async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
16
        Ok(Subscriber {
17
51
            database_name: self.data.name.to_string(),
18
51
            subscriber: self.data.storage.relay().create_subscriber().await,
19
        })
20
102
    }
21

            
22
18
    async fn publish<S: Into<String> + Send, P: serde::Serialize + Sync>(
23
18
        &self,
24
18
        topic: S,
25
18
        payload: &P,
26
18
    ) -> Result<(), bonsaidb_core::Error> {
27
18
        self.data
28
18
            .storage
29
18
            .relay()
30
18
            .publish(database_topic(&self.data.name, &topic.into()), payload)
31
            .await?;
32
18
        Ok(())
33
36
    }
34

            
35
1
    async fn publish_to_all<P: serde::Serialize + Sync>(
36
1
        &self,
37
1
        topics: Vec<String>,
38
1
        payload: &P,
39
1
    ) -> Result<(), bonsaidb_core::Error> {
40
1
        self.data
41
1
            .storage
42
1
            .relay()
43
1
            .publish_to_all(
44
1
                topics
45
1
                    .iter()
46
3
                    .map(|topic| database_topic(&self.data.name, topic))
47
1
                    .collect(),
48
1
                payload,
49
1
            )
50
            .await?;
51
1
        Ok(())
52
2
    }
53
}
54

            
55
/// A subscriber for `PubSub` messages.
56
pub struct Subscriber {
57
    database_name: String,
58
    subscriber: circulate::Subscriber,
59
}
60

            
61
#[async_trait]
62
impl pubsub::Subscriber for Subscriber {
63
14
    async fn subscribe_to<S: Into<String> + Send>(&self, topic: S) -> Result<(), Error> {
64
14
        self.subscriber
65
14
            .subscribe_to(database_topic(&self.database_name, &topic.into()))
66
            .await;
67
14
        Ok(())
68
28
    }
69

            
70
1
    async fn unsubscribe_from(&self, topic: &str) -> Result<(), Error> {
71
1
        let topic = format!("{}\u{0}{}", self.database_name, topic);
72
1
        self.subscriber.unsubscribe_from(&topic).await;
73
1
        Ok(())
74
2
    }
75

            
76
237
    fn receiver(&self) -> &'_ flume::Receiver<Arc<circulate::Message>> {
77
237
        self.subscriber.receiver()
78
237
    }
79
}