use std::ops::Add;
use std::time::{Duration, SystemTime};
use futures::{Future, FutureExt};
use serde::{Deserialize, Serialize};
use super::{
BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
Timestamp,
};
use crate::keyvalue::{AsyncKeyValue, SetCommand, Value};
use crate::Error;
/// Builder for a blocking `Set` key-value operation.
///
/// The builder only collects options; the operation is sent to the store
/// when `execute()`, `returning_previous()` or `returning_previous_as()` is
/// called.
#[must_use = "the key-value operation is not performed until execute() is called"]
pub struct Builder<'a, K, V> {
    /// Store that the finished operation is executed against.
    kv: &'a K,
    /// Optional namespace forwarded in the `KeyOperation`.
    namespace: Option<String>,
    /// Key forwarded in the `KeyOperation`.
    key: String,
    /// Value that is prepared right before the command is sent.
    value: PendingValue<'a, V>,
    /// Expiration forwarded to the `SetCommand`, if one was requested.
    expiration: Option<Timestamp>,
    /// Forwarded to `SetCommand::keep_existing_expiration`.
    keep_existing_expiration: bool,
    /// Optional check forwarded to `SetCommand::check`.
    check: Option<KeyCheck>,
}
impl<'a, K, V> Builder<'a, K, V>
where
K: KeyValue,
V: Serialize + Send + Sync,
{
pub(crate) const fn new(
kv: &'a K,
namespace: Option<String>,
key: String,
value: PendingValue<'a, V>,
) -> Self {
Self {
key,
value,
kv,
namespace,
expiration: None,
keep_existing_expiration: false,
check: None,
}
}
pub fn expire_in(mut self, duration: Duration) -> Self {
self.expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
self
}
pub fn expire_at(mut self, time: SystemTime) -> Self {
self.expiration = Some(Timestamp::from(time));
self
}
pub const fn keep_existing_expiration(mut self) -> Self {
self.keep_existing_expiration = true;
self
}
pub const fn only_if_exists(mut self) -> Self {
self.check = Some(KeyCheck::OnlyIfPresent);
self
}
pub const fn only_if_vacant(mut self) -> Self {
self.check = Some(KeyCheck::OnlyIfVacant);
self
}
#[allow(clippy::missing_panics_doc)]
pub fn returning_previous(self) -> Result<Option<Value>, Error> {
let Self {
kv,
namespace,
key,
value,
expiration,
keep_existing_expiration,
check,
} = self;
let result = kv.execute_key_operation(KeyOperation {
namespace,
key,
command: Command::Set(SetCommand {
value: value.prepare()?,
expiration,
keep_existing_expiration,
check,
return_previous_value: true,
}),
})?;
match result {
Output::Value(value) => Ok(value),
Output::Status(KeyStatus::NotChanged) => Ok(None),
Output::Status(_) => unreachable!("Unexpected output from Set"),
}
}
#[allow(clippy::missing_panics_doc)]
pub fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
self,
) -> Result<Option<OtherV>, Error> {
self.returning_previous()?
.map(|value| value.deserialize())
.transpose()
}
pub fn execute(self) -> Result<KeyStatus, Error> {
let Self {
kv,
namespace,
key,
value,
expiration,
keep_existing_expiration,
check,
} = self;
let result = kv.execute_key_operation(KeyOperation {
namespace,
key,
command: Command::Set(SetCommand {
value: value.prepare()?,
expiration,
keep_existing_expiration,
check,
return_previous_value: false,
}),
})?;
if let Output::Status(status) = result {
Ok(status)
} else {
unreachable!("Unexpected output from Set")
}
}
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AsyncBuilder<'a, KeyValue, V> {
state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
}
/// Options collected by an `AsyncBuilder` before its operation is executed.
struct Options<'a, K, V> {
    /// Store that the finished operation is executed against.
    kv: &'a K,
    /// Optional namespace forwarded in the `KeyOperation`.
    namespace: Option<String>,
    /// Key forwarded in the `KeyOperation`.
    key: String,
    /// Value that is prepared right before the command is sent.
    value: PendingValue<'a, V>,
    /// Expiration forwarded to the `SetCommand`, if one was requested.
    expiration: Option<Timestamp>,
    /// Forwarded to `SetCommand::keep_existing_expiration`.
    keep_existing_expiration: bool,
    /// Optional check forwarded to `SetCommand::check`.
    check: Option<KeyCheck>,
}
impl<'a, K, V> AsyncBuilder<'a, K, V>
where
    K: AsyncKeyValue,
    V: Serialize + Send + Sync,
{
    /// Creates a pending builder for setting `key` (optionally within
    /// `namespace`) to `value` on `kv`, with no expiration, no check, and
    /// `keep_existing_expiration` disabled.
    pub(crate) const fn new(
        kv: &'a K,
        namespace: Option<String>,
        key: String,
        value: PendingValue<'a, V>,
    ) -> Self {
        Self {
            state: BuilderState::Pending(Some(Options {
                kv,
                namespace,
                key,
                value,
                expiration: None,
                keep_existing_expiration: false,
                check: None,
            })),
        }
    }

    /// Returns the still-pending options for modification.
    ///
    /// Panics when the builder is no longer in the pending state with its
    /// options present.
    fn options(&mut self) -> &mut Options<'a, K, V> {
        match &mut self.state {
            BuilderState::Pending(Some(options)) => options,
            _ => panic!("Attempted to use after retrieving the result"),
        }
    }

    /// Sets the expiration to the current system time plus `duration`.
    pub fn expire_in(self, duration: Duration) -> Self {
        self.expire_at(SystemTime::now().add(duration))
    }

    /// Sets the expiration to `time`.
    pub fn expire_at(mut self, time: SystemTime) -> Self {
        let expiration = Timestamp::from(time);
        self.options().expiration = Some(expiration);
        self
    }

    /// Enables the `keep_existing_expiration` flag of the `Set` command.
    pub fn keep_existing_expiration(mut self) -> Self {
        let options = self.options();
        options.keep_existing_expiration = true;
        self
    }

    /// Requests the [`KeyCheck::OnlyIfPresent`] check for this operation.
    pub fn only_if_exists(mut self) -> Self {
        let options = self.options();
        options.check = Some(KeyCheck::OnlyIfPresent);
        self
    }

    /// Requests the [`KeyCheck::OnlyIfVacant`] check for this operation.
    pub fn only_if_vacant(mut self) -> Self {
        let options = self.options();
        options.check = Some(KeyCheck::OnlyIfVacant);
        self
    }

    /// Executes the operation with `return_previous_value` enabled and
    /// returns the value reported by the store.
    ///
    /// Returns `Ok(None)` when the store reports [`KeyStatus::NotChanged`].
    ///
    /// # Errors
    ///
    /// Returns an error if preparing the value fails or if the store returns
    /// an error.
    ///
    /// # Panics
    ///
    /// Panics if the builder is no longer pending, or if the store answers
    /// with a status other than [`KeyStatus::NotChanged`].
    #[allow(clippy::missing_panics_doc)]
    pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
        let options = match self.state {
            BuilderState::Pending(Some(options)) => options,
            _ => panic!("Using future after it's been executed"),
        };

        let output = options
            .kv
            .execute_key_operation(KeyOperation {
                namespace: options.namespace,
                key: options.key,
                command: Command::Set(SetCommand {
                    value: options.value.prepare()?,
                    expiration: options.expiration,
                    keep_existing_expiration: options.keep_existing_expiration,
                    check: options.check,
                    return_previous_value: true,
                }),
            })
            .await?;

        match output {
            Output::Value(previous) => Ok(previous),
            Output::Status(KeyStatus::NotChanged) => Ok(None),
            Output::Status(_) => unreachable!("Unexpected output from Set"),
        }
    }

    /// Same as [`Self::returning_previous`], but deserializes the returned
    /// value (if any) as `OtherV`.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails or if the returned value
    /// cannot be deserialized.
    #[allow(clippy::missing_panics_doc)]
    pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
        self,
    ) -> Result<Option<OtherV>, Error> {
        let previous = self.returning_previous().await?;
        previous.map(|stored| stored.deserialize()).transpose()
    }
}
/// Awaiting the builder executes the `Set` command with
/// `return_previous_value` disabled and yields the reported `KeyStatus`.
impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
where
    K: AsyncKeyValue,
    V: Serialize + Send + Sync,
{
    type Output = Result<KeyStatus, Error>;

    /// Drives the operation.
    ///
    /// If the operation is already executing, the stored future is polled.
    /// Otherwise the pending options are taken, turned into a boxed future,
    /// stored as the new state, and the builder is polled again so the new
    /// future makes progress right away.
    ///
    /// Panics if the pending state no longer holds its options.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let pending = match &mut self.state {
            // Already running: just forward the poll.
            BuilderState::Executing(future) => return future.as_mut().poll(cx),
            BuilderState::Pending(options) => {
                options.take().expect("expected builder to have options")
            }
        };

        let future = async move {
            let output = pending
                .kv
                .execute_key_operation(KeyOperation {
                    namespace: pending.namespace,
                    key: pending.key,
                    command: Command::Set(SetCommand {
                        value: pending.value.prepare()?,
                        expiration: pending.expiration,
                        keep_existing_expiration: pending.keep_existing_expiration,
                        check: pending.check,
                        return_previous_value: false,
                    }),
                })
                .await?;
            match output {
                Output::Status(status) => Ok(status),
                Output::Value(_) => unreachable!("Unexpected output from Set"),
            }
        }
        .boxed();

        // Switch to the executing state, then poll again so the freshly
        // created future is driven by this same call.
        self.state = BuilderState::Executing(future);
        self.poll(cx)
    }
}