use std::marker::PhantomData;

use futures::{Future, FutureExt};

use super::{BuilderState, Command, KeyOperation, KeyValue, Output};
use crate::{
    keyvalue::{AsyncKeyValue, IncompatibleTypeError, Numeric, Value},
    Error,
};

/// Executes a [`Command::Increment`] or [`Command::Decrement`] key-value operation.
12
#[must_use = "the key-value operation is not performed until execute() is called"]
13
pub struct Builder<'a, KeyValue, V> {
14
    kv: &'a KeyValue,
15
    namespace: Option<String>,
16
    key: String,
17
    increment: bool,
18
    amount: Numeric,
19
    saturating: bool,
20
    _value: PhantomData<V>,
21
}
impl<'a, K, V> Builder<'a, K, V>
24
where
25
    K: KeyValue,
26
    V: TryFrom<Numeric, Error = IncompatibleTypeError>,
27
{
28
30061
    pub(crate) fn new(
29
30061
        kv: &'a K,
30
30061
        namespace: Option<String>,
31
30061
        increment: bool,
32
30061
        key: String,
33
30061
        amount: Numeric,
34
30061
    ) -> Self {
35
30061
        Self {
36
30061
            key,
37
30061
            kv,
38
30061
            namespace,
39
30061
            increment,
40
30061
            amount,
41
30061
            saturating: true,
42
30061
            _value: PhantomData,
43
30061
        }
44
30061
    }
45

            
46
    /// Allows overflowing the value.
47
15
    pub fn allow_overflow(mut self) -> Self {
48
15
        self.saturating = false;
49
15
        self
50
15
    }
51

            
52
    /// Executes the operation using the configured options.
53
30061
    pub fn execute(self) -> Result<V, Error> {
54
30061
        let Self {
55
30061
            kv,
56
30061
            namespace,
57
30061
            key,
58
30061
            increment,
59
30061
            amount,
60
30061
            saturating,
61
30061
            ..
62
30061
        } = self;
63
30054
        let result = kv.execute_key_operation(KeyOperation {
64
30061
            namespace,
65
30061
            key,
66
30061
            command: if increment {
67
30031
                Command::Increment { amount, saturating }
68
            } else {
69
30
                Command::Decrement { amount, saturating }
70
            },
71
9
        })?;
72
30054
        if let Output::Value(Some(Value::Numeric(value))) = result {
73
30054
            Ok(V::try_from(value).expect("server should send back identical type"))
74
        } else {
75
            unreachable!("Unexpected result from key value operation")
76
        }
77
30063
    }
78
}
/// Executes a [`Command::Increment`] or [`Command::Decrement`] key-value operation when awaited.
81
#[must_use = "futures do nothing unless you `.await` or poll them"]
82
pub struct AsyncBuilder<'a, KeyValue, V> {
83
    state: BuilderState<'a, Options<'a, KeyValue>, Result<V, Error>>,
84
}
struct Options<'a, KeyValue> {
87
    kv: &'a KeyValue,
88
    namespace: Option<String>,
89
    key: String,
90
    increment: bool,
91
    amount: Numeric,
92
    saturating: bool,
93
}
impl<'a, K, V> AsyncBuilder<'a, K, V>
96
where
97
    K: AsyncKeyValue,
98
{
99
50114
    pub(crate) fn new(
100
50114
        kv: &'a K,
101
50114
        namespace: Option<String>,
102
50114
        increment: bool,
103
50114
        key: String,
104
50114
        amount: Numeric,
105
50114
    ) -> Self {
106
50114
        Self {
107
50114
            state: BuilderState::Pending(Some(Options {
108
50114
                key,
109
50114
                kv,
110
50114
                namespace,
111
50114
                increment,
112
50114
                amount,
113
50114
                saturating: true,
114
50114
            })),
115
50114
        }
116
50114
    }
117

            
118
25
    fn options(&mut self) -> &mut Options<'a, K> {
119
25
        if let BuilderState::Pending(Some(options)) = &mut self.state {
120
25
            options
121
        } else {
122
            panic!("Attempted to use after retrieving the result")
123
        }
124
25
    }
125

            
126
    /// Allows overflowing the value.
127
25
    pub fn allow_overflow(mut self) -> Self {
128
25
        self.options().saturating = false;
129
25
        self
130
25
    }
131
}
impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
134
where
135
    K: AsyncKeyValue,
136
    V: TryFrom<Numeric, Error = IncompatibleTypeError>,
137
{
138
    type Output = Result<V, Error>;
139

            
140
141788
    fn poll(
141
141788
        mut self: std::pin::Pin<&mut Self>,
142
141788
        cx: &mut std::task::Context<'_>,
143
141788
    ) -> std::task::Poll<Self::Output> {
144
141788
        match &mut self.state {
145
91674
            BuilderState::Executing(future) => future.as_mut().poll(cx),
146
50114
            BuilderState::Pending(builder) => {
147
50114
                let Options {
148
50114
                    kv,
149
50114
                    namespace,
150
50114
                    key,
151
50114
                    increment,
152
50114
                    amount,
153
50114
                    saturating,
154
50114
                } = builder.take().expect("expected builder to have options");
155
50114
                let future = async move {
156
50099
                    let result = kv
157
                        .execute_key_operation(KeyOperation {
158
50114
                            namespace,
159
50114
                            key,
160
50114
                            command: if increment {
161
50064
                                Command::Increment { amount, saturating }
162
                            } else {
163
50
                                Command::Decrement { amount, saturating }
164
                            },
165
41560
                        })
166
41560
                        .await?;
167
50099
                    if let Output::Value(Some(Value::Numeric(value))) = result {
168
50099
                        Ok(V::try_from(value).expect("server should send back identical type"))
169
                    } else {
170
                        unreachable!("Unexpected result from key value operation")
171
                    }
172
50114
                }
173
50114
                .boxed();
174
50114

            
175
50114
                self.state = BuilderState::Executing(future);
176
50114
                self.poll(cx)
177
            }
178
        }
179
141788
    }
180
}