1
use std::marker::PhantomData;
2

            
3
use futures::{Future, FutureExt};
4

            
5
use super::{BuilderState, Command, KeyOperation, KeyValue, Output};
6
use crate::keyvalue::{AsyncKeyValue, IncompatibleTypeError, Numeric, Value};
7
use crate::Error;
8

            
9
/// Executes a [`Command::Increment`] or [`Command::Decrement`] key-value operation.
10
#[must_use = "the key-value operation is not performed until execute() is called"]
11
pub struct Builder<'a, KeyValue, V> {
12
    kv: &'a KeyValue,
13
    namespace: Option<String>,
14
    key: String,
15
    increment: bool,
16
    amount: Numeric,
17
    saturating: bool,
18
    _value: PhantomData<V>,
19
}
20

            
21
impl<'a, K, V> Builder<'a, K, V>
22
where
23
    K: KeyValue,
24
    V: TryFrom<Numeric, Error = IncompatibleTypeError>,
25
{
26
30063
    pub(crate) const fn new(
27
30063
        kv: &'a K,
28
30063
        namespace: Option<String>,
29
30063
        increment: bool,
30
30063
        key: String,
31
30063
        amount: Numeric,
32
30063
    ) -> Self {
33
30063
        Self {
34
30063
            key,
35
30063
            kv,
36
30063
            namespace,
37
30063
            increment,
38
30063
            amount,
39
30063
            saturating: true,
40
30063
            _value: PhantomData,
41
30063
        }
42
30063
    }
43

            
44
    /// Allows overflowing the value.
45
15
    pub const fn allow_overflow(mut self) -> Self {
46
15
        self.saturating = false;
47
15
        self
48
15
    }
49

            
50
    /// Executes the operation using the configured options.
51
    #[allow(clippy::missing_panics_doc)]
52
30063
    pub fn execute(self) -> Result<V, Error> {
53
30063
        let Self {
54
30063
            kv,
55
30063
            namespace,
56
30063
            key,
57
30063
            increment,
58
30063
            amount,
59
30063
            saturating,
60
30063
            ..
61
30063
        } = self;
62
30063
        let result = kv.execute_key_operation(KeyOperation {
63
30063
            namespace,
64
30063
            key,
65
30063
            command: if increment {
66
30033
                Command::Increment { amount, saturating }
67
            } else {
68
30
                Command::Decrement { amount, saturating }
69
            },
70
9
        })?;
71
30054
        if let Output::Value(Some(Value::Numeric(value))) = result {
72
30054
            Ok(V::try_from(value).expect("server should send back identical type"))
73
        } else {
74
            unreachable!("Unexpected result from key value operation")
75
        }
76
30063
    }
77
}
78

            
79
/// Executes a [`Command::Increment`] or [`Command::Decrement`] key-value operation when awaited.
80
#[must_use = "futures do nothing unless you `.await` or poll them"]
81
pub struct AsyncBuilder<'a, KeyValue, V> {
82
    state: BuilderState<'a, Options<'a, KeyValue>, Result<V, Error>>,
83
}
84

            
85
struct Options<'a, KeyValue> {
86
    kv: &'a KeyValue,
87
    namespace: Option<String>,
88
    key: String,
89
    increment: bool,
90
    amount: Numeric,
91
    saturating: bool,
92
}
93

            
94
impl<'a, K, V> AsyncBuilder<'a, K, V>
95
where
96
    K: AsyncKeyValue,
97
{
98
50114
    pub(crate) const fn new(
99
50114
        kv: &'a K,
100
50114
        namespace: Option<String>,
101
50114
        increment: bool,
102
50114
        key: String,
103
50114
        amount: Numeric,
104
50114
    ) -> Self {
105
50114
        Self {
106
50114
            state: BuilderState::Pending(Some(Options {
107
50114
                key,
108
50114
                kv,
109
50114
                namespace,
110
50114
                increment,
111
50114
                amount,
112
50114
                saturating: true,
113
50114
            })),
114
50114
        }
115
50114
    }
116

            
117
25
    fn options(&mut self) -> &mut Options<'a, K> {
118
25
        if let BuilderState::Pending(Some(options)) = &mut self.state {
119
25
            options
120
        } else {
121
            panic!("Attempted to use after retrieving the result")
122
        }
123
25
    }
124

            
125
    /// Allows overflowing the value.
126
25
    pub fn allow_overflow(mut self) -> Self {
127
25
        self.options().saturating = false;
128
25
        self
129
25
    }
130
}
131

            
132
impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
133
where
134
    K: AsyncKeyValue,
135
    V: TryFrom<Numeric, Error = IncompatibleTypeError>,
136
{
137
    type Output = Result<V, Error>;
138

            
139
131510
    fn poll(
140
131510
        mut self: std::pin::Pin<&mut Self>,
141
131510
        cx: &mut std::task::Context<'_>,
142
131510
    ) -> std::task::Poll<Self::Output> {
143
131510
        match &mut self.state {
144
81396
            BuilderState::Executing(future) => future.as_mut().poll(cx),
145
50114
            BuilderState::Pending(builder) => {
146
50114
                let Options {
147
50114
                    kv,
148
50114
                    namespace,
149
50114
                    key,
150
50114
                    increment,
151
50114
                    amount,
152
50114
                    saturating,
153
50114
                } = builder.take().expect("expected builder to have options");
154
50114
                let future = async move {
155
50114
                    let result = kv
156
50114
                        .execute_key_operation(KeyOperation {
157
50114
                            namespace,
158
50114
                            key,
159
50114
                            command: if increment {
160
50064
                                Command::Increment { amount, saturating }
161
                            } else {
162
50
                                Command::Decrement { amount, saturating }
163
                            },
164
                        })
165
31282
                        .await?;
166
50099
                    if let Output::Value(Some(Value::Numeric(value))) = result {
167
50099
                        Ok(V::try_from(value).expect("server should send back identical type"))
168
                    } else {
169
                        unreachable!("Unexpected result from key value operation")
170
                    }
171
50114
                }
172
50114
                .boxed();
173
50114

            
174
50114
                self.state = BuilderState::Executing(future);
175
50114
                self.poll(cx)
176
            }
177
        }
178
131510
    }
179
}