1
use arc_bytes::serde::Bytes;
2
use serde::{Deserialize, Serialize};
3

            
4
use crate::{
5
    connection::{AsyncLowLevelConnection, LowLevelConnection},
6
    document::{CollectionHeader, DocumentId, Header},
7
    schema::{CollectionName, SerializedCollection},
8
    Error,
9
};
10

            
11
/// A list of operations to execute as a single unit. If any operation fails,
12
/// all changes are aborted. Transactions are ACID-compliant. ACID stands for:
13
///
14
/// - Atomic: All transactions are atomically applied. Readers outside of the
15
///   active transaction will never be able to read partially written data. In
16
///   BonsaiDb, readers are not blocked while writes are happening -- reads will
17
///   continue to read the existing value until the transaction is fully
18
///   executed. Once the transaction is fully executed, all future queries will
19
///   reflect the updated state immediately.
20
///
21
/// - Consistent: All transactions will be applied only if the data model is
22
///   able to remain fully consistent. This means that all constraints, such as
23
///   unique view keys, are validated before a transaction is allowed to be
24
///   committed.
25
///
26
/// - Isolated: Each transaction is executed in an isolated environment.
27
///   Currently, BonsaiDb does not offer interactive transactions, so this is
28
///   easily guaranteed. When BonsaiDb eventually has interactive transactions,
29
///   the transaction will have a fully isolated state until it is committed. No
30
///   two transactions can be affected by each other's changes.
31
///
32
///   In the event of a transaction being aborted or a power outage occurs while
33
///   a transaction is being applied, this isolation ensures that once BonsaiDb
34
///   opens the database again, the database will reflect the most recently
35
///   committed.
36
///
37
/// - Durable: When the transaction apply function has finished exectuing,
38
///   BonsaiDb guarantees that all data has been confirmed by the operating
39
///   system as being fully written to disk. This ensures that in the event of a
40
///   power outage, no data that has been confirmed will be lost.
41
///
42
/// When using one of the high-level functions to push/insert/update/delete
43
/// documents, behind the scenes single-[`Operation`] `Transaction`s are
44
/// applied. To ensure multiple changes happen in the same database operation,
45
/// multiple operations can be added to a `Transaction`:
46
///
47
/// ```rust
48
/// # bonsaidb_core::__doctest_prelude!();
49
/// # use bonsaidb_core::connection::Connection;
50
/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
51
/// use bonsaidb_core::transaction::{Operation, Transaction};
52
/// let mut tx = Transaction::new();
53
/// tx.push(Operation::push_serialized::<MyCollection>(
54
///     &MyCollection::default(),
55
/// )?);
56
/// tx.push(Operation::push_serialized::<MyCollection>(
57
///     &MyCollection::default(),
58
/// )?);
59
/// let results = tx.apply(db)?;
60
/// assert_eq!(results.len(), 2);
61
/// println!("Two new documents inserted: {results:?}");
62
/// # Ok(())
63
/// # }
64
/// ```
65
221634
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
66
#[must_use]
67
pub struct Transaction {
68
    /// The operations in this transaction.
69
    pub operations: Vec<Operation>,
70
}
71

            
72
impl Transaction {
73
    /// Returns a new, empty transaction.
74
217
    pub fn new() -> Self {
75
217
        Self::default()
76
217
    }
77

            
78
    /// Adds an operation to the transaction.
79
351819
    pub fn push(&mut self, operation: Operation) {
80
351819
        self.operations.push(operation);
81
351819
    }
82

            
83
    /// Appends an operation to the transaction and returns self.
84
    pub fn with(mut self, operation: Operation) -> Self {
85
        self.push(operation);
86
        self
87
    }
88

            
89
    /// Applies the transaction to the `database`, returning the results of the
90
    /// operations. All operations will succeed or none will be performed and an
91
    /// error will be returned.
92
    pub fn apply<Connection: LowLevelConnection>(
93
        self,
94
        db: &Connection,
95
    ) -> Result<Vec<OperationResult>, Error> {
96
        db.apply_transaction(self)
97
    }
98

            
99
    /// Applies the transaction to the `database`, returning the results of the
100
    /// operations. All operations will succeed or none will be performed and an
101
    /// error will be returned.
102
    pub async fn apply_async<Connection: AsyncLowLevelConnection>(
103
        self,
104
        db: &Connection,
105
    ) -> Result<Vec<OperationResult>, Error> {
106
        db.apply_transaction(self).await
107
    }
108
}
109

            
110
impl From<Operation> for Transaction {
111
699639
    fn from(operation: Operation) -> Self {
112
699639
        Self {
113
699639
            operations: vec![operation],
114
699639
        }
115
699639
    }
116
}
117

            
118
impl Transaction {
119
    /// Inserts a new document with `contents` into `collection`.  If `id` is
120
    /// `None` a unique id will be generated. If an id is provided and a
121
    /// document already exists with that id, a conflict error will be returned.
122
318566
    pub fn insert(
123
318566
        collection: CollectionName,
124
318566
        id: Option<DocumentId>,
125
318566
        contents: impl Into<Bytes>,
126
318566
    ) -> Self {
127
318566
        Self::from(Operation::insert(collection, id, contents))
128
318566
    }
129

            
130
    /// Updates a document in `collection`.
131
5660
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
132
5660
        Self::from(Operation::update(collection, header, contents))
133
5660
    }
134

            
135
    /// Overwrites a document in `collection`. If a document with `id` exists,
136
    /// it will be overwritten. If a document with `id` doesn't exist, it will
137
    /// be created.
138
517
    pub fn overwrite(
139
517
        collection: CollectionName,
140
517
        id: DocumentId,
141
517
        contents: impl Into<Bytes>,
142
517
    ) -> Self {
143
517
        Self::from(Operation::overwrite(collection, id, contents))
144
517
    }
145

            
146
    /// Deletes a document from a `collection`.
147
46035
    pub fn delete(collection: CollectionName, header: Header) -> Self {
148
46035
        Self::from(Operation::delete(collection, header))
149
46035
    }
150
}
151

            
152
/// A single operation performed on a `Collection`.
153
362215
#[derive(Clone, Serialize, Deserialize, Debug)]
154
#[must_use]
155
pub struct Operation {
156
    /// The id of the `Collection`.
157
    pub collection: CollectionName,
158

            
159
    /// The command being performed.
160
    pub command: Command,
161
}
162

            
163
impl Operation {
164
    /// Inserts a new document with `contents` into `collection`.  If `id` is
165
    /// `None` a unique id will be generated. If an id is provided and a
166
    /// document already exists with that id, a conflict error will be returned.
167
658258
    pub fn insert(
168
658258
        collection: CollectionName,
169
658258
        id: Option<DocumentId>,
170
658258
        contents: impl Into<Bytes>,
171
658258
    ) -> Self {
172
658258
        Self {
173
658258
            collection,
174
658258
            command: Command::Insert {
175
658258
                id,
176
658258
                contents: contents.into(),
177
658258
            },
178
658258
        }
179
658258
    }
180

            
181
    /// Inserts a new document with the serialized representation of `contents`
182
    /// into `collection`. If `id` is `None` a unique id will be generated. If
183
    /// an id is provided and a document already exists with that id, a conflict
184
    /// error will be returned.
185
11320
    pub fn insert_serialized<C: SerializedCollection>(
186
11320
        id: Option<C::PrimaryKey>,
187
11320
        contents: &C::Contents,
188
11320
    ) -> Result<Self, Error> {
189
11320
        let id = id.map(DocumentId::new).transpose()?;
190
11320
        let contents = C::serialize(contents)?;
191
11320
        Ok(Self::insert(C::collection_name(), id, contents))
192
11320
    }
193

            
194
    /// Pushes a new document with the serialized representation of `contents`
195
    /// into `collection`.
196
    ///
197
    /// ## Automatic ID Assignment
198
    ///
199
    /// This function calls [`SerializedCollection::natural_id()`] to try to
200
    /// retrieve a primary key value from `contents`. If an id is returned, the
201
    /// item is inserted with that id. If an id is not returned, an id will be
202
    /// automatically assigned, if possible, by the storage backend, which uses
203
    /// the [`Key`](crate::key::Key) trait to assign ids.
204
    pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
205
        let id = C::natural_id(contents);
206
        let id = id.map(DocumentId::new).transpose()?;
207
        let contents = C::serialize(contents)?;
208
        Ok(Self::insert(C::collection_name(), id, contents))
209
    }
210

            
211
    /// Updates a document in `collection`.
212
5660
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
213
5660
        Self {
214
5660
            collection,
215
5660
            command: Command::Update {
216
5660
                header,
217
5660
                contents: contents.into(),
218
5660
            },
219
5660
        }
220
5660
    }
221

            
222
    /// Updates a document with the serialized representation of `contents` in
223
    /// `collection`.
224
    pub fn update_serialized<C: SerializedCollection>(
225
        header: CollectionHeader<C::PrimaryKey>,
226
        contents: &C::Contents,
227
    ) -> Result<Self, Error> {
228
        let contents = C::serialize(contents)?;
229
        Ok(Self::update(
230
            C::collection_name(),
231
            Header::try_from(header)?,
232
            contents,
233
        ))
234
    }
235

            
236
    /// Overwrites a document in `collection`. If a document with `id` exists,
237
    /// it will be overwritten. If a document with `id` doesn't exist, it will
238
    /// be created.
239
517
    pub fn overwrite(
240
517
        collection: CollectionName,
241
517
        id: DocumentId,
242
517
        contents: impl Into<Bytes>,
243
517
    ) -> Self {
244
517
        Self {
245
517
            collection,
246
517
            command: Command::Overwrite {
247
517
                id,
248
517
                contents: contents.into(),
249
517
            },
250
517
        }
251
517
    }
252

            
253
    /// Overwrites a document with the serialized representation of `contents`
254
    /// in `collection`. If a document with `id` exists, it will be overwritten.
255
    /// If a document with `id` doesn't exist, it will be created.
256
    pub fn overwrite_serialized<C: SerializedCollection>(
257
        id: C::PrimaryKey,
258
        contents: &C::Contents,
259
    ) -> Result<Self, Error> {
260
        let contents = C::serialize(contents)?;
261
        Ok(Self::overwrite(
262
            C::collection_name(),
263
            DocumentId::new(id)?,
264
            contents,
265
        ))
266
    }
267

            
268
    /// Deletes a document from a `collection`.
269
46779
    pub const fn delete(collection: CollectionName, header: Header) -> Self {
270
46779
        Self {
271
46779
            collection,
272
46779
            command: Command::Delete { header },
273
46779
        }
274
46779
    }
275
}
276

            
277
/// A command to execute within a `Collection`.
278
393492
#[derive(Clone, Serialize, Deserialize, Debug)]
279
pub enum Command {
280
    /// Inserts a new document containing `contents`.
281
    Insert {
282
        /// An optional id for the document. If this is `None`, a unique id will
283
        /// be generated. If this is `Some()` and a document already exists with
284
        /// that id, a conflict error will be returned.
285
        id: Option<DocumentId>,
286
        /// The initial contents of the document.
287
        contents: Bytes,
288
    },
289

            
290
    /// Update an existing `Document` identified by `header`. `header.revision` must match
291
    /// the currently stored revision on the `Document`. If it does not, the
292
    /// command fill fail with a `DocumentConflict` error.
293
    Update {
294
        /// The header of the `Document`. The revision must match the current
295
        /// document.
296
        header: Header,
297

            
298
        /// The new contents to store within the `Document`.
299
        contents: Bytes,
300
    },
301

            
302
    /// Overwrite an existing `Document` identified by `id`. The revision will
303
    /// not be checked before the document is updated. If the document does not
304
    /// exist, it will be created.
305
    Overwrite {
306
        /// The id of the document to overwrite.
307
        id: DocumentId,
308

            
309
        /// The new contents to store within the `Document`.
310
        contents: Bytes,
311
    },
312

            
313
    /// Delete an existing `Document` identified by `id`. `revision` must match
314
    /// the currently stored revision on the `Document`. If it does not, the
315
    /// command fill fail with a `DocumentConflict` error.
316
    Delete {
317
        /// The current header of the `Document`.
318
        header: Header,
319
    },
320
}
321

            
322
/// Information about the result of each `Operation` in a transaction.
323
2218440
#[derive(Clone, Debug, Serialize, Deserialize)]
324
pub enum OperationResult {
325
    /// An operation succeeded but had no information to output.
326
    Success,
327

            
328
    /// A `Document` was updated.
329
    DocumentUpdated {
330
        /// The id of the `Collection` of the updated `Document`.
331
        collection: CollectionName,
332

            
333
        /// The header of the updated `Document`.
334
        header: Header,
335
    },
336

            
337
    /// A `Document` was deleted.
338
    DocumentDeleted {
339
        /// The id of the `Collection` of the deleted `Document`.
340
        collection: CollectionName,
341

            
342
        /// The id of the deleted `Document`.
343
        id: DocumentId,
344
    },
345
}
346

            
347
/// Details about an executed transaction.
348
744040
#[derive(Clone, Debug, Serialize, Deserialize)]
349
pub struct Executed {
350
    /// The id of the transaction.
351
    pub id: u64,
352

            
353
    /// A list of containing ids of `Documents` changed.
354
    pub changes: Changes,
355
}
356

            
357
/// A list of changes.
358
1003854
#[derive(Clone, Debug, Serialize, Deserialize)]
359
pub enum Changes {
360
    /// A list of changed documents.
361
    Documents(DocumentChanges),
362
    /// A list of changed keys.
363
    Keys(Vec<ChangedKey>),
364
}
365

            
366
impl Changes {
367
    /// Returns the list of documents changed in this transaction, or None if
368
    /// the transaction was not a document transaction.
369
    #[must_use]
370
    pub const fn documents(&self) -> Option<&DocumentChanges> {
371
50313
        if let Self::Documents(changes) = self {
372
49569
            Some(changes)
373
        } else {
374
744
            None
375
        }
376
50313
    }
377

            
378
    /// Returns the list of keys changed in this transaction, or None if the
379
    /// transaction was not a `KeyValue` transaction.
380
    #[must_use]
381
    pub fn keys(&self) -> Option<&[ChangedKey]> {
382
7440
        if let Self::Keys(keys) = self {
383
5208
            Some(keys)
384
        } else {
385
2232
            None
386
        }
387
7440
    }
388
}
389

            
390
/// A list of changed documents.
391
2272190
#[derive(Clone, Debug, Serialize, Deserialize)]
392
pub struct DocumentChanges {
393
    /// All of the collections changed.
394
    pub collections: Vec<CollectionName>,
395
    /// The individual document changes.
396
    pub documents: Vec<ChangedDocument>,
397
}
398

            
399
impl DocumentChanges {
400
    /// Returns the changed document and the name of the collection the change
401
    /// happened to.
402
    #[must_use]
403
4
    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
404
4
        self.documents.get(index).and_then(|doc| {
405
4
            self.collections
406
4
                .get(usize::from(doc.collection))
407
4
                .map(|collection| (collection, doc))
408
4
        })
409
4
    }
410

            
411
    /// Returns the number of changes in this collection.
412
    #[must_use]
413
1
    pub fn len(&self) -> usize {
414
1
        self.documents.len()
415
1
    }
416

            
417
    /// Returns true if there are no changes.
418
    #[must_use]
419
1
    pub fn is_empty(&self) -> bool {
420
1
        self.documents.is_empty()
421
1
    }
422

            
423
    /// Returns an interator over all of the changed documents.
424
1
    pub const fn iter(&self) -> DocumentChangesIter<'_> {
425
1
        DocumentChangesIter {
426
1
            changes: self,
427
1
            index: Some(0),
428
1
        }
429
1
    }
430
}
431

            
432
/// An iterator over [`DocumentChanges`].
433
#[must_use]
434
pub struct DocumentChangesIter<'a> {
435
    changes: &'a DocumentChanges,
436
    index: Option<usize>,
437
}
438

            
439
impl<'a> Iterator for DocumentChangesIter<'a> {
440
    type Item = (&'a CollectionName, &'a ChangedDocument);
441

            
442
4
    fn next(&mut self) -> Option<Self::Item> {
443
4
        self.index.and_then(|index| {
444
4
            let result = self.changes.get(index);
445
4
            if result.is_some() {
446
3
                self.index = index.checked_add(1);
447
3
            }
448
4
            result
449
4
        })
450
4
    }
451
}
452

            
453
/// A draining iterator over [`ChangedDocument`]s.
454
#[must_use]
455
pub struct DocumentChangesIntoIter {
456
    collections: Vec<CollectionName>,
457
    documents: std::vec::IntoIter<ChangedDocument>,
458
}
459

            
460
impl Iterator for DocumentChangesIntoIter {
461
    type Item = (CollectionName, ChangedDocument);
462

            
463
4
    fn next(&mut self) -> Option<Self::Item> {
464
4
        self.documents.next().and_then(|doc| {
465
4
            self.collections
466
4
                .get(usize::from(doc.collection))
467
4
                .map(|collection| (collection.clone(), doc))
468
4
        })
469
4
    }
470
}
471

            
472
impl IntoIterator for DocumentChanges {
473
    type Item = (CollectionName, ChangedDocument);
474

            
475
    type IntoIter = DocumentChangesIntoIter;
476

            
477
1
    fn into_iter(self) -> Self::IntoIter {
478
1
        DocumentChangesIntoIter {
479
1
            collections: self.collections,
480
1
            documents: self.documents.into_iter(),
481
1
        }
482
1
    }
483
}
484

            
485
1
#[test]
fn document_changes_iter() {
    use crate::schema::Qualified;

    // Only two collections are listed, but the last document refers to
    // collection index 2. Both iterators are expected to stop when they reach
    // that entry, yielding only the first three documents.
    let changes = DocumentChanges {
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
        documents: vec![
            ChangedDocument {
                collection: 0,
                id: DocumentId::from_u64(0),
                deleted: false,
            },
            ChangedDocument {
                collection: 0,
                id: DocumentId::from_u64(1),
                deleted: false,
            },
            ChangedDocument {
                collection: 1,
                id: DocumentId::from_u64(2),
                deleted: false,
            },
            ChangedDocument {
                collection: 2,
                id: DocumentId::from_u64(3),
                deleted: false,
            },
        ],
    };

    // `len()` counts every stored document, including the unresolvable one.
    assert_eq!(changes.len(), 4);
    assert!(!changes.is_empty());

    // Borrowing iterator.
    let mut a_changes = 0;
    let mut b_changes = 0;
    let mut ids = Vec::new();
    for (collection, document) in changes.iter() {
        assert!(!ids.contains(&document.id));
        ids.push(document.id);
        match collection.name.as_ref() {
            "a" => a_changes += 1,
            "b" => b_changes += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(a_changes, 2);
    assert_eq!(b_changes, 1);

    // Consuming iterator.
    let mut a_changes = 0;
    let mut b_changes = 0;
    let mut ids = Vec::new();
    for (collection, document) in changes {
        assert!(!ids.contains(&document.id));
        ids.push(document.id);
        match collection.name.as_ref() {
            "a" => a_changes += 1,
            "b" => b_changes += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(a_changes, 2);
    assert_eq!(b_changes, 1);
}

/// A record of a changed document.
550
3181066
#[derive(Debug, Clone, Serialize, Deserialize)]
551
pub struct ChangedDocument {
552
    /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
553
    pub collection: u16,
554

            
555
    /// The id of the changed `Document`.
556
    pub id: DocumentId,
557

            
558
    /// If the `Document` has been deleted, this will be `true`.
559
    pub deleted: bool,
560
}
561

            
562
/// A record of a changed `KeyValue` entry.
563
89764
#[derive(Clone, Debug, Serialize, Deserialize)]
564
pub struct ChangedKey {
565
    /// The namespace of the key.
566
    pub namespace: Option<String>,
567

            
568
    /// The key that was changed.
569
    pub key: String,
570

            
571
    /// True if the key was deleted.
572
    pub deleted: bool,
573
}