use std::sync::Arc;
use async_trait::async_trait;
use bonsaidb_core::arc_bytes::serde::Bytes;
use bonsaidb_core::networking::{
CreateSubscriber, Publish, PublishToAll, SubscribeTo, UnsubscribeFrom,
};
use bonsaidb_core::pubsub::{AsyncPubSub, AsyncSubscriber, Receiver};
use crate::AsyncClient;
#[async_trait]
impl AsyncPubSub for super::AsyncRemoteDatabase {
type Subscriber = AsyncRemoteSubscriber;
async fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
let subscriber_id = self
.client
.send_api_request(&CreateSubscriber {
database: self.name.to_string(),
})
.await?;
let (sender, receiver) = flume::unbounded();
self.client.register_subscriber(subscriber_id, sender);
Ok(AsyncRemoteSubscriber {
client: self.client.clone(),
database: self.name.clone(),
id: subscriber_id,
receiver: Receiver::new(receiver),
#[cfg(not(target_arch = "wasm32"))]
tokio: tokio::runtime::Handle::try_current().ok().map(Arc::new),
})
}
async fn publish_bytes(
&self,
topic: Vec<u8>,
payload: Vec<u8>,
) -> Result<(), bonsaidb_core::Error> {
self.client
.send_api_request(&Publish {
database: self.name.to_string(),
topic: Bytes::from(topic),
payload: Bytes::from(payload),
})
.await?;
Ok(())
}
async fn publish_bytes_to_all(
&self,
topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
payload: Vec<u8>,
) -> Result<(), bonsaidb_core::Error> {
let topics = topics.into_iter().map(Bytes::from).collect();
self.client
.send_api_request(&PublishToAll {
database: self.name.to_string(),
topics,
payload: Bytes::from(payload),
})
.await?;
Ok(())
}
}
#[derive(Debug)]
pub struct AsyncRemoteSubscriber {
pub(crate) client: AsyncClient,
pub(crate) database: Arc<String>,
pub(crate) id: u64,
pub(crate) receiver: Receiver,
#[cfg(not(target_arch = "wasm32"))]
pub(crate) tokio: Option<Arc<tokio::runtime::Handle>>,
}
#[async_trait]
impl AsyncSubscriber for AsyncRemoteSubscriber {
async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
self.client
.send_api_request(&SubscribeTo {
database: self.database.to_string(),
subscriber_id: self.id,
topic: Bytes::from(topic),
})
.await?;
Ok(())
}
async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
self.client
.send_api_request(&UnsubscribeFrom {
database: self.database.to_string(),
subscriber_id: self.id,
topic: Bytes::from(topic),
})
.await?;
Ok(())
}
fn receiver(&self) -> &Receiver {
&self.receiver
}
}
#[cfg(target_arch = "wasm32")]
impl Drop for AsyncRemoteSubscriber {
fn drop(&mut self) {
let client = self.client.clone();
let database = self.database.to_string();
let subscriber_id = self.id;
let drop_future = async move {
client
.unregister_subscriber_async(database, subscriber_id)
.await;
};
wasm_bindgen_futures::spawn_local(drop_future);
}
}
#[cfg(not(target_arch = "wasm32"))]
impl Drop for AsyncRemoteSubscriber {
fn drop(&mut self) {
if let Some(tokio) = &self.tokio {
let client = self.client.clone();
let database = self.database.to_string();
let subscriber_id = self.id;
tokio.spawn(async move {
client
.unregister_subscriber_async(database, subscriber_id)
.await;
});
} else {
self.client
.unregister_subscriber(self.database.to_string(), self.id);
}
}
}