1
use std::{
2
    ops::Add,
3
    time::{Duration, SystemTime},
4
};
5

            
6
use futures::{Future, FutureExt};
7
use serde::{Deserialize, Serialize};
8

            
9
use super::{
10
    BuilderState, Command, KeyCheck, KeyOperation, KeyStatus, KeyValue, Output, PendingValue,
11
    Timestamp,
12
};
13
use crate::{
14
    keyvalue::{SetCommand, Value},
15
    Error,
16
};
17

            
18
/// Executes [`Command::Set`] when awaited. Also offers methods to customize the
19
/// options for the operation.
20
#[must_use = "futures do nothing unless you `.await` or poll them"]
21
pub struct Builder<'a, KeyValue, V> {
22
    state: BuilderState<'a, Options<'a, KeyValue, V>, Result<KeyStatus, Error>>,
23
}
24

            
25
struct Options<'a, KeyValue, V> {
26
    kv: &'a KeyValue,
27
    namespace: Option<String>,
28
    key: String,
29
    value: PendingValue<'a, V>,
30
    expiration: Option<Timestamp>,
31
    keep_existing_expiration: bool,
32
    check: Option<KeyCheck>,
33
}
34

            
35
impl<'a, K, V> Builder<'a, K, V>
36
where
37
    K: KeyValue,
38
    V: Serialize + Send + Sync,
39
{
40
210
    pub(crate) fn new(
41
210
        kv: &'a K,
42
210
        namespace: Option<String>,
43
210
        key: String,
44
210
        value: PendingValue<'a, V>,
45
210
    ) -> Self {
46
210
        Self {
47
210
            state: BuilderState::Pending(Some(Options {
48
210
                key,
49
210
                value,
50
210
                kv,
51
210
                namespace,
52
210
                expiration: None,
53
210
                keep_existing_expiration: false,
54
210
                check: None,
55
210
            })),
56
210
        }
57
210
    }
58

            
59
61
    fn options(&mut self) -> &mut Options<'a, K, V> {
60
61
        if let BuilderState::Pending(Some(options)) = &mut self.state {
61
61
            options
62
        } else {
63
            panic!("Attempted to use after retrieving the result")
64
        }
65
61
    }
66

            
67
    /// Set this key to expire after `duration` from now.
68
33
    pub fn expire_in(mut self, duration: Duration) -> Self {
69
33
        // TODO consider using checked_add here and making it return an error.
70
33
        self.options().expiration = Some(Timestamp::from(SystemTime::now().add(duration)));
71
33
        self
72
33
    }
73

            
74
    /// Set this key to expire at the provided `time`.
75
    pub fn expire_at(mut self, time: SystemTime) -> Self {
76
        // TODO consider using checked_add here and making it return an error.
77
        self.options().expiration = Some(Timestamp::from(time));
78
        self
79
    }
80

            
81
    /// If the key already exists, do not update the currently set expiration.
82
5
    pub fn keep_existing_expiration(mut self) -> Self {
83
5
        self.options().keep_existing_expiration = true;
84
5
        self
85
5
    }
86

            
87
    /// Only set the value if this key already exists.
88
11
    pub fn only_if_exists(mut self) -> Self {
89
11
        self.options().check = Some(KeyCheck::OnlyIfPresent);
90
11
        self
91
11
    }
92

            
93
    /// Only set the value if this key isn't present.
94
12
    pub fn only_if_vacant(mut self) -> Self {
95
12
        self.options().check = Some(KeyCheck::OnlyIfVacant);
96
12
        self
97
12
    }
98

            
99
    /// Executes the Set operation, requesting the previous value be returned.
100
    /// If no change is made, None will be returned.
101
    #[allow(clippy::missing_panics_doc)]
102
16
    pub async fn returning_previous(self) -> Result<Option<Value>, Error> {
103
16
        if let BuilderState::Pending(Some(builder)) = self.state {
104
            let Options {
105
16
                kv,
106
16
                namespace,
107
16
                key,
108
16
                value,
109
16
                expiration,
110
16
                keep_existing_expiration,
111
16
                check,
112
            } = builder;
113

            
114
16
            let result = kv
115
                .execute_key_operation(KeyOperation {
116
16
                    namespace,
117
16
                    key,
118
16
                    command: Command::Set(SetCommand {
119
16
                        value: value.prepare()?,
120
16
                        expiration,
121
16
                        keep_existing_expiration,
122
16
                        check,
123
                        return_previous_value: true,
124
                    }),
125
6
                })
126
6
                .await?;
127
            match result {
128
16
                Output::Value(value) => Ok(value),
129
                Output::Status(KeyStatus::NotChanged) => Ok(None),
130
                Output::Status(_) => unreachable!("Unexpected output from Set"),
131
            }
132
        } else {
133
            panic!("Using future after it's been executed")
134
        }
135
16
    }
136

            
137
    /// Executes the Set operation, requesting the previous value be returned.
138
    /// If no change is made, None will be returned.
139
    #[allow(clippy::missing_panics_doc)]
140
11
    pub async fn returning_previous_as<OtherV: for<'de> Deserialize<'de>>(
141
11
        self,
142
11
    ) -> Result<Option<OtherV>, Error> {
143
11
        self.returning_previous()
144
4
            .await?
145
11
            .map(|value| value.deserialize())
146
11
            .transpose()
147
11
    }
148
}
149

            
150
impl<'a, K, V> Future for Builder<'a, K, V>
151
where
152
    K: KeyValue,
153
    V: Serialize + Send + Sync,
154
{
155
    type Output = Result<KeyStatus, Error>;
156

            
157
480
    fn poll(
158
480
        mut self: std::pin::Pin<&mut Self>,
159
480
        cx: &mut std::task::Context<'_>,
160
480
    ) -> std::task::Poll<Self::Output> {
161
480
        match &mut self.state {
162
286
            BuilderState::Executing(future) => future.as_mut().poll(cx),
163
194
            BuilderState::Pending(builder) => {
164
194
                let Options {
165
194
                    kv,
166
194
                    namespace,
167
194
                    key,
168
194
                    value,
169
194
                    expiration,
170
194
                    keep_existing_expiration,
171
194
                    check,
172
194
                } = builder.take().expect("expected builder to have options");
173
194
                let future = async move {
174
187
                    let result = kv
175
                        .execute_key_operation(KeyOperation {
176
194
                            namespace,
177
194
                            key,
178
194
                            command: Command::Set(SetCommand {
179
194
                                value: value.prepare()?,
180
194
                                expiration,
181
194
                                keep_existing_expiration,
182
194
                                check,
183
                                return_previous_value: false,
184
                            }),
185
92
                        })
186
93
                        .await?;
187
187
                    if let Output::Status(status) = result {
188
187
                        Ok(status)
189
                    } else {
190
                        unreachable!("Unexpected output from Set")
191
                    }
192
194
                }
193
194
                .boxed();
194
194

            
195
194
                self.state = BuilderState::Executing(future);
196
194
                self.poll(cx)
197
            }
198
        }
199
480
    }
200
}