1
use std::{
2
    ops::Add,
3
    time::{Duration, SystemTime},
4
};
5

            
6
use futures::{Future, FutureExt};
7
use serde::{Deserialize, Serialize};
8

            
9
use super::{
10
    BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
11
    Timestamp,
12
};
13
use crate::{
14
    keyvalue::{SetCommand, Value},
15
    Error,
16
};
17

            
18
/// Executes [`Command::Set`] when awaited. Also offers methods to customize the
19
/// options for the operation.
20
#[must_use = "futures do nothing unless you `.await` or poll them"]
21
pub struct Builder<'a, KeyValue, V> {
22
    state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
23
}
24

            
25
struct Options<'a, KeyValue, V> {
26
    kv: &'a KeyValue,
27
    namespace: Option<String>,
28
    key: String,
29
    value: PendingValue<'a, V>,
30
    expiration: Option<Timestamp>,
31
    keep_existing_expiration: bool,
32
    check: Option<KeyCheck>,
33
}
34

            
35
impl<'a, K, V> Builder<'a, K, V>
36
where
37
    K: KeyValue,
38
    V: Serialize + Send + Sync,
39
{
40
174
    pub(crate) fn new(
41
174
        kv: &'a K,
42
174
        namespace: Option<String>,
43
174
        key: String,
44
174
        value: PendingValue<'a, V>,
45
174
    ) -> Self {
46
174
        Self {
47
174
            state: BuilderState::Pending(Some(Options {
48
174
                key,
49
174
                value,
50
174
                kv,
51
174
                namespace,
52
174
                expiration: None,
53
174
                keep_existing_expiration: false,
54
174
                check: None,
55
174
            })),
56
174
        }
57
174
    }
58

            
59
50
    fn options(&mut self) -> &mut Options<'a, K, V> {
60
50
        if let BuilderState::Pending(Some(options)) = &mut self.state {
61
50
            options
62
        } else {
63
            panic!("Attempted to use after retrieving the result")
64
        }
65
50
    }
66

            
67
    /// Set this key to expire after `duration` from now.
68
27
    pub fn expire_in(mut self, duration: Duration) -> Self {
69
27
        // TODO consider using checked_add here and making it return an error.
70
27
        self.options().expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
71
27
        self
72
27
    }
73

            
74
    /// Set this key to expire at the provided `time`.
75
    pub fn expire_at(mut self, time: SystemTime) -> Self {
76
        // TODO consider using checked_add here and making it return an error.
77
        self.options().expiration = Some(Timestamp::from(time));
78
        self
79
    }
80

            
81
    /// If the key already exists, do not update the currently set expiration.
82
4
    pub fn keep_existing_expiration(mut self) -> Self {
83
4
        self.options().keep_existing_expiration = true;
84
4
        self
85
4
    }
86

            
87
    /// Only set the value if this key already exists.
88
9
    pub fn only_if_exists(mut self) -> Self {
89
9
        self.options().check = Some(KeyCheck::OnlyIfPresent);
90
9
        self
91
9
    }
92

            
93
    /// Only set the value if this key isn't present.
94
10
    pub fn only_if_vacant(mut self) -> Self {
95
10
        self.options().check = Some(KeyCheck::OnlyIfVacant);
96
10
        self
97
10
    }
98

            
99
    /// Executes the Set operation, requesting the previous value be returned.
100
    /// If no change is made, None will be returned.
101
    #[allow(clippy::missing_panics_doc)]
102
13
    pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
103
13
        if let BuilderState::Pending(Some(builder)) = self.state {
104
            let Options {
105
13
                kv,
106
13
                namespace,
107
13
                key,
108
13
                value,
109
13
                expiration,
110
13
                keep_existing_expiration,
111
13
                check,
112
            } = builder;
113

            
114
13
            let result = kv
115
                .execute_key_operation(KeyOperation {
116
13
                    namespace,
117
13
                    key,
118
13
                    command: Command::Set(SetCommand {
119
13
                        value: value.prepare()?,
120
13
                        expiration,
121
13
                        keep_existing_expiration,
122
13
                        check,
123
                        return_previous_value: true,
124
                    }),
125
6
                })
126
6
                .await?;
127
            match result {
128
13
                Output::Value(value) => Ok(value),
129
                Output::Status(KeyStatus::NotChanged) => Ok(None),
130
                Output::Status(_) => unreachable!("Unexpected output from Set"),
131
            }
132
        } else {
133
            panic!("Using future after it's been executed")
134
        }
135
13
    }
136

            
137
    /// Executes the Set operation, requesting the previous value be returned.
138
    /// If no change is made, None will be returned.
139
    #[allow(clippy::missing_panics_doc)]
140
9
    pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
141
9
        self,
142
9
    ) -> Result<Option<OtherV>, Error> {
143
9
        self.returning_previous()
144
4
            .await?
145
9
            .map(|value| value.deserialize())
146
9
            .transpose()
147
9
    }
148
}
149

            
150
impl<'a, K, V> Future for Builder<'a, K, V>
151
where
152
    K: KeyValue,
153
    V: Serialize + Send + Sync,
154
{
155
    type Output = Result<KeyStatus, Error>;
156

            
157
414
    fn poll(
158
414
        mut self: std::pin::Pin<&mut Self>,
159
414
        cx: &mut std::task::Context<'_>,
160
414
    ) -> std::task::Poll<Self::Output> {
161
414
        match &mut self.state {
162
253
            BuilderState::Executing(future) => future.as_mut().poll(cx),
163
161
            BuilderState::Pending(builder) => {
164
161
                let Options {
165
161
                    kv,
166
161
                    namespace,
167
161
                    key,
168
161
                    value,
169
161
                    expiration,
170
161
                    keep_existing_expiration,
171
161
                    check,
172
161
                } = builder.take().expect("expected builder to have options");
173
161
                let future = async move {
174
155
                    let result = kv
175
                        .execute_key_operation(KeyOperation {
176
161
                            namespace,
177
161
                            key,
178
161
                            command: Command::Set(SetCommand {
179
161
                                value: value.prepare()?,
180
161
                                expiration,
181
161
                                keep_existing_expiration,
182
161
                                check,
183
                                return_previous_value: false,
184
                            }),
185
92
                        })
186
93
                        .await?;
187
155
                    if let Output::Status(status) = result {
188
155
                        Ok(status)
189
                    } else {
190
                        unreachable!("Unexpected output from Set")
191
                    }
192
161
                }
193
161
                .boxed();
194
161

            
195
161
                self.state = BuilderState::Executing(future);
196
161
                self.poll(cx)
197
            }
198
        }
199
414
    }
200
}