1
use futures::{Future, FutureExt};
2

            
3
use super::{BuilderState, Command, KeyOperation, KeyValue, Output};
4
use crate::{
5
    keyvalue::{IncompatibleTypeError, Numeric, Value},
6
    Error,
7
};
8

            
9
/// Executes [`Command::Set`] when awaited. Also offers methods to customize the
10
/// options for the operation.
11
#[must_use = "futures do nothing unless you `.await` or poll them"]
12
pub struct Builder<'a, KeyValue, V> {
13
    state: BuilderState<'a, Options<'a, KeyValue>, Result<V, Error>>,
14
}
15

            
16
struct Options<'a, KeyValue> {
17
    kv: &'a KeyValue,
18
    namespace: Option<String>,
19
    key: String,
20
    increment: bool,
21
    amount: Numeric,
22
    saturating: bool,
23
}
24

            
25
impl<'a, K, V> Builder<'a, K, V>
26
where
27
    K: KeyValue,
28
{
29
50110
    pub(crate) fn new(
30
50110
        kv: &'a K,
31
50110
        namespace: Option<String>,
32
50110
        increment: bool,
33
50110
        key: String,
34
50110
        amount: Numeric,
35
50110
    ) -> Self {
36
50110
        Self {
37
50110
            state: BuilderState::Pending(Some(Options {
38
50110
                key,
39
50110
                kv,
40
50110
                namespace,
41
50110
                increment,
42
50110
                amount,
43
50110
                saturating: true,
44
50110
            })),
45
50110
        }
46
50110
    }
47

            
48
25
    fn options(&mut self) -> &mut Options<'a, K> {
49
25
        if let BuilderState::Pending(Some(options)) = &mut self.state {
50
25
            options
51
        } else {
52
            panic!("Attempted to use after retrieving the result")
53
        }
54
25
    }
55

            
56
    /// Allows overflowing the value.
57
25
    pub fn allow_overflow(mut self) -> Self {
58
25
        self.options().saturating = false;
59
25
        self
60
25
    }
61
}
62

            
63
impl<'a, K, V> Future for Builder<'a, K, V>
64
where
65
    K: KeyValue,
66
    V: TryFrom<Numeric, Error = IncompatibleTypeError>,
67
{
68
    type Output = Result<V, Error>;
69

            
70
120387
    fn poll(
71
120387
        mut self: std::pin::Pin<&mut Self>,
72
120387
        cx: &mut std::task::Context<'_>,
73
120387
    ) -> std::task::Poll<Self::Output> {
74
120387
        match &mut self.state {
75
70277
            BuilderState::Executing(future) => future.as_mut().poll(cx),
76
50110
            BuilderState::Pending(builder) => {
77
50110
                let Options {
78
50110
                    kv,
79
50110
                    namespace,
80
50110
                    key,
81
50110
                    increment,
82
50110
                    amount,
83
50110
                    saturating,
84
50110
                } = builder.take().expect("expected builder to have options");
85
50110
                let future = async move {
86
50095
                    let result = kv
87
                        .execute_key_operation(KeyOperation {
88
50110
                            namespace,
89
50110
                            key,
90
50110
                            command: if increment {
91
50060
                                Command::Increment { amount, saturating }
92
                            } else {
93
50
                                Command::Decrement { amount, saturating }
94
                            },
95
20167
                        })
96
20167
                        .await?;
97
50095
                    if let Output::Value(Some(Value::Numeric(value))) = result {
98
50095
                        Ok(V::try_from(value).expect("server should send back identical type"))
99
                    } else {
100
                        unreachable!("Unexpected result from key value operation")
101
                    }
102
50110
                }
103
50110
                .boxed();
104
50110

            
105
50110
                self.state = BuilderState::Executing(future);
106
50110
                self.poll(cx)
107
            }
108
        }
109
120387
    }
110
}