1
use arc_bytes::serde::Bytes;
2
use serde::{Deserialize, Serialize};
3

            
4
use crate::{
5
    document::{CollectionHeader, DocumentId, Header},
6
    schema::{CollectionName, SerializedCollection},
7
    Error,
8
};
9

            
10
/// A list of operations to execute as a single unit. If any operation fails,
11
/// all changes are aborted. Reads that happen while the transaction is in
12
/// progress will return old data and not block.
13
20151
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
14
#[must_use]
15
pub struct Transaction {
16
    /// The operations in this transaction.
17
    pub operations: Vec<Operation>,
18
}
19

            
20
impl Transaction {
21
    /// Returns a new, empty transaction.
22
162
    pub fn new() -> Self {
23
162
        Self::default()
24
162
    }
25

            
26
    /// Adds an operation to the transaction.
27
223209
    pub fn push(&mut self, operation: Operation) {
28
223209
        self.operations.push(operation);
29
223209
    }
30

            
31
    /// Appends an operation to the transaction and returns self.
32
    pub fn with(mut self, operation: Operation) -> Self {
33
        self.push(operation);
34
        self
35
    }
36
}
37

            
38
impl From<Operation> for Transaction {
    /// Wraps a single `operation` in a transaction of its own.
    fn from(operation: Operation) -> Self {
        let operations = vec![operation];
        Self { operations }
    }
}
45

            
46
impl Transaction {
47
    /// Inserts a new document with `contents` into `collection`.  If `id` is
48
    /// `None` a unique id will be generated. If an id is provided and a
49
    /// document already exists with that id, a conflict error will be returned.
50
236158
    pub fn insert(
51
236158
        collection: CollectionName,
52
236158
        id: Option<DocumentId>,
53
236158
        contents: impl Into<Bytes>,
54
236158
    ) -> Self {
55
236158
        Self::from(Operation::insert(collection, id, contents))
56
236158
    }
57

            
58
    /// Updates a document in `collection`.
59
5048
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
60
5048
        Self::from(Operation::update(collection, header, contents))
61
5048
    }
62

            
63
    /// Overwrites a document in `collection`. If a document with `id` exists,
64
    /// it will be overwritten. If a document with `id` doesn't exist, it will
65
    /// be created.
66
511
    pub fn overwrite(
67
511
        collection: CollectionName,
68
511
        id: DocumentId,
69
511
        contents: impl Into<Bytes>,
70
511
    ) -> Self {
71
511
        Self::from(Operation::overwrite(collection, id, contents))
72
511
    }
73

            
74
    /// Deletes a document from a `collection`.
75
37800
    pub fn delete(collection: CollectionName, header: Header) -> Self {
76
37800
        Self::from(Operation::delete(collection, header))
77
37800
    }
78
}
79

            
80
/// A single operation performed on a `Collection`.
81
28254
#[derive(Clone, Serialize, Deserialize, Debug)]
82
#[must_use]
83
pub struct Operation {
84
    /// The id of the `Collection`.
85
    pub collection: CollectionName,
86

            
87
    /// The command being performed.
88
    pub command: Command,
89
}
90

            
91
impl Operation {
92
    /// Inserts a new document with `contents` into `collection`.  If `id` is
93
    /// `None` a unique id will be generated. If an id is provided and a
94
    /// document already exists with that id, a conflict error will be returned.
95
244415
    pub fn insert(
96
244415
        collection: CollectionName,
97
244415
        id: Option<DocumentId>,
98
244415
        contents: impl Into<Bytes>,
99
244415
    ) -> Self {
100
244415
        Self {
101
244415
            collection,
102
244415
            command: Command::Insert {
103
244415
                id,
104
244415
                contents: contents.into(),
105
244415
            },
106
244415
        }
107
244415
    }
108

            
109
    /// Inserts a new document with the serialized representation of `contents`
110
    /// into `collection`. If `id` is `None` a unique id will be generated. If
111
    /// an id is provided and a document already exists with that id, a conflict
112
    /// error will be returned.
113
8252
    pub fn insert_serialized<C: SerializedCollection>(
114
8252
        id: Option<C::PrimaryKey>,
115
8252
        contents: &C::Contents,
116
8252
    ) -> Result<Self, Error> {
117
8252
        let id = id.map(DocumentId::new).transpose()?;
118
8252
        let contents = C::serialize(contents)?;
119
8252
        Ok(Self::insert(C::collection_name(), id, contents))
120
8252
    }
121

            
122
    /// Pushes a new document with the serialized representation of `contents`
123
    /// into `collection`.
124
    ///
125
    /// ## Automatic Id Assignment
126
    ///
127
    /// This function calls [`SerializedCollection::natural_id()`] to try to
128
    /// retrieve a primary key value from `contents`. If an id is returned, the
129
    /// item is inserted with that id. If an id is not returned, an id will be
130
    /// automatically assigned, if possible, by the storage backend, which uses
131
    /// the [`Key`](crate::key::Key) trait to assign ids.
132
    pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
133
        let id = C::natural_id(contents);
134
        let id = id.map(DocumentId::new).transpose()?;
135
        let contents = C::serialize(contents)?;
136
        Ok(Self::insert(C::collection_name(), id, contents))
137
    }
138

            
139
    /// Updates a document in `collection`.
140
5048
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
141
5048
        Self {
142
5048
            collection,
143
5048
            command: Command::Update {
144
5048
                header,
145
5048
                contents: contents.into(),
146
5048
            },
147
5048
        }
148
5048
    }
149

            
150
    /// Updates a document with the serialized representation of `contents` in
151
    /// `collection`.
152
    pub fn update_serialized<C: SerializedCollection>(
153
        header: CollectionHeader<C::PrimaryKey>,
154
        contents: &C::Contents,
155
    ) -> Result<Self, Error> {
156
        let contents = C::serialize(contents)?;
157
        Ok(Self::update(
158
            C::collection_name(),
159
            Header::try_from(header)?,
160
            contents,
161
        ))
162
    }
163

            
164
    /// Overwrites a document in `collection`. If a document with `id` exists,
165
    /// it will be overwritten. If a document with `id` doesn't exist, it will
166
    /// be created.
167
511
    pub fn overwrite(
168
511
        collection: CollectionName,
169
511
        id: DocumentId,
170
511
        contents: impl Into<Bytes>,
171
511
    ) -> Self {
172
511
        Self {
173
511
            collection,
174
511
            command: Command::Overwrite {
175
511
                id,
176
511
                contents: contents.into(),
177
511
            },
178
511
        }
179
511
    }
180

            
181
    /// Overwrites a document with the serialized representation of `contents`
182
    /// in `collection`. If a document with `id` exists, it will be overwritten.
183
    /// If a document with `id` doesn't exist, it will be created.
184
    pub fn overwrite_serialized<C: SerializedCollection>(
185
        id: C::PrimaryKey,
186
        contents: &C::Contents,
187
    ) -> Result<Self, Error> {
188
        let contents = C::serialize(contents)?;
189
        Ok(Self::overwrite(
190
            C::collection_name(),
191
            DocumentId::new(id)?,
192
            contents,
193
        ))
194
    }
195

            
196
    /// Deletes a document from a `collection`.
197
38070
    pub const fn delete(collection: CollectionName, header: Header) -> Self {
198
38070
        Self {
199
38070
            collection,
200
38070
            command: Command::Delete { header },
201
38070
        }
202
38070
    }
203
}
204

            
205
/// A command to execute within a `Collection`.
206
42381
#[derive(Clone, Serialize, Deserialize, Debug)]
207
pub enum Command {
208
    /// Inserts a new document containing `contents`.
209
    Insert {
210
        /// An optional id for the document. If this is `None`, a unique id will
211
        /// be generated. If this is `Some()` and a document already exists with
212
        /// that id, a conflict error will be returned.
213
        id: Option<DocumentId>,
214
        /// The initial contents of the document.
215
        contents: Bytes,
216
    },
217

            
218
    /// Update an existing `Document` identified by `header`. `header.revision` must match
219
    /// the currently stored revision on the `Document`. If it does not, the
220
    /// command fill fail with a `DocumentConflict` error.
221
    Update {
222
        /// The header of the `Document`. The revision must match the current
223
        /// document.
224
        header: Header,
225

            
226
        /// The new contents to store within the `Document`.
227
        contents: Bytes,
228
    },
229

            
230
    /// Overwrite an existing `Document` identified by `id`. The revision will
231
    /// not be checked before the document is updated. If the document does not
232
    /// exist, it will be created.
233
    Overwrite {
234
        /// The id of the document to overwrite.
235
        id: DocumentId,
236

            
237
        /// The new contents to store within the `Document`.
238
        contents: Bytes,
239
    },
240

            
241
    /// Delete an existing `Document` identified by `id`. `revision` must match
242
    /// the currently stored revision on the `Document`. If it does not, the
243
    /// command fill fail with a `DocumentConflict` error.
244
    Delete {
245
        /// The current header of the `Document`.
246
        header: Header,
247
    },
248
}
249

            
250
/// Information about the result of each `Operation` in a transaction.
251
40833
#[derive(Clone, Debug, Serialize, Deserialize)]
252
pub enum OperationResult {
253
    /// An operation succeeded but had no information to output.
254
    Success,
255

            
256
    /// A `Document` was updated.
257
    DocumentUpdated {
258
        /// The id of the `Collection` of the updated `Document`.
259
        collection: CollectionName,
260

            
261
        /// The header of the updated `Document`.
262
        header: Header,
263
    },
264

            
265
    /// A `Document` was deleted.
266
    DocumentDeleted {
267
        /// The id of the `Collection` of the deleted `Document`.
268
        collection: CollectionName,
269

            
270
        /// The id of the deleted `Document`.
271
        id: DocumentId,
272
    },
273
}
274

            
275
/// Details about an executed transaction.
276
11444
#[derive(Clone, Debug, Serialize, Deserialize)]
277
pub struct Executed {
278
    /// The id of the transaction.
279
    pub id: u64,
280

            
281
    /// A list of containing ids of `Documents` changed.
282
    pub changes: Changes,
283
}
284

            
285
/// A list of changes.
286
544242
#[derive(Clone, Debug, Serialize, Deserialize)]
287
pub enum Changes {
288
    /// A list of changed documents.
289
    Documents(DocumentChanges),
290
    /// A list of changed keys.
291
    Keys(Vec<ChangedKey>),
292
}
293

            
294
impl Changes {
295
    /// Returns the list of documents changed in this transaction, or None if
296
    /// the transaction was not a document transaction.
297
    #[must_use]
298
    pub const fn documents(&self) -> Option<&DocumentChanges> {
299
43578
        if let Self::Documents(changes) = self {
300
42930
            Some(changes)
301
        } else {
302
648
            None
303
        }
304
43578
    }
305

            
306
    /// Returns the list of keys changed in this transaction, or None if the
307
    /// transaction was not a `KeyValue` transaction.
308
    #[must_use]
309
    pub fn keys(&self) -> Option<&[ChangedKey]> {
310
5427
        if let Self::Keys(keys) = self {
311
3483
            Some(keys)
312
        } else {
313
1944
            None
314
        }
315
5427
    }
316
}
317

            
318
/// A list of changed documents.
319
1042818
#[derive(Clone, Debug, Serialize, Deserialize)]
320
pub struct DocumentChanges {
321
    /// All of the collections changed.
322
    pub collections: Vec<CollectionName>,
323
    /// The individual document changes.
324
    pub documents: Vec<ChangedDocument>,
325
}
326

            
327
impl DocumentChanges {
328
    /// Returns the changed document and the name of the collection the change
329
    /// happened to.
330
    #[must_use]
331
4
    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
332
4
        self.documents.get(index).and_then(|doc| {
333
4
            self.collections
334
4
                .get(usize::from(doc.collection))
335
4
                .map(|collection| (collection, doc))
336
4
        })
337
4
    }
338

            
339
    /// Returns the number of changes in this collection.
340
    #[must_use]
341
1
    pub fn len(&self) -> usize {
342
1
        self.documents.len()
343
1
    }
344

            
345
    /// Returns true if there are no changes.
346
    #[must_use]
347
1
    pub fn is_empty(&self) -> bool {
348
1
        self.documents.is_empty()
349
1
    }
350

            
351
    /// Returns an interator over all of the changed documents.
352
1
    pub const fn iter(&self) -> DocumentChangesIter<'_> {
353
1
        DocumentChangesIter {
354
1
            changes: self,
355
1
            index: Some(0),
356
1
        }
357
1
    }
358
}
359

            
360
/// An iterator over [`DocumentChanges`].
361
#[must_use]
362
pub struct DocumentChangesIter<'a> {
363
    changes: &'a DocumentChanges,
364
    index: Option<usize>,
365
}
366

            
367
impl<'a> Iterator for DocumentChangesIter<'a> {
368
    type Item = (&'a CollectionName, &'a ChangedDocument);
369

            
370
4
    fn next(&mut self) -> Option<Self::Item> {
371
4
        self.index.and_then(|index| {
372
4
            let result = self.changes.get(index);
373
4
            if result.is_some() {
374
3
                self.index = index.checked_add(1);
375
3
            }
376
4
            result
377
4
        })
378
4
    }
379
}
380

            
381
/// A draining iterator over [`ChangedDocument`]s.
382
#[must_use]
383
pub struct DocumentChangesIntoIter {
384
    collections: Vec<CollectionName>,
385
    documents: std::vec::IntoIter<ChangedDocument>,
386
}
387

            
388
impl Iterator for DocumentChangesIntoIter {
389
    type Item = (CollectionName, ChangedDocument);
390

            
391
4
    fn next(&mut self) -> Option<Self::Item> {
392
4
        self.documents.next().and_then(|doc| {
393
4
            self.collections
394
4
                .get(usize::from(doc.collection))
395
4
                .map(|collection| (collection.clone(), doc))
396
4
        })
397
4
    }
398
}
399

            
400
impl IntoIterator for DocumentChanges {
401
    type Item = (CollectionName, ChangedDocument);
402

            
403
    type IntoIter = DocumentChangesIntoIter;
404

            
405
1
    fn into_iter(self) -> Self::IntoIter {
406
1
        DocumentChangesIntoIter {
407
1
            collections: self.collections,
408
1
            documents: self.documents.into_iter(),
409
1
        }
410
1
    }
411
}
412

            
413
1
#[test]
fn document_changes_iter() {
    // Shorthand for a non-deleted record in the collection at index
    // `collection`.
    let record = |collection: u16, id: u64| ChangedDocument {
        collection,
        id: DocumentId::from_u64(id),
        deleted: false,
    };
    // The last record points at collection index 2, which has no entry in
    // `collections`; both iterators are expected to stop before yielding it.
    let changes = DocumentChanges {
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
        documents: vec![record(0, 0), record(0, 1), record(1, 2), record(2, 3)],
    };

    assert_eq!(changes.len(), 4);
    assert!(!changes.is_empty());

    // Borrowing iteration.
    let (mut in_a, mut in_b) = (0, 0);
    let mut seen = Vec::new();
    for (collection, document) in changes.iter() {
        assert!(!seen.contains(&document.id));
        seen.push(document.id);
        match collection.name.as_ref() {
            "a" => in_a += 1,
            "b" => in_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(in_a, 2);
    assert_eq!(in_b, 1);

    // Consuming iteration must report the same entries.
    let (mut in_a, mut in_b) = (0, 0);
    let mut seen = Vec::new();
    for (collection, document) in changes {
        assert!(!seen.contains(&document.id));
        seen.push(document.id);
        match collection.name.as_ref() {
            "a" => in_a += 1,
            "b" => in_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(in_a, 2);
    assert_eq!(in_b, 1);
}
474

            
475
/// A record of a changed document.
476
1457662
#[derive(Debug, Clone, Serialize, Deserialize)]
477
pub struct ChangedDocument {
478
    /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
479
    pub collection: u16,
480

            
481
    /// The id of the changed `Document`.
482
    pub id: DocumentId,
483

            
484
    /// If the `Document` has been deleted, this will be `true`.
485
    pub deleted: bool,
486
}
487

            
488
/// A record of a changed `KeyValue` entry.
489
124548
#[derive(Clone, Debug, Serialize, Deserialize)]
490
pub struct ChangedKey {
491
    /// The namespace of the key.
492
    pub namespace: Option<String>,
493

            
494
    /// The key that was changed.
495
    pub key: String,
496

            
497
    /// True if the key was deleted.
498
    pub deleted: bool,
499
}