1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use bonsaidb_core::arc_bytes::OwnedBytes;
use bonsaidb_core::connection::{Connection, HasSession};
use bonsaidb_core::permissions::bonsai::{
    database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction, PubSubAction,
};
use bonsaidb_core::pubsub::{self, database_topic, PubSub, Receiver};
use bonsaidb_core::{circulate, Error};

use crate::{Database, DatabaseNonBlocking};

impl PubSub for super::Database {
    type Subscriber = Subscriber;

    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
        self.check_permission(
            database_resource_name(self.name()),
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber)),
        )?;
        Ok(self
            .storage()
            .instance
            .register_subscriber(self.session().and_then(|session| session.id), self.clone()))
    }

    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
        self.check_permission(
            pubsub_topic_resource_name(self.name(), &topic),
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
        )?;
        self.storage
            .instance
            .relay()
            .publish_raw(database_topic(&self.data.name, &topic), payload);
        Ok(())
    }

    fn publish_bytes_to_all(
        &self,
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
        payload: Vec<u8>,
    ) -> Result<(), bonsaidb_core::Error> {
        self.storage.instance.relay().publish_raw_to_all(
            topics
                .into_iter()
                .map(|topic| {
                    self.check_permission(
                        pubsub_topic_resource_name(self.name(), &topic),
                        &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
                    )
                    .map(|()| OwnedBytes::from(database_topic(&self.data.name, &topic)))
                })
                .collect::<Result<Vec<_>, _>>()?,
            payload,
        );
        Ok(())
    }
}

/// A subscriber for `PubSub` messages.
#[derive(Debug, Clone)]
pub struct Subscriber {
    pub(crate) id: u64,
    pub(crate) database: Database,
    pub(crate) subscriber: circulate::Subscriber,
    pub(crate) receiver: Receiver,
}

impl Subscriber {
    /// Returns the unique id of the subscriber.
    #[must_use]
    pub const fn id(&self) -> u64 {
        self.id
    }
}

impl Drop for Subscriber {
    fn drop(&mut self) {
        self.database.storage().instance.unregister_subscriber(self);
    }
}

impl pubsub::Subscriber for Subscriber {
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
        self.database.check_permission(
            pubsub_topic_resource_name(self.database.name(), &topic),
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo)),
        )?;
        self.subscriber
            .subscribe_to_raw(database_topic(self.database.name(), &topic));
        Ok(())
    }

    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
        self.database.check_permission(
            pubsub_topic_resource_name(self.database.name(), topic),
            &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom)),
        )?;
        self.subscriber
            .unsubscribe_from_raw(&database_topic(self.database.name(), topic));
        Ok(())
    }

    fn receiver(&self) -> &Receiver {
        &self.receiver
    }
}