Lines
85.56 %
Functions
45.05 %
Branches
100 %
use std::sync::Arc;
use bonsaidb_core::{
custom_api::{CustomApi, CustomApiResult},
networking::{Payload, Response},
};
use bonsaidb_utils::fast_async_lock;
use flume::Receiver;
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
use tokio::net::TcpStream;
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
use url::Url;
use super::{CustomApiCallback, PendingRequest};
use crate::{
client::{OutstandingRequestMapHandle, SubscriberMap},
Error,
pub async fn reconnecting_client_loop<A: CustomApi>(
url: Url,
protocol_version: &str,
request_receiver: Receiver<PendingRequest<A>>,
custom_api_callback: Option<Arc<dyn CustomApiCallback<A>>>,
subscribers: SubscriberMap,
) -> Result<(), Error<A::Error>> {
while let Ok(request) = {
subscribers.clear().await;
request_receiver.recv_async().await
} {
let (stream, _) = match tokio_tungstenite::connect_async(
tokio_tungstenite::tungstenite::handshake::client::Request::get(url.as_str())
.header("Sec-WebSocket-Protocol", protocol_version)
.body(())
.unwrap(),
)
.await
{
Ok(result) => result,
Err(err) => {
drop(request.responder.send(Err(Error::from(err))));
continue;
}
let (mut sender, receiver) = stream.split();
let outstanding_requests = OutstandingRequestMapHandle::default();
let mut outstanding_requests = fast_async_lock!(outstanding_requests);
if let Err(err) = sender
.send(Message::Binary(bincode::serialize(&request.request)?))
outstanding_requests.insert(
request.request.id.expect("all requests must have ids"),
request,
);
if let Err(err) = tokio::try_join!(
request_sender(&request_receiver, sender, outstanding_requests.clone()),
response_processor(
receiver,
outstanding_requests.clone(),
custom_api_callback.as_deref(),
subscribers.clone()
) {
// Our socket was disconnected, clear the outstanding requests before returning.
for (_, pending) in outstanding_requests.drain() {
drop(pending.responder.send(Err(Error::Disconnected)));
log::error!("Error on socket {:?}", err);
Ok(())
async fn request_sender<Api: CustomApi>(
request_receiver: &Receiver<PendingRequest<Api>>,
mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
outstanding_requests: OutstandingRequestMapHandle<Api>,
) -> Result<(), Error<Api::Error>> {
while let Ok(pending) = request_receiver.recv_async().await {
sender
.send(Message::Binary(bincode::serialize(&pending.request)?))
.await?;
pending.request.id.expect("all requests must have ids"),
pending,
Err(Error::Disconnected)
#[allow(clippy::collapsible_else_if)] // not possible due to cfg statement
async fn response_processor<A: CustomApi>(
mut receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
outstanding_requests: OutstandingRequestMapHandle<A>,
custom_api_callback: Option<&dyn CustomApiCallback<A>>,
while let Some(message) = receiver.next().await {
let message = message?;
match message {
Message::Binary(response) => {
let payload =
bincode::deserialize::<Payload<Response<CustomApiResult<A>>>>(&response)?;
super::process_response_payload(
payload,
&outstanding_requests,
custom_api_callback,
&subscribers,
.await;
other => {
log::error!("Unexpected websocket message: {:?}", other);