1
pub use bonsaidb_core::circulate::Relay;
2
use bonsaidb_core::{
3
    arc_bytes::OwnedBytes,
4
    circulate,
5
    connection::{Connection, HasSession},
6
    permissions::bonsai::{
7
        database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction,
8
        PubSubAction,
9
    },
10
    pubsub::{self, database_topic, PubSub, Receiver},
11
    Error,
12
};
13

            
14
use crate::{Database, DatabaseNonBlocking};
15

            
16
impl PubSub for super::Database {
17
    type Subscriber = Subscriber;
18

            
19
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
20
1112
        self.check_permission(
21
1112
            database_resource_name(self.name()),
22
1112
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber)),
23
1112
        )?;
24
1112
        Ok(self
25
1112
            .storage()
26
1112
            .instance
27
1112
            .register_subscriber(self.session().and_then(|session| session.id), self.clone()))
28
1112
    }
29

            
30
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
31
1870
        self.check_permission(
32
1870
            pubsub_topic_resource_name(self.name(), &topic),
33
1870
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
34
1870
        )?;
35
1870
        self.storage
36
1870
            .instance
37
1870
            .relay()
38
1870
            .publish_raw(database_topic(&self.data.name, &topic), payload);
39
1870
        Ok(())
40
1870
    }
41

            
42
8
    fn publish_bytes_to_all(
43
8
        &self,
44
8
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
45
8
        payload: Vec<u8>,
46
8
    ) -> Result<(), bonsaidb_core::Error> {
47
8
        self.storage.instance.relay().publish_raw_to_all(
48
8
            topics
49
8
                .into_iter()
50
24
                .map(|topic| {
51
24
                    self.check_permission(
52
24
                        pubsub_topic_resource_name(self.name(), &topic),
53
24
                        &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
54
24
                    )
55
24
                    .map(|_| OwnedBytes::from(database_topic(&self.data.name, &topic)))
56
24
                })
57
8
                .collect::<Result<Vec<_>, _>>()?,
58
8
            payload,
59
8
        );
60
8
        Ok(())
61
8
    }
62
}
63

            
64
/// A subscriber for `PubSub` messages.
65
#[derive(Debug, Clone)]
66
pub struct Subscriber {
67
    pub(crate) id: u64,
68
    pub(crate) database: Database,
69
    pub(crate) subscriber: circulate::Subscriber,
70
    pub(crate) receiver: Receiver,
71
}
72

            
73
impl Subscriber {
74
    /// Returns the unique id of the subscriber.
75
    #[must_use]
76
2160
    pub const fn id(&self) -> u64 {
77
2160
        self.id
78
2160
    }
79
}
80

            
81
impl Drop for Subscriber {
82
692
    fn drop(&mut self) {
83
692
        self.database.storage().instance.unregister_subscriber(self);
84
692
    }
85
}
86

            
87
impl pubsub::Subscriber for Subscriber {
88
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
89
1732
        self.database.check_permission(
90
1732
            pubsub_topic_resource_name(self.database.name(), &topic),
91
1732
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo)),
92
1732
        )?;
93
1732
        self.subscriber
94
1732
            .subscribe_to_raw(database_topic(self.database.name(), &topic));
95
1732
        Ok(())
96
1732
    }
97

            
98
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
99
124
        self.database.check_permission(
100
124
            pubsub_topic_resource_name(self.database.name(), topic),
101
124
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom)),
102
124
        )?;
103
124
        self.subscriber
104
124
            .unsubscribe_from_raw(&database_topic(self.database.name(), topic));
105
124
        Ok(())
106
124
    }
107

            
108
1932
    fn receiver(&self) -> &Receiver {
109
1932
        &self.receiver
110
1932
    }
111
}