1
use std::{
2
    ops::Add,
3
    time::{Duration, SystemTime},
4
};
5

            
6
use futures::{Future, FutureExt};
7
use serde::{Deserialize, Serialize};
8

            
9
use super::{
10
    BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
11
    Timestamp,
12
};
13
use crate::{
14
    keyvalue::{AsyncKeyValue, SetCommand, Value},
15
    Error,
16
};
17

            
18
/// Builder for a [`Command::Set`] key-value operation.
19
#[must_use = "the key-value operation is not performed until execute() is called"]
20
pub struct Builder<'a, KeyValue, V> {
21
    kv: &'a KeyValue,
22
    namespace: Option<String>,
23
    key: String,
24
    value: PendingValue<'a, V>,
25
    expiration: Option<Timestamp>,
26
    keep_existing_expiration: bool,
27
    check: Option<KeyCheck>,
28
}
29

            
30
impl<'a, K, V> Builder<'a, K, V>
31
where
32
    K: KeyValue,
33
    V: Serialize + Send + Sync,
34
{
35
124
    pub(crate) fn new(
36
124
        kv: &'a K,
37
124
        namespace: Option<String>,
38
124
        key: String,
39
124
        value: PendingValue<'a, V>,
40
124
    ) -> Self {
41
124
        Self {
42
124
            key,
43
124
            value,
44
124
            kv,
45
124
            namespace,
46
124
            expiration: None,
47
124
            keep_existing_expiration: false,
48
124
            check: None,
49
124
        }
50
124
    }
51

            
52
    /// Set this key to expire after `duration` from now.
53
21
    pub fn expire_in(mut self, duration: Duration) -> Self {
54
21
        // TODO consider using checked_add here and making it return an error.
55
21
        self.expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
56
21
        self
57
21
    }
58

            
59
    /// Set this key to expire at the provided `time`.
60
    pub fn expire_at(mut self, time: SystemTime) -> Self {
61
        // TODO consider using checked_add here and making it return an error.
62
        self.expiration = Some(Timestamp::from(time));
63
        self
64
    }
65

            
66
    /// If the key already exists, do not update the currently set expiration.
67
3
    pub fn keep_existing_expiration(mut self) -> Self {
68
3
        self.keep_existing_expiration = true;
69
3
        self
70
3
    }
71

            
72
    /// Only set the value if this key already exists.
73
7
    pub fn only_if_exists(mut self) -> Self {
74
7
        self.check = Some(KeyCheck::OnlyIfPresent);
75
7
        self
76
7
    }
77

            
78
    /// Only set the value if this key isn't present.
79
8
    pub fn only_if_vacant(mut self) -> Self {
80
8
        self.check = Some(KeyCheck::OnlyIfVacant);
81
8
        self
82
8
    }
83

            
84
    /// Executes the Set operation, requesting the previous value be returned.
85
    /// If no change is made, None will be returned.
86
    #[allow(clippy::missing_panics_doc)]
87
10
    pub fn returning_previous(self) -> Result<Option<Value>, Error> {
88
10
        let Self {
89
10
            kv,
90
10
            namespace,
91
10
            key,
92
10
            value,
93
10
            expiration,
94
10
            keep_existing_expiration,
95
10
            check,
96
10
        } = self;
97

            
98
10
        let result = kv.execute_key_operation(KeyOperation {
99
10
            namespace,
100
10
            key,
101
10
            command: Command::Set(SetCommand {
102
10
                value: value.prepare()?,
103
10
                expiration,
104
10
                keep_existing_expiration,
105
10
                check,
106
                return_previous_value: true,
107
            }),
108
        })?;
109
        match result {
110
10
            Output::Value(value) => Ok(value),
111
            Output::Status(KeyStatus::NotChanged) => Ok(None),
112
            Output::Status(_) => unreachable!("Unexpected output from Set"),
113
        }
114
10
    }
115

            
116
    /// Executes the Set operation, requesting the previous value be returned.
117
    /// If no change is made, None will be returned.
118
    #[allow(clippy::missing_panics_doc)]
119
    pub fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
120
        self,
121
    ) -> Result<Option<OtherV>, Error> {
122
7
        self.returning_previous()?
123
7
            .map(|value| value.deserialize())
124
7
            .transpose()
125
7
    }
126

            
127
    /// Executes the operation using the configured options.
128
114
    pub fn execute(self) -> Result<KeyStatus, Error> {
129
114
        let Self {
130
114
            kv,
131
114
            namespace,
132
114
            key,
133
114
            value,
134
114
            expiration,
135
114
            keep_existing_expiration,
136
114
            check,
137
114
        } = self;
138
111
        let result = kv.execute_key_operation(KeyOperation {
139
114
            namespace,
140
114
            key,
141
114
            command: Command::Set(SetCommand {
142
114
                value: value.prepare()?,
143
114
                expiration,
144
114
                keep_existing_expiration,
145
114
                check,
146
                return_previous_value: false,
147
            }),
148
3
        })?;
149
111
        if let Output::Status(status) = result {
150
111
            Ok(status)
151
        } else {
152
            unreachable!("Unexpected output from Set")
153
        }
154
114
    }
155
}
156

            
157
/// Builder for a [`Command::Set`] key-value operation.
158
#[must_use = "futures do nothing unless you `.await` or poll them"]
159
pub struct AsyncBuilder<'a, KeyValue, V> {
160
    state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
161
}
162

            
163
struct Options<'a, KeyValue, V> {
164
    kv: &'a KeyValue,
165
    namespace: Option<String>,
166
    key: String,
167
    value: PendingValue<'a, V>,
168
    expiration: Option<Timestamp>,
169
    keep_existing_expiration: bool,
170
    check: Option<KeyCheck>,
171
}
172

            
173
impl<'a, K, V> AsyncBuilder<'a, K, V>
174
where
175
    K: AsyncKeyValue,
176
    V: Serialize + Send + Sync,
177
{
178
206
    pub(crate) fn new(
179
206
        kv: &'a K,
180
206
        namespace: Option<String>,
181
206
        key: String,
182
206
        value: PendingValue<'a, V>,
183
206
    ) -> Self {
184
206
        Self {
185
206
            state: BuilderState::Pending(Some(Options {
186
206
                key,
187
206
                value,
188
206
                kv,
189
206
                namespace,
190
206
                expiration: None,
191
206
                keep_existing_expiration: false,
192
206
                check: None,
193
206
            })),
194
206
        }
195
206
    }
196

            
197
60
    fn options(&mut self) -> &mut Options<'a, K, V> {
198
60
        if let BuilderState::Pending(Some(options)) = &mut self.state {
199
60
            options
200
        } else {
201
            panic!("Attempted to use after retrieving the result")
202
        }
203
60
    }
204

            
205
    /// Set this key to expire after `duration` from now.
206
32
    pub fn expire_in(mut self, duration: Duration) -> Self {
207
32
        // TODO consider using checked_add here and making it return an error.
208
32
        self.options().expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
209
32
        self
210
32
    }
211

            
212
    /// Set this key to expire at the provided `time`.
213
    pub fn expire_at(mut self, time: SystemTime) -> Self {
214
        // TODO consider using checked_add here and making it return an error.
215
        self.options().expiration = Some(Timestamp::from(time));
216
        self
217
    }
218

            
219
    /// If the key already exists, do not update the currently set expiration.
220
5
    pub fn keep_existing_expiration(mut self) -> Self {
221
5
        self.options().keep_existing_expiration = true;
222
5
        self
223
5
    }
224

            
225
    /// Only set the value if this key already exists.
226
11
    pub fn only_if_exists(mut self) -> Self {
227
11
        self.options().check = Some(KeyCheck::OnlyIfPresent);
228
11
        self
229
11
    }
230

            
231
    /// Only set the value if this key isn't present.
232
12
    pub fn only_if_vacant(mut self) -> Self {
233
12
        self.options().check = Some(KeyCheck::OnlyIfVacant);
234
12
        self
235
12
    }
236

            
237
    /// Executes the Set operation, requesting the previous value be returned.
238
    /// If no change is made, None will be returned.
239
    #[allow(clippy::missing_panics_doc)]
240
16
    pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
241
16
        if let BuilderState::Pending(Some(builder)) = self.state {
242
            let Options {
243
16
                kv,
244
16
                namespace,
245
16
                key,
246
16
                value,
247
16
                expiration,
248
16
                keep_existing_expiration,
249
16
                check,
250
            } = builder;
251

            
252
16
            let result = kv
253
                .execute_key_operation(KeyOperation {
254
16
                    namespace,
255
16
                    key,
256
16
                    command: Command::Set(SetCommand {
257
16
                        value: value.prepare()?,
258
16
                        expiration,
259
16
                        keep_existing_expiration,
260
16
                        check,
261
                        return_previous_value: true,
262
                    }),
263
16
                })
264
16
                .await?;
265
            match result {
266
16
                Output::Value(value) => Ok(value),
267
                Output::Status(KeyStatus::NotChanged) => Ok(None),
268
                Output::Status(_) => unreachable!("Unexpected output from Set"),
269
            }
270
        } else {
271
            panic!("Using future after it's been executed")
272
        }
273
16
    }
274

            
275
    /// Executes the Set operation, requesting the previous value be returned.
276
    /// If no change is made, None will be returned.
277
    #[allow(clippy::missing_panics_doc)]
278
11
    pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
279
11
        self,
280
11
    ) -> Result<Option<OtherV>, Error> {
281
11
        self.returning_previous()
282
11
            .await?
283
11
            .map(|value| value.deserialize())
284
11
            .transpose()
285
11
    }
286
}
287

            
288
impl<'a, K, V> Future for AsyncBuilder<'a, K, V>
289
where
290
    K: AsyncKeyValue,
291
    V: Serialize + Send + Sync,
292
{
293
    type Output = Result<KeyStatus, Error>;
294

            
295
579
    fn poll(
296
579
        mut self: std::pin::Pin<&mut Self>,
297
579
        cx: &mut std::task::Context<'_>,
298
579
    ) -> std::task::Poll<Self::Output> {
299
579
        match &mut self.state {
300
389
            BuilderState::Executing(future) => future.as_mut().poll(cx),
301
190
            BuilderState::Pending(builder) => {
302
190
                let Options {
303
190
                    kv,
304
190
                    namespace,
305
190
                    key,
306
190
                    value,
307
190
                    expiration,
308
190
                    keep_existing_expiration,
309
190
                    check,
310
190
                } = builder.take().expect("expected builder to have options");
311
190
                let future = async move {
312
183
                    let result = kv
313
                        .execute_key_operation(KeyOperation {
314
190
                            namespace,
315
190
                            key,
316
190
                            command: Command::Set(SetCommand {
317
190
                                value: value.prepare()?,
318
190
                                expiration,
319
190
                                keep_existing_expiration,
320
190
                                check,
321
                                return_previous_value: false,
322
                            }),
323
199
                        })
324
199
                        .await?;
325
183
                    if let Output::Status(status) = result {
326
183
                        Ok(status)
327
                    } else {
328
                        unreachable!("Unexpected output from Set")
329
                    }
330
190
                }
331
190
                .boxed();
332
190

            
333
190
                self.state = BuilderState::Executing(future);
334
190
                self.poll(cx)
335
            }
336
        }
337
579
    }
338
}