1
use std::ops::Add;
2
use std::time::{Duration, SystemTime};
3

            
4
use futures::{Future, FutureExt};
5
use serde::{Deserialize, Serialize};
6

            
7
use super::{
8
    BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
9
    Timestamp,
10
};
11
use crate::keyvalue::{AsyncKeyValue, SetCommand, Value};
12
use crate::Error;
13

            
14
/// Builder for a [`Command::Set`] key-value operation.
15
#[must_use = "the key-value operation is not performed until execute() is called"]
16
pub struct Builder<'a, KeyValue, V> {
17
    kv: &'a KeyValue,
18
    namespace: Option<String>,
19
    key: String,
20
    value: PendingValue<'a, V>,
21
    expiration: Option<Timestamp>,
22
    keep_existing_expiration: bool,
23
    check: Option<KeyCheck>,
24
}
25

            
26
impl<'a, K, V> Builder<'a, K, V>
27
where
28
    K: KeyValue,
29
    V: Serialize + Send + Sync,
30
{
31
125
    pub(crate) const fn new(
32
125
        kv: &'a K,
33
125
        namespace: Option<String>,
34
125
        key: String,
35
125
        value: PendingValue<'a, V>,
36
125
    ) -> Self {
37
125
        Self {
38
125
            key,
39
125
            value,
40
125
            kv,
41
125
            namespace,
42
125
            expiration: None,
43
125
            keep_existing_expiration: false,
44
125
            check: None,
45
125
        }
46
125
    }
47

            
48
    /// Set this key to expire after `duration` from now.
49
22
    pub fn expire_in(mut self, duration: Duration) -> Self {
50
22
        // TODO consider using checked_add here and making it return an error.
51
22
        self.expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
52
22
        self
53
22
    }
54

            
55
    /// Set this key to expire at the provided `time`.
56
    pub fn expire_at(mut self, time: SystemTime) -> Self {
57
        // TODO consider using checked_add here and making it return an error.
58
        self.expiration = Some(Timestamp::from(time));
59
        self
60
    }
61

            
62
    /// If the key already exists, do not update the currently set expiration.
63
3
    pub const fn keep_existing_expiration(mut self) -> Self {
64
3
        self.keep_existing_expiration = true;
65
3
        self
66
3
    }
67

            
68
    /// Only set the value if this key already exists.
69
7
    pub const fn only_if_exists(mut self) -> Self {
70
7
        self.check = Some(KeyCheck::OnlyIfPresent);
71
7
        self
72
7
    }
73

            
74
    /// Only set the value if this key isn't present.
75
8
    pub const fn only_if_vacant(mut self) -> Self {
76
8
        self.check = Some(KeyCheck::OnlyIfVacant);
77
8
        self
78
8
    }
79

            
80
    /// Executes the Set operation, requesting the previous value be returned.
81
    /// If no change is made, None will be returned.
82
    #[allow(clippy::missing_panics_doc)]
83
10
    pub fn returning_previous(self) -> Result<Option<Value>, Error> {
84
10
        let Self {
85
10
            kv,
86
10
            namespace,
87
10
            key,
88
10
            value,
89
10
            expiration,
90
10
            keep_existing_expiration,
91
10
            check,
92
10
        } = self;
93

            
94
10
        let result = kv.execute_key_operation(KeyOperation {
95
10
            namespace,
96
10
            key,
97
10
            command: Command::Set(SetCommand {
98
10
                value: value.prepare()?,
99
10
                expiration,
100
10
                keep_existing_expiration,
101
10
                check,
102
                return_previous_value: true,
103
            }),
104
        })?;
105
        match result {
106
10
            Output::Value(value) => Ok(value),
107
            Output::Status(KeyStatus::NotChanged) => Ok(None),
108
            Output::Status(_) => unreachable!("Unexpected output from Set"),
109
        }
110
10
    }
111

            
112
    /// Executes the Set operation, requesting the previous value be returned.
113
    /// If no change is made, None will be returned.
114
    #[allow(clippy::missing_panics_doc)]
115
7
    pub fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
116
7
        self,
117
7
    ) -> Result<Option<OtherV>, Error> {
118
7
        self.returning_previous()?
119
7
            .map(|value| value.deserialize())
120
7
            .transpose()
121
7
    }
122

            
123
    /// Executes the operation using the configured options.
124
115
    pub fn execute(self) -> Result<KeyStatus, Error> {
125
115
        let Self {
126
115
            kv,
127
115
            namespace,
128
115
            key,
129
115
            value,
130
115
            expiration,
131
115
            keep_existing_expiration,
132
115
            check,
133
115
        } = self;
134
115
        let result = kv.execute_key_operation(KeyOperation {
135
115
            namespace,
136
115
            key,
137
115
            command: Command::Set(SetCommand {
138
115
                value: value.prepare()?,
139
115
                expiration,
140
115
                keep_existing_expiration,
141
115
                check,
142
                return_previous_value: false,
143
            }),
144
3
        })?;
145
112
        if let Output::Status(status) = result {
146
112
            Ok(status)
147
        } else {
148
            unreachable!("Unexpected output from Set")
149
        }
150
115
    }
151
}
152

            
153
/// Builder for a [`Command::Set`] key-value operation.
154
#[must_use = "futures do nothing unless you `.await` or poll them"]
155
pub struct AsyncBuilder<'a, KeyValue, V> {
156
    state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
157
}
158

            
159
struct Options<'a, KeyValue, V> {
160
    kv: &'a KeyValue,
161
    namespace: Option<String>,
162
    key: String,
163
    value: PendingValue<'a, V>,
164
    expiration: Option<Timestamp>,
165
    keep_existing_expiration: bool,
166
    check: Option<KeyCheck>,
167
}
168

            
169
impl<'a, K, V> AsyncBuilder<'a, K, V>
170
where
171
    K: AsyncKeyValue,
172
    V: Serialize + Send + Sync,
173
{
174
206
    pub(crate) const fn new(
175
206
        kv: &'a K,
176
206
        namespace: Option<String>,
177
206
        key: String,
178
206
        value: PendingValue<'a, V>,
179
206
    ) -> Self {
180
206
        Self {
181
206
            state: BuilderState::Pending(Some(Options {
182
206
                key,
183
206
                value,
184
206
                kv,
185
206
                namespace,
186
206
                expiration: None,
187
206
                keep_existing_expiration: false,
188
206
                check: None,
189
206
            })),
190
206
        }
191
206
    }
192

            
193
60
    fn options(&mut self) -> &mut Options<'a, K, V> {
194
60
        if let BuilderState::Pending(Some(options)) = &mut self.state {
195
60
            options
196
        } else {
197
            panic!("Attempted to use after retrieving the result")
198
        }
199
60
    }
200

            
201
    /// Set this key to expire after `duration` from now.
202
32
    pub fn expire_in(mut self, duration: Duration) -> Self {
203
32
        // TODO consider using checked_add here and making it return an error.
204
32
        self.options().expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
205
32
        self
206
32
    }
207

            
208
    /// Set this key to expire at the provided `time`.
209
    pub fn expire_at(mut self, time: SystemTime) -> Self {
210
        // TODO consider using checked_add here and making it return an error.
211
        self.options().expiration = Some(Timestamp::from(time));
212
        self
213
    }
214

            
215
    /// If the key already exists, do not update the currently set expiration.
216
5
    pub fn keep_existing_expiration(mut self) -> Self {
217
5
        self.options().keep_existing_expiration = true;
218
5
        self
219
5
    }
220

            
221
    /// Only set the value if this key already exists.
222
11
    pub fn only_if_exists(mut self) -> Self {
223
11
        self.options().check = Some(KeyCheck::OnlyIfPresent);
224
11
        self
225
11
    }
226

            
227
    /// Only set the value if this key isn't present.
228
12
    pub fn only_if_vacant(mut self) -> Self {
229
12
        self.options().check = Some(KeyCheck::OnlyIfVacant);
230
12
        self
231
12
    }
232

            
233
    /// Executes the Set operation, requesting the previous value be returned.
234
    /// If no change is made, None will be returned.
235
    #[allow(clippy::missing_panics_doc)]
236
16
    pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
237
16
        if let BuilderState::Pending(Some(builder)) = self.state {
238
            let Options {
239
16
                kv,
240
16
                namespace,
241
16
                key,
242
16
                value,
243
16
                expiration,
244
16
                keep_existing_expiration,
245
16
                check,
246
16
            } = builder;
247

            
248
16
            let result = kv
249
16
                .execute_key_operation(KeyOperation {
250
16
                    namespace,
251
16
                    key,
252
16
                    command: Command::Set(SetCommand {
253
16
                        value: value.prepare()?,
254
16
                        expiration,
255
16
                        keep_existing_expiration,
256
16
                        check,
257
                        return_previous_value: true,
258
                    }),
259
                })
260
16
                .await?;
261
            match result {
262
16
                Output::Value(value) => Ok(value),
263
                Output::Status(KeyStatus::NotChanged) => Ok(None),
264
                Output::Status(_) => unreachable!("Unexpected output from Set"),
265
            }
266
        } else {
267
            panic!("Using future after it's been executed")
268
        }
269
16
    }
270

            
271
    /// Executes the Set operation, requesting the previous value be returned.
272
    /// If no change is made, None will be returned.
273
    #[allow(clippy::missing_panics_doc)]
274
11
    pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
275
11
        self,
276
11
    ) -> Result<Option<OtherV>, Error> {
277
11
        self.returning_previous()
278
11
            .await?
279
11
            .map(|value| value.deserialize())
280
11
            .transpose()
281
11
    }
282
}
283

            
284
impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
285
where
286
    K: AsyncKeyValue,
287
    V: Serialize + Send + Sync,
288
{
289
    type Output = Result<KeyStatus, Error>;
290

            
291
577
    fn poll(
292
577
        mut self: std::pin::Pin<&mut Self>,
293
577
        cx: &mut std::task::Context<'_>,
294
577
    ) -> std::task::Poll<Self::Output> {
295
577
        match &mut self.state {
296
387
            BuilderState::Executing(future) => future.as_mut().poll(cx),
297
190
            BuilderState::Pending(builder) => {
298
190
                let Options {
299
190
                    kv,
300
190
                    namespace,
301
190
                    key,
302
190
                    value,
303
190
                    expiration,
304
190
                    keep_existing_expiration,
305
190
                    check,
306
190
                } = builder.take().expect("expected builder to have options");
307
190
                let future = async move {
308
190
                    let result = kv
309
190
                        .execute_key_operation(KeyOperation {
310
190
                            namespace,
311
190
                            key,
312
190
                            command: Command::Set(SetCommand {
313
190
                                value: value.prepare()?,
314
190
                                expiration,
315
190
                                keep_existing_expiration,
316
190
                                check,
317
                                return_previous_value: false,
318
                            }),
319
                        })
320
197
                        .await?;
321
183
                    if let Output::Status(status) = result {
322
183
                        Ok(status)
323
                    } else {
324
                        unreachable!("Unexpected output from Set")
325
                    }
326
190
                }
327
190
                .boxed();
328
190

            
329
190
                self.state = BuilderState::Executing(future);
330
190
                self.poll(cx)
331
            }
332
        }
333
577
    }
334
}