1
use arc_bytes::serde::Bytes;
2
use serde::{Deserialize, Serialize};
3

            
4
use crate::connection::{AsyncLowLevelConnection, LowLevelConnection};
5
use crate::document::{CollectionHeader, DocumentId, HasHeader, Header, Revision};
6
use crate::key::KeyEncoding;
7
use crate::schema::{Collection, CollectionName, SerializedCollection};
8
use crate::Error;
9

            
10
/// A list of operations to execute as a single unit. If any operation fails,
11
/// all changes are aborted. Transactions are ACID-compliant. ACID stands for:
12
///
13
/// - Atomic: All transactions are atomically applied. Readers outside of the
14
///   active transaction will never be able to read partially written data. In
15
///   BonsaiDb, readers are not blocked while writes are happening -- reads will
16
///   continue to read the existing value until the transaction is fully
17
///   executed. Once the transaction is fully executed, all future queries will
18
///   reflect the updated state immediately.
19
///
20
/// - Consistent: All transactions will be applied only if the data model is
21
///   able to remain fully consistent. This means that all constraints, such as
22
///   unique view keys, are validated before a transaction is allowed to be
23
///   committed.
24
///
25
/// - Isolated: Each transaction is executed in an isolated environment.
26
///   Currently, BonsaiDb does not offer interactive transactions, so this is
27
///   easily guaranteed. When BonsaiDb eventually has interactive transactions,
28
///   the transaction will have a fully isolated state until it is committed. No
29
///   two transactions can be affected by each other's changes.
30
///
31
///   In the event of a transaction being aborted or a power outage occurs while
32
///   a transaction is being applied, this isolation ensures that once BonsaiDb
33
///   opens the database again, the database will reflect the most recently
34
///   committed.
35
///
36
/// - Durable: When the transaction apply function has finished exectuing,
37
///   BonsaiDb guarantees that all data has been confirmed by the operating
38
///   system as being fully written to disk. This ensures that in the event of a
39
///   power outage, no data that has been confirmed will be lost.
40
///
41
/// When using one of the high-level functions to push/insert/update/delete
42
/// documents, behind the scenes single-[`Operation`] `Transaction`s are
43
/// applied. To ensure multiple changes happen in the same database operation,
44
/// multiple operations can be added to a `Transaction`:
45
///
46
/// ```rust
47
/// # bonsaidb_core::__doctest_prelude!();
48
/// # use bonsaidb_core::connection::Connection;
49
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
50
/// use bonsaidb_core::transaction::{Operation, Transaction};
51
/// let mut tx = Transaction::new();
52
/// tx.push(Operation::push_serialized::<MyCollection>(
53
///     &MyCollection::default(),
54
/// )?);
55
/// tx.push(Operation::push_serialized::<MyCollection>(
56
///     &MyCollection::default(),
57
/// )?);
58
/// let results = tx.apply(db)?;
59
/// assert_eq!(results.len(), 2);
60
/// println!("Two new documents inserted: {results:?}");
61
/// # Ok(())
62
/// # }
63
/// ```
64
228780
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
65
#[must_use]
66
pub struct Transaction {
67
    /// The operations in this transaction.
68
    pub operations: Vec<Operation>,
69
}
70

            
71
impl Transaction {
72
    /// Returns a new, empty transaction.
73
24040
    pub fn new() -> Self {
74
24040
        Self::default()
75
24040
    }
76

            
77
    /// Adds an operation to the transaction.
78
433880
    pub fn push(&mut self, operation: Operation) {
79
433880
        self.operations.push(operation);
80
433880
    }
81

            
82
    /// Appends an operation to the transaction and returns self.
83
    pub fn with(mut self, operation: Operation) -> Self {
84
        self.push(operation);
85
        self
86
    }
87

            
88
    /// Applies the transaction to the `database`, returning the results of the
89
    /// operations. All operations will succeed or none will be performed and an
90
    /// error will be returned.
91
40
    pub fn apply<Connection: LowLevelConnection>(
92
40
        self,
93
40
        db: &Connection,
94
40
    ) -> Result<Vec<OperationResult>, Error> {
95
40
        db.apply_transaction(self)
96
40
    }
97

            
98
    /// Applies the transaction to the `database`, returning the results of the
99
    /// operations. All operations will succeed or none will be performed and an
100
    /// error will be returned.
101
555
    pub async fn apply_async<Connection: AsyncLowLevelConnection>(
102
555
        self,
103
555
        db: &Connection,
104
555
    ) -> Result<Vec<OperationResult>, Error> {
105
4926
        db.apply_transaction(self).await
106
555
    }
107
}
108

            
109
impl From<Operation> for Transaction {
110
912280
    fn from(operation: Operation) -> Self {
111
912280
        Self {
112
912280
            operations: vec![operation],
113
912280
        }
114
912280
    }
115
}
116

            
117
impl Transaction {
118
    /// Inserts a new document with `contents` into `collection`.  If `id` is
119
    /// `None` a unique id will be generated. If an id is provided and a
120
    /// document already exists with that id, a conflict error will be returned.
121
404106
    pub fn insert(
122
404106
        collection: CollectionName,
123
404106
        id: Option<DocumentId>,
124
404106
        contents: impl Into<Bytes>,
125
404106
    ) -> Self {
126
404106
        Self::from(Operation::insert(collection, id, contents))
127
404106
    }
128

            
129
    /// Updates a document in `collection`.
130
215786
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
131
215786
        Self::from(Operation::update(collection, header, contents))
132
215786
    }
133

            
134
    /// Overwrites a document in `collection`. If a document with `id` exists,
135
    /// it will be overwritten. If a document with `id` doesn't exist, it will
136
    /// be created.
137
24
    pub fn overwrite(
138
24
        collection: CollectionName,
139
24
        id: DocumentId,
140
24
        contents: impl Into<Bytes>,
141
24
    ) -> Self {
142
24
        Self::from(Operation::overwrite(collection, id, contents))
143
24
    }
144

            
145
    /// Deletes a document from a `collection`.
146
62840
    pub fn delete(collection: CollectionName, header: Header) -> Self {
147
62840
        Self::from(Operation::delete(collection, header))
148
62840
    }
149
}
150

            
151
/// A single operation performed on a `Collection`.
152
342266
#[derive(Clone, Serialize, Deserialize, Debug)]
153
#[must_use]
154
pub struct Operation {
155
    /// The id of the `Collection`.
156
    pub collection: CollectionName,
157

            
158
    /// The command being performed.
159
    pub command: Command,
160
}
161

            
162
impl Operation {
163
    /// Inserts a new document with `contents` into `collection`.  If `id` is
164
    /// `None` a unique id will be generated. If an id is provided and a
165
    /// document already exists with that id, a conflict error will be returned.
166
796509
    pub fn insert(
167
796509
        collection: CollectionName,
168
796509
        id: Option<DocumentId>,
169
796509
        contents: impl Into<Bytes>,
170
796509
    ) -> Self {
171
796509
        Self {
172
796509
            collection,
173
796509
            command: Command::Insert {
174
796509
                id,
175
796509
                contents: contents.into(),
176
796509
            },
177
796509
        }
178
796509
    }
179

            
180
    /// Inserts a new document with the serialized representation of `contents`
181
    /// into `collection`. If `id` is `None` a unique id will be generated. If
182
    /// an id is provided and a document already exists with that id, a conflict
183
    /// error will be returned.
184
8132
    pub fn insert_serialized<C: SerializedCollection>(
185
8132
        id: Option<&C::PrimaryKey>,
186
8132
        contents: &C::Contents,
187
8132
    ) -> Result<Self, Error> {
188
8132
        let id = id.map(DocumentId::new).transpose()?;
189
8132
        let contents = C::serialize(contents)?;
190
8132
        Ok(Self::insert(C::collection_name(), id, contents))
191
8132
    }
192

            
193
    /// Pushes a new document with the serialized representation of `contents`
194
    /// into `collection`.
195
    ///
196
    /// ## Automatic ID Assignment
197
    ///
198
    /// This function calls [`SerializedCollection::natural_id()`] to try to
199
    /// retrieve a primary key value from `contents`. If an id is returned, the
200
    /// item is inserted with that id. If an id is not returned, an id will be
201
    /// automatically assigned, if possible, by the storage backend, which uses
202
    /// the [`Key`](crate::key::Key) trait to assign ids.
203
2362
    pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
204
2362
        let id = C::natural_id(contents);
205
2362
        let id = id.as_ref().map(DocumentId::new).transpose()?;
206
2362
        let contents = C::serialize(contents)?;
207
2362
        Ok(Self::insert(C::collection_name(), id, contents))
208
2362
    }
209

            
210
    /// Updates a document in `collection`.
211
215938
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
212
215938
        Self {
213
215938
            collection,
214
215938
            command: Command::Update {
215
215938
                header,
216
215938
                contents: contents.into(),
217
215938
            },
218
215938
        }
219
215938
    }
220

            
221
    /// Updates a document with the serialized representation of `contents` in
222
    /// `collection`.
223
8
    pub fn update_serialized<C: SerializedCollection>(
224
8
        header: CollectionHeader<C::PrimaryKey>,
225
8
        contents: &C::Contents,
226
8
    ) -> Result<Self, Error> {
227
8
        let contents = C::serialize(contents)?;
228
        Ok(Self::update(
229
8
            C::collection_name(),
230
8
            Header::try_from(header)?,
231
8
            contents,
232
        ))
233
8
    }
234

            
235
    /// Overwrites a document in `collection`. If a document with `id` exists,
236
    /// it will be overwritten. If a document with `id` doesn't exist, it will
237
    /// be created.
238
32
    pub fn overwrite(
239
32
        collection: CollectionName,
240
32
        id: DocumentId,
241
32
        contents: impl Into<Bytes>,
242
32
    ) -> Self {
243
32
        Self {
244
32
            collection,
245
32
            command: Command::Overwrite {
246
32
                id,
247
32
                contents: contents.into(),
248
32
            },
249
32
        }
250
32
    }
251

            
252
    /// Overwrites a document with the serialized representation of `contents`
253
    /// in `collection`. If a document with `id` exists, it will be overwritten.
254
    /// If a document with `id` doesn't exist, it will be created.
255
8
    pub fn overwrite_serialized<C: SerializedCollection, Key>(
256
8
        id: &Key,
257
8
        contents: &C::Contents,
258
8
    ) -> Result<Self, Error>
259
8
    where
260
8
        Key: KeyEncoding<C::PrimaryKey> + ?Sized,
261
8
    {
262
8
        let contents = C::serialize(contents)?;
263
        Ok(Self::overwrite(
264
8
            C::collection_name(),
265
8
            DocumentId::new(id)?,
266
8
            contents,
267
        ))
268
8
    }
269

            
270
    /// Deletes a document from a `collection`.
271
68280
    pub const fn delete(collection: CollectionName, header: Header) -> Self {
272
68280
        Self {
273
68280
            collection,
274
68280
            command: Command::Delete { header },
275
68280
        }
276
68280
    }
277

            
278
    /// Check that the document `id` still exists in `collection`. If a document
279
    /// with that id is not present, the transaction will not be applied and
280
    /// [`Error::DocumentNotFound`] will be returned.
281
    ///
282
    /// Upon success, [`OperationResult::Success`] will be included in the
283
    /// transaction's results.
284
1920
    pub const fn check_document_id_exists(collection: CollectionName, id: DocumentId) -> Self {
285
1920
        Self {
286
1920
            collection,
287
1920
            command: Command::Check { id, revision: None },
288
1920
        }
289
1920
    }
290

            
291
    /// Check that the document `id` still exists in [`Collection`] `C`. If a
292
    /// document with that id is not present, the transaction will not be
293
    /// applied and [`Error::DocumentNotFound`] will be returned.
294
    ///
295
    /// Upon success, [`OperationResult::Success`] will be included in the
296
    /// transaction's results.
297
48
    pub fn check_document_exists<C: Collection>(id: &C::PrimaryKey) -> Result<Self, Error> {
298
48
        Ok(Self::check_document_id_exists(
299
48
            C::collection_name(),
300
48
            DocumentId::new(id)?,
301
        ))
302
48
    }
303

            
304
    /// Check that the header of `doc_or_header` is the current revision of the
305
    /// stored document in [`Collection`] `C`. If a document with the header's
306
    /// id is not present, the transaction will not be applied and
307
    /// [`Error::DocumentNotFound`] will be returned. If a document with the
308
    /// header's id is present and the revision does not match, the transaction
309
    /// will not be applied and [`Error::DocumentConflict`] will be returned.
310
    ///
311
    /// Upon success, [`OperationResult::Success`] will be included in the
312
    /// transaction's results.
313
24
    pub fn check_document_is_current<C: Collection, H: HasHeader>(
314
24
        doc_or_header: &H,
315
24
    ) -> Result<Self, Error> {
316
24
        let header = doc_or_header.header()?;
317
24
        Ok(Self {
318
24
            collection: C::collection_name(),
319
24
            command: Command::Check {
320
24
                id: header.id,
321
24
                revision: Some(header.revision),
322
24
            },
323
24
        })
324
24
    }
325
}
326

            
327
/// A command to execute within a `Collection`.
328
373693
#[derive(Clone, Serialize, Deserialize, Debug)]
329
pub enum Command {
330
    /// Inserts a new document containing `contents`.
331
    Insert {
332
        /// An optional id for the document. If this is `None`, a unique id will
333
        /// be generated. If this is `Some()` and a document already exists with
334
        /// that id, a conflict error will be returned.
335
        id: Option<DocumentId>,
336
        /// The initial contents of the document.
337
        contents: Bytes,
338
    },
339

            
340
    /// Update an existing `Document` identified by `header`. `header.revision` must match
341
    /// the currently stored revision on the `Document`. If it does not, the
342
    /// command fill fail with a `DocumentConflict` error.
343
    Update {
344
        /// The header of the `Document`. The revision must match the current
345
        /// document.
346
        header: Header,
347

            
348
        /// The new contents to store within the `Document`.
349
        contents: Bytes,
350
    },
351

            
352
    /// Overwrite an existing `Document` identified by `id`. The revision will
353
    /// not be checked before the document is updated. If the document does not
354
    /// exist, it will be created.
355
    Overwrite {
356
        /// The id of the document to overwrite.
357
        id: DocumentId,
358

            
359
        /// The new contents to store within the `Document`.
360
        contents: Bytes,
361
    },
362

            
363
    /// Delete an existing `Document` identified by `id`. `revision` must match
364
    /// the currently stored revision on the `Document`. If it does not, the
365
    /// command fill fail with a `DocumentConflict` error.
366
    Delete {
367
        /// The current header of the `Document`.
368
        header: Header,
369
    },
370

            
371
    /// Checks whether a document exists, and optionally whether its revision is
372
    /// still current. If the document is not found, a `DocumentNotFound` error
373
    /// will be returned.  If the document revision is provided and does not
374
    /// match, a `DocumentConflict` error will be returned.
375
    Check {
376
        /// The id of the document to check.
377
        id: DocumentId,
378
        /// The revision of the document to check.
379
        revision: Option<Revision>,
380
    },
381
}
382

            
383
/// Information about the result of each `Operation` in a transaction.
384
2085264
#[derive(Clone, Debug, Serialize, Deserialize)]
385
pub enum OperationResult {
386
    /// An operation succeeded but had no information to output.
387
    Success,
388

            
389
    /// A `Document` was updated.
390
    DocumentUpdated {
391
        /// The id of the `Collection` of the updated `Document`.
392
        collection: CollectionName,
393

            
394
        /// The header of the updated `Document`.
395
        header: Header,
396
    },
397

            
398
    /// A `Document` was deleted.
399
    DocumentDeleted {
400
        /// The id of the `Collection` of the deleted `Document`.
401
        collection: CollectionName,
402

            
403
        /// The id of the deleted `Document`.
404
        id: DocumentId,
405
    },
406
}
407

            
408
/// Details about an executed transaction.
409
704790
#[derive(Clone, Debug, Serialize, Deserialize)]
410
pub struct Executed {
411
    /// The id of the transaction.
412
    pub id: u64,
413

            
414
    /// A list of containing ids of `Documents` changed.
415
    pub changes: Changes,
416
}
417

            
418
/// A list of changes.
419
1202370
#[derive(Clone, Debug, Serialize, Deserialize)]
420
pub enum Changes {
421
    /// A list of changed documents.
422
    Documents(DocumentChanges),
423
    /// A list of changed keys.
424
    Keys(Vec<ChangedKey>),
425
}
426

            
427
impl Changes {
428
    /// Returns the list of documents changed in this transaction, or None if
429
    /// the transaction was not a document transaction.
430
    #[must_use]
431
    pub const fn documents(&self) -> Option<&DocumentChanges> {
432
64920
        if let Self::Documents(changes) = self {
433
63960
            Some(changes)
434
        } else {
435
960
            None
436
        }
437
64920
    }
438

            
439
    /// Returns the list of keys changed in this transaction, or None if the
440
    /// transaction was not a `KeyValue` transaction.
441
    #[must_use]
442
    pub fn keys(&self) -> Option<&[ChangedKey]> {
443
9360
        if let Self::Keys(keys) = self {
444
6480
            Some(keys)
445
        } else {
446
2880
            None
447
        }
448
9360
    }
449
}
450

            
451
/// A list of changed documents.
452
2580295
#[derive(Clone, Debug, Serialize, Deserialize)]
453
pub struct DocumentChanges {
454
    /// All of the collections changed.
455
    pub collections: Vec<CollectionName>,
456
    /// The individual document changes.
457
    pub documents: Vec<ChangedDocument>,
458
}
459

            
460
impl DocumentChanges {
461
    /// Returns the changed document and the name of the collection the change
462
    /// happened to.
463
    #[must_use]
464
4
    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
465
4
        self.documents.get(index).and_then(|doc| {
466
4
            self.collections
467
4
                .get(usize::from(doc.collection))
468
4
                .map(|collection| (collection, doc))
469
4
        })
470
4
    }
471

            
472
    /// Returns the number of changes in this collection.
473
    #[must_use]
474
1
    pub fn len(&self) -> usize {
475
1
        self.documents.len()
476
1
    }
477

            
478
    /// Returns true if there are no changes.
479
    #[must_use]
480
1
    pub fn is_empty(&self) -> bool {
481
1
        self.documents.is_empty()
482
1
    }
483

            
484
    /// Returns an interator over all of the changed documents.
485
1
    pub const fn iter(&self) -> DocumentChangesIter<'_> {
486
1
        DocumentChangesIter {
487
1
            changes: self,
488
1
            index: Some(0),
489
1
        }
490
1
    }
491
}
492

            
493
/// An iterator over [`DocumentChanges`].
494
#[must_use]
495
pub struct DocumentChangesIter<'a> {
496
    changes: &'a DocumentChanges,
497
    index: Option<usize>,
498
}
499

            
500
impl<'a> Iterator for DocumentChangesIter<'a> {
501
    type Item = (&'a CollectionName, &'a ChangedDocument);
502

            
503
4
    fn next(&mut self) -> Option<Self::Item> {
504
4
        self.index.and_then(|index| {
505
4
            let result = self.changes.get(index);
506
4
            if result.is_some() {
507
3
                self.index = index.checked_add(1);
508
3
            }
509
4
            result
510
4
        })
511
4
    }
512
}
513

            
514
/// A draining iterator over [`ChangedDocument`]s.
515
#[must_use]
516
pub struct DocumentChangesIntoIter {
517
    collections: Vec<CollectionName>,
518
    documents: std::vec::IntoIter<ChangedDocument>,
519
}
520

            
521
impl Iterator for DocumentChangesIntoIter {
522
    type Item = (CollectionName, ChangedDocument);
523

            
524
4
    fn next(&mut self) -> Option<Self::Item> {
525
4
        self.documents.next().and_then(|doc| {
526
4
            self.collections
527
4
                .get(usize::from(doc.collection))
528
4
                .map(|collection| (collection.clone(), doc))
529
4
        })
530
4
    }
531
}
532

            
533
impl IntoIterator for DocumentChanges {
534
    type IntoIter = DocumentChangesIntoIter;
535
    type Item = (CollectionName, ChangedDocument);
536

            
537
1
    fn into_iter(self) -> Self::IntoIter {
538
1
        DocumentChangesIntoIter {
539
1
            collections: self.collections,
540
1
            documents: self.documents.into_iter(),
541
1
        }
542
1
    }
543
}
544

            
545
1
#[test]
546
1
fn document_changes_iter() {
547
1
    use crate::schema::Qualified;
548
1

            
549
1
    let changes = DocumentChanges {
550
1
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
551
1
        documents: vec![
552
1
            ChangedDocument {
553
1
                collection: 0,
554
1
                id: DocumentId::from_u64(0),
555
1
                deleted: false,
556
1
            },
557
1
            ChangedDocument {
558
1
                collection: 0,
559
1
                id: DocumentId::from_u64(1),
560
1
                deleted: false,
561
1
            },
562
1
            ChangedDocument {
563
1
                collection: 1,
564
1
                id: DocumentId::from_u64(2),
565
1
                deleted: false,
566
1
            },
567
1
            ChangedDocument {
568
1
                collection: 2,
569
1
                id: DocumentId::from_u64(3),
570
1
                deleted: false,
571
1
            },
572
1
        ],
573
1
    };
574
1

            
575
1
    assert_eq!(changes.len(), 4);
576
1
    assert!(!changes.is_empty());
577

            
578
1
    let mut a_changes = 0;
579
1
    let mut b_changes = 0;
580
1
    let mut ids = Vec::new();
581
3
    for (collection, document) in changes.iter() {
582
3
        assert!(!ids.contains(&document.id));
583
3
        ids.push(document.id.clone());
584
3
        match collection.name.as_ref() {
585
3
            "a" => a_changes += 1,
586
1
            "b" => b_changes += 1,
587
            _ => unreachable!("invalid collection name {collection}"),
588
        }
589
    }
590
1
    assert_eq!(a_changes, 2);
591
1
    assert_eq!(b_changes, 1);
592

            
593
1
    let mut a_changes = 0;
594
1
    let mut b_changes = 0;
595
1
    let mut ids = Vec::new();
596
4
    for (collection, document) in changes {
597
3
        assert!(!ids.contains(&document.id));
598
3
        ids.push(document.id.clone());
599
3
        match collection.name.as_ref() {
600
3
            "a" => a_changes += 1,
601
1
            "b" => b_changes += 1,
602
            _ => unreachable!("invalid collection name {collection}"),
603
        }
604
    }
605
1
    assert_eq!(a_changes, 2);
606
1
    assert_eq!(b_changes, 1);
607
1
}
608

            
609
/// A record of a changed document.
610
3612413
#[derive(Debug, Clone, Serialize, Deserialize)]
611
pub struct ChangedDocument {
612
    /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
613
    pub collection: u16,
614

            
615
    /// The id of the changed `Document`.
616
    pub id: DocumentId,
617

            
618
    /// If the `Document` has been deleted, this will be `true`.
619
    pub deleted: bool,
620
}
621

            
622
/// A record of a changed `KeyValue` entry.
623
100228
#[derive(Clone, Debug, Serialize, Deserialize)]
624
pub struct ChangedKey {
625
    /// The namespace of the key.
626
    pub namespace: Option<String>,
627

            
628
    /// The key that was changed.
629
    pub key: String,
630

            
631
    /// True if the key was deleted.
632
    pub deleted: bool,
633
}