1
use arc_bytes::serde::Bytes;
2
use serde::{Deserialize, Serialize};
3

            
4
use crate::connection::{AsyncLowLevelConnection, LowLevelConnection};
5
use crate::document::{CollectionHeader, DocumentId, HasHeader, Header, Revision};
6
use crate::key::KeyEncoding;
7
use crate::schema::{Collection, CollectionName, SerializedCollection};
8
use crate::Error;
9

            
10
/// A list of operations to execute as a single unit. If any operation fails,
11
/// all changes are aborted. Transactions are ACID-compliant. ACID stands for:
12
///
13
/// - Atomic: All transactions are atomically applied. Readers outside of the
14
///   active transaction will never be able to read partially written data. In
15
///   BonsaiDb, readers are not blocked while writes are happening -- reads will
16
///   continue to read the existing value until the transaction is fully
17
///   executed. Once the transaction is fully executed, all future queries will
18
///   reflect the updated state immediately.
19
///
20
/// - Consistent: All transactions will be applied only if the data model is
21
///   able to remain fully consistent. This means that all constraints, such as
22
///   unique view keys, are validated before a transaction is allowed to be
23
///   committed.
24
///
25
/// - Isolated: Each transaction is executed in an isolated environment.
26
///   Currently, BonsaiDb does not offer interactive transactions, so this is
27
///   easily guaranteed. When BonsaiDb eventually has interactive transactions,
28
///   the transaction will have a fully isolated state until it is committed. No
29
///   two transactions can be affected by each other's changes.
30
///
31
///   In the event of a transaction being aborted or a power outage occurs while
32
///   a transaction is being applied, this isolation ensures that once BonsaiDb
33
///   opens the database again, the database will reflect the most recently
34
///   committed.
35
///
36
/// - Durable: When the transaction apply function has finished exectuing,
37
///   BonsaiDb guarantees that all data has been confirmed by the operating
38
///   system as being fully written to disk. This ensures that in the event of a
39
///   power outage, no data that has been confirmed will be lost.
40
///
41
/// When using one of the high-level functions to push/insert/update/delete
42
/// documents, behind the scenes single-[`Operation`] `Transaction`s are
43
/// applied. To ensure multiple changes happen in the same database operation,
44
/// multiple operations can be added to a `Transaction`:
45
///
46
/// ```rust
47
/// # bonsaidb_core::__doctest_prelude!();
48
/// # use bonsaidb_core::connection::Connection;
49
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
50
/// use bonsaidb_core::transaction::{Operation, Transaction};
51
/// let mut tx = Transaction::new();
52
/// tx.push(Operation::push_serialized::<MyCollection>(
53
///     &MyCollection::default(),
54
/// )?);
55
/// tx.push(Operation::push_serialized::<MyCollection>(
56
///     &MyCollection::default(),
57
/// )?);
58
/// let results = tx.apply(db)?;
59
/// assert_eq!(results.len(), 2);
60
/// println!("Two new documents inserted: {results:?}");
61
/// # Ok(())
62
/// # }
63
/// ```
64
360110
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
65
#[must_use]
66
pub struct Transaction {
67
    /// The operations in this transaction.
68
    pub operations: Vec<Operation>,
69
}
70

            
71
impl Transaction {
72
    /// Returns a new, empty transaction.
73
24080
    pub fn new() -> Self {
74
24080
        Self::default()
75
24080
    }
76

            
77
    /// Adds an operation to the transaction.
78
442360
    pub fn push(&mut self, operation: Operation) {
79
442360
        self.operations.push(operation);
80
442360
    }
81

            
82
    /// Appends an operation to the transaction and returns self.
83
    pub fn with(mut self, operation: Operation) -> Self {
84
        self.push(operation);
85
        self
86
    }
87

            
88
    /// Applies the transaction to the `database`, returning the results of the
89
    /// operations. All operations will succeed or none will be performed and an
90
    /// error will be returned.
91
40
    pub fn apply<Connection: LowLevelConnection>(
92
40
        self,
93
40
        db: &Connection,
94
40
    ) -> Result<Vec<OperationResult>, Error> {
95
40
        db.apply_transaction(self)
96
40
    }
97

            
98
    /// Applies the transaction to the `database`, returning the results of the
99
    /// operations. All operations will succeed or none will be performed and an
100
    /// error will be returned.
101
555
    pub async fn apply_async<Connection: AsyncLowLevelConnection>(
102
555
        self,
103
555
        db: &Connection,
104
555
    ) -> Result<Vec<OperationResult>, Error> {
105
4037
        db.apply_transaction(self).await
106
555
    }
107
}
108

            
109
impl From<Operation> for Transaction {
110
903240
    fn from(operation: Operation) -> Self {
111
903240
        Self {
112
903240
            operations: vec![operation],
113
903240
        }
114
903240
    }
115
}
116

            
117
impl Transaction {
118
    /// Inserts a new document with `contents` into `collection`.  If `id` is
119
    /// `None` a unique id will be generated. If an id is provided and a
120
    /// document already exists with that id, a conflict error will be returned.
121
390716
    pub fn insert(
122
390716
        collection: CollectionName,
123
390716
        id: Option<DocumentId>,
124
390716
        contents: impl Into<Bytes>,
125
390716
    ) -> Self {
126
390716
        Self::from(Operation::insert(collection, id, contents))
127
390716
    }
128

            
129
    /// Updates a document in `collection`.
130
204772
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
131
204772
        Self::from(Operation::update(collection, header, contents))
132
204772
    }
133

            
134
    /// Overwrites a document in `collection`. If a document with `id` exists,
135
    /// it will be overwritten. If a document with `id` doesn't exist, it will
136
    /// be created.
137
24
    pub fn overwrite(
138
24
        collection: CollectionName,
139
24
        id: DocumentId,
140
24
        contents: impl Into<Bytes>,
141
24
    ) -> Self {
142
24
        Self::from(Operation::overwrite(collection, id, contents))
143
24
    }
144

            
145
    /// Deletes a document from a `collection`.
146
62360
    pub fn delete(collection: CollectionName, header: Header) -> Self {
147
62360
        Self::from(Operation::delete(collection, header))
148
62360
    }
149
}
150

            
151
/// A single operation performed on a `Collection`.
152
547094
#[derive(Clone, Serialize, Deserialize, Debug)]
153
#[must_use]
154
pub struct Operation {
155
    /// The id of the `Collection`.
156
    pub collection: CollectionName,
157

            
158
    /// The command being performed.
159
    pub command: Command,
160
}
161

            
162
impl Operation {
163
    /// Inserts a new document with `contents` into `collection`.  If `id` is
164
    /// `None` a unique id will be generated. If an id is provided and a
165
    /// document already exists with that id, a conflict error will be returned.
166
780146
    pub fn insert(
167
780146
        collection: CollectionName,
168
780146
        id: Option<DocumentId>,
169
780146
        contents: impl Into<Bytes>,
170
780146
    ) -> Self {
171
780146
        Self {
172
780146
            collection,
173
780146
            command: Command::Insert {
174
780146
                id,
175
780146
                contents: contents.into(),
176
780146
            },
177
780146
        }
178
780146
    }
179

            
180
    /// Inserts a new document with the serialized representation of `contents`
181
    /// into `collection`. If `id` is `None` a unique id will be generated. If
182
    /// an id is provided and a document already exists with that id, a conflict
183
    /// error will be returned.
184
7732
    pub fn insert_serialized<C: SerializedCollection>(
185
7732
        id: Option<&C::PrimaryKey>,
186
7732
        contents: &C::Contents,
187
7732
    ) -> Result<Self, Error> {
188
7732
        let id = id.map(DocumentId::new).transpose()?;
189
7732
        let contents = C::serialize(contents)?;
190
7732
        Ok(Self::insert(C::collection_name(), id, contents))
191
7732
    }
192

            
193
    /// Pushes a new document with the serialized representation of `contents`
194
    /// into `collection`.
195
    ///
196
    /// ## Automatic ID Assignment
197
    ///
198
    /// This function calls [`SerializedCollection::natural_id()`] to try to
199
    /// retrieve a primary key value from `contents`. If an id is returned, the
200
    /// item is inserted with that id. If an id is not returned, an id will be
201
    /// automatically assigned, if possible, by the storage backend, which uses
202
    /// the [`Key`](crate::key::Key) trait to assign ids.
203
2974
    pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
204
2974
        let id = C::natural_id(contents);
205
2974
        let id = id.as_ref().map(DocumentId::new).transpose()?;
206
2974
        let contents = C::serialize(contents)?;
207
2974
        Ok(Self::insert(C::collection_name(), id, contents))
208
2974
    }
209

            
210
    /// Updates a document in `collection`.
211
204920
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
212
204920
        Self {
213
204920
            collection,
214
204920
            command: Command::Update {
215
204920
                header,
216
204920
                contents: contents.into(),
217
204920
            },
218
204920
        }
219
204920
    }
220

            
221
    /// Updates a document with the serialized representation of `contents` in
222
    /// `collection`.
223
8
    pub fn update_serialized<C: SerializedCollection>(
224
8
        header: CollectionHeader<C::PrimaryKey>,
225
8
        contents: &C::Contents,
226
8
    ) -> Result<Self, Error> {
227
8
        let contents = C::serialize(contents)?;
228
        Ok(Self::update(
229
8
            C::collection_name(),
230
8
            Header::try_from(header)?,
231
8
            contents,
232
        ))
233
8
    }
234

            
235
    /// Overwrites a document in `collection`. If a document with `id` exists,
236
    /// it will be overwritten. If a document with `id` doesn't exist, it will
237
    /// be created.
238
32
    pub fn overwrite(
239
32
        collection: CollectionName,
240
32
        id: DocumentId,
241
32
        contents: impl Into<Bytes>,
242
32
    ) -> Self {
243
32
        Self {
244
32
            collection,
245
32
            command: Command::Overwrite {
246
32
                id,
247
32
                contents: contents.into(),
248
32
            },
249
32
        }
250
32
    }
251

            
252
    /// Overwrites a document with the serialized representation of `contents`
253
    /// in `collection`. If a document with `id` exists, it will be overwritten.
254
    /// If a document with `id` doesn't exist, it will be created.
255
8
    pub fn overwrite_serialized<C: SerializedCollection, Key>(
256
8
        id: &Key,
257
8
        contents: &C::Contents,
258
8
    ) -> Result<Self, Error>
259
8
    where
260
8
        Key: KeyEncoding<C::PrimaryKey> + ?Sized,
261
8
    {
262
8
        let contents = C::serialize(contents)?;
263
        Ok(Self::overwrite(
264
8
            C::collection_name(),
265
8
            DocumentId::new(id)?,
266
8
            contents,
267
        ))
268
8
    }
269

            
270
    /// Deletes a document from a `collection`.
271
67800
    pub const fn delete(collection: CollectionName, header: Header) -> Self {
272
67800
        Self {
273
67800
            collection,
274
67800
            command: Command::Delete { header },
275
67800
        }
276
67800
    }
277

            
278
    /// Check that the document `id` still exists in `collection`. If a document
279
    /// with that id is not present, the transaction will not be applied and
280
    /// [`Error::DocumentNotFound`] will be returned.
281
    ///
282
    /// Upon success, [`OperationResult::Success`] will be included in the
283
    /// transaction's results.
284
1920
    pub const fn check_document_id_exists(collection: CollectionName, id: DocumentId) -> Self {
285
1920
        Self {
286
1920
            collection,
287
1920
            command: Command::Check { id, revision: None },
288
1920
        }
289
1920
    }
290

            
291
    /// Check that the document `id` still exists in [`Collection`] `C`. If a
292
    /// document with that id is not present, the transaction will not be
293
    /// applied and [`Error::DocumentNotFound`] will be returned.
294
    ///
295
    /// Upon success, [`OperationResult::Success`] will be included in the
296
    /// transaction's results.
297
48
    pub fn check_document_exists<C: Collection>(id: &C::PrimaryKey) -> Result<Self, Error> {
298
48
        Ok(Self::check_document_id_exists(
299
48
            C::collection_name(),
300
48
            DocumentId::new(id)?,
301
        ))
302
48
    }
303

            
304
    /// Check that the header of `doc_or_header` is the current revision of the
305
    /// stored document in [`Collection`] `C`. If a document with the header's
306
    /// id is not present, the transaction will not be applied and
307
    /// [`Error::DocumentNotFound`] will be returned. If a document with the
308
    /// header's id is present and the revision does not match, the transaction
309
    /// will not be applied and [`Error::DocumentConflict`] will be returned.
310
    ///
311
    /// Upon success, [`OperationResult::Success`] will be included in the
312
    /// transaction's results.
313
24
    pub fn check_document_is_current<C: Collection, H: HasHeader>(
314
24
        doc_or_header: &H,
315
24
    ) -> Result<Self, Error> {
316
24
        let header = doc_or_header.header()?;
317
24
        Ok(Self {
318
24
            collection: C::collection_name(),
319
24
            command: Command::Check {
320
24
                id: header.id,
321
24
                revision: Some(header.revision),
322
24
            },
323
24
        })
324
24
    }
325
}
326

            
327
/// A command to execute within a `Collection`.
328
578525
#[derive(Clone, Serialize, Deserialize, Debug)]
329
pub enum Command {
330
    /// Inserts a new document containing `contents`.
331
    Insert {
332
        /// An optional id for the document. If this is `None`, a unique id will
333
        /// be generated. If this is `Some()` and a document already exists with
334
        /// that id, a conflict error will be returned.
335
        id: Option<DocumentId>,
336
        /// The initial contents of the document.
337
        contents: Bytes,
338
    },
339

            
340
    /// Update an existing `Document` identified by `header`. `header.revision` must match
341
    /// the currently stored revision on the `Document`. If it does not, the
342
    /// command fill fail with a `DocumentConflict` error.
343
    Update {
344
        /// The header of the `Document`. The revision must match the current
345
        /// document.
346
        header: Header,
347

            
348
        /// The new contents to store within the `Document`.
349
        contents: Bytes,
350
    },
351

            
352
    /// Overwrite an existing `Document` identified by `id`. The revision will
353
    /// not be checked before the document is updated. If the document does not
354
    /// exist, it will be created.
355
    Overwrite {
356
        /// The id of the document to overwrite.
357
        id: DocumentId,
358

            
359
        /// The new contents to store within the `Document`.
360
        contents: Bytes,
361
    },
362

            
363
    /// Delete an existing `Document` identified by `id`. `revision` must match
364
    /// the currently stored revision on the `Document`. If it does not, the
365
    /// command fill fail with a `DocumentConflict` error.
366
    Delete {
367
        /// The current header of the `Document`.
368
        header: Header,
369
    },
370

            
371
    /// Checks whether a document exists, and optionally whether its revision is
372
    /// still current. If the document is not found, a `DocumentNotFound` error
373
    /// will be returned.  If the document revision is provided and does not
374
    /// match, a `DocumentConflict` error will be returned.
375
    Check {
376
        /// The id of the document to check.
377
        id: DocumentId,
378
        /// The revision of the document to check.
379
        revision: Option<Revision>,
380
    },
381
}
382

            
383
/// Information about the result of each `Operation` in a transaction.
384
3473760
#[derive(Clone, Debug, Serialize, Deserialize)]
385
pub enum OperationResult {
386
    /// An operation succeeded but had no information to output.
387
    Success,
388

            
389
    /// A `Document` was updated.
390
    DocumentUpdated {
391
        /// The id of the `Collection` of the updated `Document`.
392
        collection: CollectionName,
393

            
394
        /// The header of the updated `Document`.
395
        header: Header,
396
    },
397

            
398
    /// A `Document` was deleted.
399
    DocumentDeleted {
400
        /// The id of the `Collection` of the deleted `Document`.
401
        collection: CollectionName,
402

            
403
        /// The id of the deleted `Document`.
404
        id: DocumentId,
405
    },
406
}
407

            
408
/// Details about an executed transaction.
409
1174950
#[derive(Clone, Debug, Serialize, Deserialize)]
410
pub struct Executed {
411
    /// The id of the transaction.
412
    pub id: u64,
413

            
414
    /// A list of containing ids of `Documents` changed.
415
    pub changes: Changes,
416
}
417

            
418
/// A list of changes.
419
1268584
#[derive(Clone, Debug, Serialize, Deserialize)]
420
pub enum Changes {
421
    /// A list of changed documents.
422
    Documents(DocumentChanges),
423
    /// A list of changed keys.
424
    Keys(Vec<ChangedKey>),
425
}
426

            
427
impl Changes {
428
    /// Returns the list of documents changed in this transaction, or None if
429
    /// the transaction was not a document transaction.
430
    #[must_use]
431
64920
    pub const fn documents(&self) -> Option<&DocumentChanges> {
432
64920
        if let Self::Documents(changes) = self {
433
63960
            Some(changes)
434
        } else {
435
960
            None
436
        }
437
64920
    }
438

            
439
    /// Returns the list of keys changed in this transaction, or None if the
440
    /// transaction was not a `KeyValue` transaction.
441
    #[must_use]
442
9840
    pub fn keys(&self) -> Option<&[ChangedKey]> {
443
9840
        if let Self::Keys(keys) = self {
444
6960
            Some(keys)
445
        } else {
446
2880
            None
447
        }
448
9840
    }
449
}
450

            
451
/// A list of changed documents.
452
2999420
#[derive(Clone, Debug, Serialize, Deserialize)]
453
pub struct DocumentChanges {
454
    /// All of the collections changed.
455
    pub collections: Vec<CollectionName>,
456
    /// The individual document changes.
457
    pub documents: Vec<ChangedDocument>,
458
}
459

            
460
impl DocumentChanges {
461
    /// Returns the changed document and the name of the collection the change
462
    /// happened to.
463
    #[must_use]
464
4
    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
465
4
        self.documents.get(index).and_then(|doc| {
466
4
            self.collections
467
4
                .get(usize::from(doc.collection))
468
4
                .map(|collection| (collection, doc))
469
4
        })
470
4
    }
471

            
472
    /// Returns the number of changes in this collection.
473
    #[must_use]
474
1
    pub fn len(&self) -> usize {
475
1
        self.documents.len()
476
1
    }
477

            
478
    /// Returns true if there are no changes.
479
    #[must_use]
480
1
    pub fn is_empty(&self) -> bool {
481
1
        self.documents.is_empty()
482
1
    }
483

            
484
    /// Returns an interator over all of the changed documents.
485
1
    pub const fn iter(&self) -> DocumentChangesIter<'_> {
486
1
        DocumentChangesIter {
487
1
            changes: self,
488
1
            index: Some(0),
489
1
        }
490
1
    }
491
}
492

            
493
impl<'a> IntoIterator for &'a DocumentChanges {
494
    type IntoIter = DocumentChangesIter<'a>;
495
    type Item = (&'a CollectionName, &'a ChangedDocument);
496

            
497
1
    fn into_iter(self) -> Self::IntoIter {
498
1
        self.iter()
499
1
    }
500
}
501

            
502
/// An iterator over [`DocumentChanges`].
503
#[must_use]
504
pub struct DocumentChangesIter<'a> {
505
    changes: &'a DocumentChanges,
506
    index: Option<usize>,
507
}
508

            
509
impl<'a> Iterator for DocumentChangesIter<'a> {
510
    type Item = (&'a CollectionName, &'a ChangedDocument);
511

            
512
4
    fn next(&mut self) -> Option<Self::Item> {
513
4
        self.index.and_then(|index| {
514
4
            let result = self.changes.get(index);
515
4
            if result.is_some() {
516
3
                self.index = index.checked_add(1);
517
3
            }
518
4
            result
519
4
        })
520
4
    }
521
}
522

            
523
/// A draining iterator over [`ChangedDocument`]s.
524
#[must_use]
525
pub struct DocumentChangesIntoIter {
526
    collections: Vec<CollectionName>,
527
    documents: std::vec::IntoIter<ChangedDocument>,
528
}
529

            
530
impl Iterator for DocumentChangesIntoIter {
531
    type Item = (CollectionName, ChangedDocument);
532

            
533
4
    fn next(&mut self) -> Option<Self::Item> {
534
4
        self.documents.next().and_then(|doc| {
535
4
            self.collections
536
4
                .get(usize::from(doc.collection))
537
4
                .map(|collection| (collection.clone(), doc))
538
4
        })
539
4
    }
540
}
541

            
542
impl IntoIterator for DocumentChanges {
543
    type IntoIter = DocumentChangesIntoIter;
544
    type Item = (CollectionName, ChangedDocument);
545

            
546
1
    fn into_iter(self) -> Self::IntoIter {
547
1
        DocumentChangesIntoIter {
548
1
            collections: self.collections,
549
1
            documents: self.documents.into_iter(),
550
1
        }
551
1
    }
552
}
553

            
554
1
#[test]
555
1
fn document_changes_iter() {
556
1
    use crate::schema::Qualified;
557
1

            
558
1
    let changes = DocumentChanges {
559
1
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
560
1
        documents: vec![
561
1
            ChangedDocument {
562
1
                collection: 0,
563
1
                id: DocumentId::from_u64(0),
564
1
                deleted: false,
565
1
            },
566
1
            ChangedDocument {
567
1
                collection: 0,
568
1
                id: DocumentId::from_u64(1),
569
1
                deleted: false,
570
1
            },
571
1
            ChangedDocument {
572
1
                collection: 1,
573
1
                id: DocumentId::from_u64(2),
574
1
                deleted: false,
575
1
            },
576
1
            ChangedDocument {
577
1
                collection: 2,
578
1
                id: DocumentId::from_u64(3),
579
1
                deleted: false,
580
1
            },
581
1
        ],
582
1
    };
583
1

            
584
1
    assert_eq!(changes.len(), 4);
585
1
    assert!(!changes.is_empty());
586

            
587
1
    let mut a_changes = 0;
588
1
    let mut b_changes = 0;
589
1
    let mut ids = Vec::new();
590
4
    for (collection, document) in &changes {
591
3
        assert!(!ids.contains(&document.id));
592
3
        ids.push(document.id.clone());
593
3
        match collection.name.as_ref() {
594
3
            "a" => a_changes += 1,
595
1
            "b" => b_changes += 1,
596
            _ => unreachable!("invalid collection name {collection}"),
597
        }
598
    }
599
1
    assert_eq!(a_changes, 2);
600
1
    assert_eq!(b_changes, 1);
601

            
602
1
    let mut a_changes = 0;
603
1
    let mut b_changes = 0;
604
1
    let mut ids = Vec::new();
605
4
    for (collection, document) in changes {
606
3
        assert!(!ids.contains(&document.id));
607
3
        ids.push(document.id.clone());
608
3
        match collection.name.as_ref() {
609
3
            "a" => a_changes += 1,
610
1
            "b" => b_changes += 1,
611
            _ => unreachable!("invalid collection name {collection}"),
612
        }
613
    }
614
1
    assert_eq!(a_changes, 2);
615
1
    assert_eq!(b_changes, 1);
616
1
}
617

            
618
/// A record of a changed document.
619
4199188
#[derive(Debug, Clone, Serialize, Deserialize)]
620
pub struct ChangedDocument {
621
    /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
622
    pub collection: u16,
623

            
624
    /// The id of the changed `Document`.
625
    pub id: DocumentId,
626

            
627
    /// If the `Document` has been deleted, this will be `true`.
628
    pub deleted: bool,
629
}
630

            
631
/// A record of a changed `KeyValue` entry.
632
180508
#[derive(Clone, Debug, Serialize, Deserialize)]
633
pub struct ChangedKey {
634
    /// The namespace of the key.
635
    pub namespace: Option<String>,
636

            
637
    /// The key that was changed.
638
    pub key: String,
639

            
640
    /// True if the key was deleted.
641
    pub deleted: bool,
642
}