use bonsaidb_core::arc_bytes::OwnedBytes;
pub use bonsaidb_core::circulate::Relay;
use bonsaidb_core::connection::{Connection, HasSession};
use bonsaidb_core::permissions::bonsai::{
    database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction, PubSubAction,
};
use bonsaidb_core::pubsub::{self, database_topic, PubSub, Receiver};
use bonsaidb_core::{circulate, Error};

use crate::{Database, DatabaseNonBlocking};

impl PubSub for super::Database {
13
    type Subscriber = Subscriber;
14

            
15
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
16
1328
        self.check_permission(
17
1328
            database_resource_name(self.name()),
18
1328
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber)),
19
1328
        )?;
20
1328
        Ok(self
21
1328
            .storage()
22
1328
            .instance
23
1328
            .register_subscriber(self.session().and_then(|session| session.id), self.clone()))
24
1328
    }
25

            
26
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
27
2380
        self.check_permission(
28
2380
            pubsub_topic_resource_name(self.name(), &topic),
29
2380
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
30
2380
        )?;
31
2380
        self.storage
32
2380
            .instance
33
2380
            .relay()
34
2380
            .publish_raw(database_topic(&self.data.name, &topic), payload);
35
2380
        Ok(())
36
2380
    }
37

            
38
8
    fn publish_bytes_to_all(
39
8
        &self,
40
8
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
41
8
        payload: Vec<u8>,
42
8
    ) -> Result<(), bonsaidb_core::Error> {
43
8
        self.storage.instance.relay().publish_raw_to_all(
44
8
            topics
45
8
                .into_iter()
46
24
                .map(|topic| {
47
24
                    self.check_permission(
48
24
                        pubsub_topic_resource_name(self.name(), &topic),
49
24
                        &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
50
24
                    )
51
24
                    .map(|_| OwnedBytes::from(database_topic(&self.data.name, &topic)))
52
24
                })
53
8
                .collect::<Result<Vec<_>, _>>()?,
54
8
            payload,
55
8
        );
56
8
        Ok(())
57
8
    }
58
}
59

            
60
/// A subscriber for `PubSub` messages.
61
#[derive(Debug, Clone)]
62
pub struct Subscriber {
63
    pub(crate) id: u64,
64
    pub(crate) database: Database,
65
    pub(crate) subscriber: circulate::Subscriber,
66
    pub(crate) receiver: Receiver,
67
}
68

            
69
impl Subscriber {
70
    /// Returns the unique id of the subscriber.
71
    #[must_use]
72
2592
    pub const fn id(&self) -> u64 {
73
2592
        self.id
74
2592
    }
75
}
76

            
77
impl Drop for Subscriber {
78
824
    fn drop(&mut self) {
79
824
        self.database.storage().instance.unregister_subscriber(self);
80
824
    }
81
}
82

            
83
impl pubsub::Subscriber for Subscriber {
84
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
85
2068
        self.database.check_permission(
86
2068
            pubsub_topic_resource_name(self.database.name(), &topic),
87
2068
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo)),
88
2068
        )?;
89
2068
        self.subscriber
90
2068
            .subscribe_to_raw(database_topic(self.database.name(), &topic));
91
2068
        Ok(())
92
2068
    }
93

            
94
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
95
148
        self.database.check_permission(
96
148
            pubsub_topic_resource_name(self.database.name(), topic),
97
148
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom)),
98
148
        )?;
99
148
        self.subscriber
100
148
            .unsubscribe_from_raw(&database_topic(self.database.name(), topic));
101
148
        Ok(())
102
148
    }
103

            
104
2304
    fn receiver(&self) -> &Receiver {
105
2304
        &self.receiver
106
2304
    }
107
}