1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
pub use bonsaidb_core::circulate::Relay;
use bonsaidb_core::{
    arc_bytes::OwnedBytes,
    circulate,
    connection::{Connection, HasSession},
    permissions::bonsai::{
        database_resource_name, pubsub_topic_resource_name, BonsaiAction, DatabaseAction,
        PubSubAction,
    },
    pubsub::{self, database_topic, PubSub, Receiver},
    Error,
};

use crate::{Database, DatabaseNonBlocking};

impl PubSub for super::Database {
    type Subscriber = Subscriber;

    /// Registers a new subscriber with this database's storage instance.
    ///
    /// # Errors
    ///
    /// Returns the error from `check_permission` when the `CreateSuscriber`
    /// action is not allowed on this database's resource name.
    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
        // The variant spelling (`CreateSuscriber`) is defined outside this file.
        let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::CreateSuscriber));
        self.check_permission(database_resource_name(self.name()), &action)?;

        let instance = &self.storage().instance;
        let session_id = self.session().and_then(|current| current.id);
        Ok(instance.register_subscriber(session_id, self.clone()))
    }

    /// Publishes `payload` to a single `topic` of this database.
    ///
    /// The relay receives the key produced by `database_topic` from the
    /// database name and `topic`, not the bare topic bytes.
    ///
    /// # Errors
    ///
    /// Returns the error from `check_permission` when the `Publish` action is
    /// not allowed for `topic`; nothing is published in that case.
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
        let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish));
        self.check_permission(pubsub_topic_resource_name(self.name(), &topic), &action)?;

        let relay = self.storage.instance.relay();
        relay.publish_raw(database_topic(&self.data.name, &topic), payload);
        Ok(())
    }

    /// Publishes `payload` to every topic yielded by `topics`.
    ///
    /// Every topic is permission-checked before anything is sent.
    ///
    /// # Errors
    ///
    /// Returns the first error from `check_permission`; in that case the
    /// payload is not published to any topic.
    fn publish_bytes_to_all(
        &self,
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
        payload: Vec<u8>,
    ) -> Result<(), bonsaidb_core::Error> {
        let relay = self.storage.instance.relay();

        // Bail out on the first topic that fails its permission check so that
        // a partially authorized batch is never published.
        let mut authorized_topics = Vec::new();
        for topic in topics {
            self.check_permission(
                pubsub_topic_resource_name(self.name(), &topic),
                &BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::Publish)),
            )?;
            authorized_topics.push(OwnedBytes::from(database_topic(&self.data.name, &topic)));
        }

        relay.publish_raw_to_all(authorized_topics, payload);
        Ok(())
    }
}

/// A subscriber for `PubSub` messages.
///
/// Dropping a value of this type calls `unregister_subscriber` on the storage
/// instance of `database`.
///
/// NOTE(review): the type derives `Clone` and also implements `Drop`, so every
/// clone runs the unregistration when it is dropped — confirm that
/// `unregister_subscriber` tolerates being called more than once per id.
#[derive(Debug, Clone)]
pub struct Subscriber {
    /// Identifier of this subscriber, returned by `Subscriber::id`.
    pub(crate) id: u64,
    /// Database handle used for permission checks, for building topic keys via
    /// `database_topic`, and for reaching the storage instance on drop.
    pub(crate) database: Database,
    /// Underlying `circulate` subscriber that subscribe/unsubscribe requests
    /// are forwarded to.
    pub(crate) subscriber: circulate::Subscriber,
    /// Receiver handed out by `pubsub::Subscriber::receiver`.
    pub(crate) receiver: Receiver,
}

impl Subscriber {
    /// Returns the identifier stored in this subscriber's `id` field.
    #[must_use]
    pub const fn id(&self) -> u64 {
        let Self { id, .. } = self;
        *id
    }
}

impl Drop for Subscriber {
    /// Unregisters this subscriber from the storage instance of its database.
    fn drop(&mut self) {
        let storage = self.database.storage();
        storage.instance.unregister_subscriber(self);
    }
}

impl pubsub::Subscriber for Subscriber {
    /// Subscribes to `topic` within this subscriber's database.
    ///
    /// # Errors
    ///
    /// Returns the error from `check_permission` when the `SubscribeTo` action
    /// is not allowed for `topic`; no subscription is made in that case.
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error> {
        let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::SubscribeTo));
        let resource = pubsub_topic_resource_name(self.database.name(), &topic);
        self.database.check_permission(resource, &action)?;

        // The underlying subscriber is keyed by the topic produced by
        // `database_topic`, not by the bare topic bytes.
        let scoped_topic = database_topic(self.database.name(), &topic);
        self.subscriber.subscribe_to_raw(scoped_topic);
        Ok(())
    }

    /// Unsubscribes from `topic` within this subscriber's database.
    ///
    /// # Errors
    ///
    /// Returns the error from `check_permission` when the `UnsubscribeFrom`
    /// action is not allowed for `topic`; the subscription is left untouched
    /// in that case.
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error> {
        let action = BonsaiAction::Database(DatabaseAction::PubSub(PubSubAction::UnsubscribeFrom));
        let resource = pubsub_topic_resource_name(self.database.name(), topic);
        self.database.check_permission(resource, &action)?;

        let scoped_topic = database_topic(self.database.name(), topic);
        self.subscriber.unsubscribe_from_raw(&scoped_topic);
        Ok(())
    }

    /// Borrows the receiver stored in this subscriber.
    fn receiver(&self) -> &Receiver {
        let Self { receiver, .. } = self;
        receiver
    }
}