1
use bonsaidb_core::arc_bytes::OwnedBytes;
2
use bonsaidb_core::connection::{Connection, HasSession};
3
use bonsaidb_core::permissions::bonsai::{
4
    database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction, PubSubAction,
5
};
6
use bonsaidb_core::pubsub::{self, database_topic, PubSub, Receiver};
7
use bonsaidb_core::{circulate, Error};
8

            
9
use crate::{Database, DatabaseNonBlocking};
10

            
11
impl PubSub for super::Database {
12
    type Subscriber = Subscriber;
13

            
14
1328
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
15
1328
        self.check_permission(
16
1328
            database_resource_name(self.name()),
17
1328
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber)),
18
1328
        )?;
19
1328
        Ok(self
20
1328
            .storage()
21
1328
            .instance
22
1328
            .register_subscriber(self.session().and_then(|session| session.id), self.clone()))
23
1328
    }
24

            
25
2236
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
26
2236
        self.check_permission(
27
2236
            pubsub_topic_resource_name(self.name(), &topic),
28
2236
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
29
2236
        )?;
30
2236
        self.storage
31
2236
            .instance
32
2236
            .relay()
33
2236
            .publish_raw(database_topic(&self.data.name, &topic), payload);
34
2236
        Ok(())
35
2236
    }
36

            
37
8
    fn publish_bytes_to_all(
38
8
        &self,
39
8
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
40
8
        payload: Vec<u8>,
41
8
    ) -> Result<(), bonsaidb_core::Error> {
42
8
        self.storage.instance.relay().publish_raw_to_all(
43
8
            topics
44
8
                .into_iter()
45
24
                .map(|topic| {
46
24
                    self.check_permission(
47
24
                        pubsub_topic_resource_name(self.name(), &topic),
48
24
                        &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
49
24
                    )
50
24
                    .map(|()| OwnedBytes::from(database_topic(&self.data.name, &topic)))
51
24
                })
52
8
                .collect::<Result<Vec<_>, _>>()?,
53
8
            payload,
54
8
        );
55
8
        Ok(())
56
8
    }
57
}
58

            
59
/// A subscriber for `PubSub` messages.
60
#[derive(Debug, Clone)]
61
pub struct Subscriber {
62
    pub(crate) id: u64,
63
    pub(crate) database: Database,
64
    pub(crate) subscriber: circulate::Subscriber,
65
    pub(crate) receiver: Receiver,
66
}
67

            
68
impl Subscriber {
69
    /// Returns the unique id of the subscriber.
70
    #[must_use]
71
2592
    pub const fn id(&self) -> u64 {
72
2592
        self.id
73
2592
    }
74
}
75

            
76
impl Drop for Subscriber {
77
824
    fn drop(&mut self) {
78
824
        self.database.storage().instance.unregister_subscriber(self);
79
824
    }
80
}
81

            
82
impl pubsub::Subscriber for Subscriber {
83
2068
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
84
2068
        self.database.check_permission(
85
2068
            pubsub_topic_resource_name(self.database.name(), &topic),
86
2068
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo)),
87
2068
        )?;
88
2068
        self.subscriber
89
2068
            .subscribe_to_raw(database_topic(self.database.name(), &topic));
90
2068
        Ok(())
91
2068
    }
92

            
93
148
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
94
148
        self.database.check_permission(
95
148
            pubsub_topic_resource_name(self.database.name(), topic),
96
148
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom)),
97
148
        )?;
98
148
        self.subscriber
99
148
            .unsubscribe_from_raw(&database_topic(self.database.name(), topic));
100
148
        Ok(())
101
148
    }
102

            
103
2304
    fn receiver(&self) -> &Receiver {
104
2304
        &self.receiver
105
2304
    }
106
}