1
#![allow(clippy::missing_panics_doc)]
2

            
3
use std::{
4
    fmt::{Debug, Display},
5
    io::ErrorKind,
6
    ops::Deref,
7
    path::{Path, PathBuf},
8
    time::{Duration, Instant},
9
};
10

            
11
use itertools::Itertools;
12
use serde::{Deserialize, Serialize};
13
use transmog_pot::Pot;
14

            
15
#[cfg(feature = "multiuser")]
16
use crate::admin::{PermissionGroup, Role, User};
17
use crate::{
18
    connection::{AccessPolicy, Connection, StorageConnection},
19
    document::{BorrowedDocument, CollectionDocument, Document, KeyId},
20
    keyvalue::KeyValue,
21
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
22
    schema::{
23
        view::{
24
            map::{Mappings, ViewMappedValue},
25
            ReduceResult, ViewSchema,
26
        },
27
        Collection, CollectionName, MappedValue, NamedCollection, Schema, SchemaName, Schematic,
28
        SerializedCollection, View, ViewMapResult,
29
    },
30
    Error,
31
};
32

            
33
744102
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Collection)]
34
// This collection purposely uses names with characters that need
35
// escaping, since it's used in backup/restore.
36
#[collection(name = "_basic", authority = "khonsulabs_", views = [BasicCount, BasicByParentId, BasicByTag, BasicByCategory], core = crate)]
37
pub struct Basic {
38
    pub value: String,
39
    pub category: Option<String>,
40
    pub parent_id: Option<u64>,
41
    pub tags: Vec<String>,
42
}
43

            
44
impl Basic {
45
2116
    pub fn new(value: impl Into<String>) -> Self {
46
2116
        Self {
47
2116
            value: value.into(),
48
2116
            tags: Vec::default(),
49
2116
            category: None,
50
2116
            parent_id: None,
51
2116
        }
52
2116
    }
53

            
54
25
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
55
25
        self.category = Some(category.into());
56
25
        self
57
25
    }
58

            
59
20
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
60
20
        self.tags.push(tag.into());
61
20
        self
62
20
    }
63

            
64
    #[must_use]
65
650
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
66
650
        self.parent_id = Some(parent_id);
67
650
        self
68
650
    }
69
}
70

            
71
1191528
#[derive(Debug, Clone, View)]
72
#[view(collection = Basic, key = (), value = usize, name = "count", core = crate)]
73
pub struct BasicCount;
74

            
75
impl ViewSchema for BasicCount {
76
    type View = Self;
77

            
78
25
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
79
25
        Ok(document.emit_key_and_value((), 1))
80
25
    }
81

            
82
25
    fn reduce(
83
25
        &self,
84
25
        mappings: &[ViewMappedValue<Self::View>],
85
25
        _rereduce: bool,
86
25
    ) -> ReduceResult<Self::View> {
87
25
        Ok(mappings.iter().map(|map| map.value).sum())
88
25
    }
89
}
90

            
91
1203476
#[derive(Debug, Clone, View)]
92
#[view(collection = Basic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
93
pub struct BasicByParentId;
94

            
95
impl ViewSchema for BasicByParentId {
96
    type View = Self;
97

            
98
1300
    fn version(&self) -> u64 {
99
1300
        1
100
1300
    }
101

            
102
1275
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
103
1275
        let contents = document.contents::<Basic>()?;
104
1275
        Ok(document.emit_key_and_value(contents.parent_id, 1))
105
1275
    }
106

            
107
1775
    fn reduce(
108
1775
        &self,
109
1775
        mappings: &[ViewMappedValue<Self::View>],
110
1775
        _rereduce: bool,
111
1775
    ) -> ReduceResult<Self::View> {
112
1900
        Ok(mappings.iter().map(|map| map.value).sum())
113
1775
    }
114
}
115

            
116
1192101
#[derive(Debug, Clone, View)]
117
#[view(collection = Basic, key = String, value = usize, name = "by-category", core = crate)]
118
pub struct BasicByCategory;
119

            
120
impl ViewSchema for BasicByCategory {
121
    type View = Self;
122

            
123
650
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
124
650
        let contents = document.contents::<Basic>()?;
125
650
        if let Some(category) = &contents.category {
126
375
            Ok(document.emit_key_and_value(category.to_lowercase(), 1))
127
        } else {
128
275
            Ok(Mappings::none())
129
        }
130
650
    }
131

            
132
375
    fn reduce(
133
375
        &self,
134
375
        mappings: &[ViewMappedValue<Self::View>],
135
375
        _rereduce: bool,
136
375
    ) -> ReduceResult<Self::View> {
137
500
        Ok(mappings.iter().map(|map| map.value).sum())
138
375
    }
139
}
140

            
141
1197026
#[derive(Debug, Clone, View)]
142
#[view(collection = Basic, key = String, value = usize, name = "by-tag", core = crate)]
143
pub struct BasicByTag;
144

            
145
impl ViewSchema for BasicByTag {
146
    type View = Self;
147

            
148
525
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
149
525
        let contents = document.contents::<Basic>()?;
150

            
151
525
        Ok(contents
152
525
            .tags
153
525
            .iter()
154
750
            .map(|tag| document.emit_key_and_value(tag.clone(), 1))
155
525
            .collect())
156
525
    }
157

            
158
1000
    fn reduce(
159
1000
        &self,
160
1000
        mappings: &[ViewMappedValue<Self::View>],
161
1000
        _rereduce: bool,
162
1000
    ) -> ReduceResult<Self::View> {
163
1250
        Ok(mappings.iter().map(|map| map.value).sum())
164
1000
    }
165
}
166

            
167
175
#[derive(Debug, Clone, View)]
168
#[view(collection = Basic, key = (), value = (), name = "by-parent-id", core = crate)]
169
pub struct BasicByBrokenParentId;
170

            
171
impl ViewSchema for BasicByBrokenParentId {
172
    type View = Self;
173

            
174
25
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
175
25
        Ok(document.emit())
176
25
    }
177
}
178

            
179
1487700
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Collection)]
180
#[collection(name = "encrypted-basic", authority = "khonsulabs", views = [EncryptedBasicCount, EncryptedBasicByParentId, EncryptedBasicByCategory])]
181
#[collection(encryption_key = Some(KeyId::Master), encryption_optional, core = crate)]
182
pub struct EncryptedBasic {
183
    pub value: String,
184
    pub category: Option<String>,
185
    pub parent_id: Option<u64>,
186
}
187

            
188
impl EncryptedBasic {
189
1
    pub fn new(value: impl Into<String>) -> Self {
190
1
        Self {
191
1
            value: value.into(),
192
1
            category: None,
193
1
            parent_id: None,
194
1
        }
195
1
    }
196

            
197
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
198
        self.category = Some(category.into());
199
        self
200
    }
201

            
202
    #[must_use]
203
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
204
        self.parent_id = Some(parent_id);
205
        self
206
    }
207
}
208

            
209
744000
#[derive(Debug, Clone, View)]
210
#[view(collection = EncryptedBasic, key = (), value = usize, name = "count", core = crate)]
211
pub struct EncryptedBasicCount;
212

            
213
impl ViewSchema for EncryptedBasicCount {
214
    type View = Self;
215

            
216
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
217
        Ok(document.emit_key_and_value((), 1))
218
    }
219

            
220
    fn reduce(
221
        &self,
222
        mappings: &[ViewMappedValue<Self::View>],
223
        _rereduce: bool,
224
    ) -> ReduceResult<Self::View> {
225
        Ok(mappings.iter().map(|map| map.value).sum())
226
    }
227
}
228

            
229
744025
#[derive(Debug, Clone, View)]
230
#[view(collection = EncryptedBasic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
231
pub struct EncryptedBasicByParentId;
232

            
233
impl ViewSchema for EncryptedBasicByParentId {
234
    type View = Self;
235

            
236
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
237
        let contents = document.contents::<EncryptedBasic>()?;
238
        Ok(document.emit_key_and_value(contents.parent_id, 1))
239
    }
240

            
241
    fn reduce(
242
        &self,
243
        mappings: &[ViewMappedValue<Self::View>],
244
        _rereduce: bool,
245
    ) -> ReduceResult<Self::View> {
246
        Ok(mappings.iter().map(|map| map.value).sum())
247
    }
248
}
249

            
250
744025
#[derive(Debug, Clone, View)]
251
#[view(collection = EncryptedBasic, key = String, value = usize, name = "by-category", core = crate)]
252
pub struct EncryptedBasicByCategory;
253

            
254
impl ViewSchema for EncryptedBasicByCategory {
255
    type View = Self;
256

            
257
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
258
        let contents = document.contents::<EncryptedBasic>()?;
259
        if let Some(category) = &contents.category {
260
            Ok(document.emit_key_and_value(category.to_lowercase(), 1))
261
        } else {
262
            Ok(Mappings::none())
263
        }
264
    }
265

            
266
    fn reduce(
267
        &self,
268
        mappings: &[ViewMappedValue<Self::View>],
269
        _rereduce: bool,
270
    ) -> ReduceResult<Self::View> {
271
        Ok(mappings.iter().map(|map| map.value).sum())
272
    }
273
}
274

            
275
743850
#[derive(Debug, Schema)]
276
#[schema(name = "basic", collections = [Basic, EncryptedBasic, Unique], core = crate)]
277
pub struct BasicSchema;
278

            
279
743850
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Collection)]
280
#[collection(name = "unique", authority = "khonsulabs", views = [UniqueValue], core = crate)]
281
pub struct Unique {
282
    pub value: String,
283
}
284

            
285
impl Unique {
286
25
    pub fn new(value: impl Display) -> Self {
287
25
        Self {
288
25
            value: value.to_string(),
289
25
        }
290
25
    }
291
}
292

            
293
749925
#[derive(Debug, Clone, View)]
294
#[view(collection = Unique, key = String, value = (), name = "unique-value", core = crate)]
295
pub struct UniqueValue;
296

            
297
impl ViewSchema for UniqueValue {
298
    type View = Self;
299

            
300
747450
    fn unique(&self) -> bool {
301
747450
        true
302
747450
    }
303

            
304
1000
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
305
1000
        let entry = document.contents::<Unique>()?;
306
1000
        Ok(document.emit_key(entry.value))
307
1000
    }
308
}
309

            
310
impl NamedCollection for Unique {
311
    type ByNameView = UniqueValue;
312
}
313

            
314
pub struct TestDirectory(pub PathBuf);
315

            
316
impl TestDirectory {
317
113
    pub fn new<S: AsRef<Path>>(name: S) -> Self {
318
113
        let path = std::env::temp_dir().join(name);
319
113
        if path.exists() {
320
            std::fs::remove_dir_all(&path).expect("error clearing temporary directory");
321
113
        }
322
113
        Self(path)
323
113
    }
324
}
325

            
326
impl Drop for TestDirectory {
327
    fn drop(&mut self) {
328
2825
        if let Err(err) = std::fs::remove_dir_all(&self.0) {
329
            if err.kind() != ErrorKind::NotFound {
330
                eprintln!("Failed to clean up temporary folder: {:?}", err);
331
            }
332
2825
        }
333
2825
    }
334
}
335

            
336
impl AsRef<Path> for TestDirectory {
337
2975
    fn as_ref(&self) -> &Path {
338
2975
        &self.0
339
2975
    }
340
}
341

            
342
impl Deref for TestDirectory {
343
    type Target = PathBuf;
344

            
345
25
    fn deref(&self) -> &Self::Target {
346
25
        &self.0
347
25
    }
348
}
349

            
350
#[derive(Debug)]
351
pub struct BasicCollectionWithNoViews;
352

            
353
impl Collection for BasicCollectionWithNoViews {
354
200
    fn collection_name() -> CollectionName {
355
200
        Basic::collection_name()
356
200
    }
357

            
358
50
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
359
50
        Ok(())
360
50
    }
361
}
362

            
363
impl SerializedCollection for BasicCollectionWithNoViews {
364
    type Contents = Basic;
365
    type Format = Pot;
366

            
367
25
    fn format() -> Self::Format {
368
25
        Pot::default()
369
25
    }
370
}
371

            
372
#[derive(Debug)]
373
pub struct BasicCollectionWithOnlyBrokenParentId;
374

            
375
impl Collection for BasicCollectionWithOnlyBrokenParentId {
376
175
    fn collection_name() -> CollectionName {
377
175
        Basic::collection_name()
378
175
    }
379

            
380
50
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
381
50
        schema.define_view(BasicByBrokenParentId)
382
50
    }
383
}
384

            
385
250
#[derive(Debug, Collection)]
386
#[collection(name = "unassociated", authority = "khonsulabs", core = crate)]
387
pub struct UnassociatedCollection;
388

            
389
3700
#[derive(Copy, Clone, Debug)]
390
pub enum HarnessTest {
391
    ServerConnectionTests = 1,
392
    StoreRetrieveUpdate,
393
    NotFound,
394
    Conflict,
395
    BadUpdate,
396
    NoUpdate,
397
    GetMultiple,
398
    List,
399
    ListTransactions,
400
    ViewQuery,
401
    UnassociatedCollection,
402
    Compact,
403
    ViewUpdate,
404
    ViewMultiEmit,
405
    ViewUnimplementedReduce,
406
    ViewAccessPolicies,
407
    Encryption,
408
    UniqueViews,
409
    NamedCollection,
410
    PubSubSimple,
411
    UserManagement,
412
    PubSubMultipleSubscribers,
413
    PubSubDropAndSend,
414
    PubSubUnsubscribe,
415
    PubSubDropCleanup,
416
    PubSubPublishAll,
417
    KvBasic,
418
    KvConcurrency,
419
    KvSet,
420
    KvIncrementDecrement,
421
    KvExpiration,
422
    KvDeleteExpire,
423
    KvTransactions,
424
}
425

            
426
impl HarnessTest {
427
    #[must_use]
428
    pub const fn port(self, base: u16) -> u16 {
429
        base + self as u16
430
    }
431
}
432

            
433
impl Display for HarnessTest {
434
3725
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435
3725
        Debug::fmt(&self, f)
436
3725
    }
437
}
438

            
439
/// Compares two f64's accounting for the epsilon.
440
#[macro_export]
441
macro_rules! assert_f64_eq {
442
    ($a:expr, $b:expr) => {{
443
        let a: f64 = $a;
444
        let b: f64 = $b;
445
        assert!((a - b).abs() <= f64::EPSILON, "{:?} <> {:?}", a, b);
446
    }};
447
}
448

            
449
/// Creates a test suite that tests methods available on [`Connection`]
450
#[macro_export]
451
macro_rules! define_connection_test_suite {
452
    ($harness:ident) => {
453
        #[tokio::test]
454
5
        async fn server_connection_tests() -> anyhow::Result<()> {
455
            let harness =
456
                $harness::new($crate::test_util::HarnessTest::ServerConnectionTests).await?;
457
            let db = harness.server();
458
            $crate::test_util::basic_server_connection_tests(
459
                db.clone(),
460
                &format!("server-connection-tests-{}", $harness::server_name()),
461
            )
462
            .await?;
463
            harness.shutdown().await
464
        }
465

            
466
        #[tokio::test]
467
5
        async fn store_retrieve_update_delete() -> anyhow::Result<()> {
468
            let harness =
469
                $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate).await?;
470
            let db = harness.connect().await?;
471
            $crate::test_util::store_retrieve_update_delete_tests(&db).await?;
472
            harness.shutdown().await
473
        }
474

            
475
        #[tokio::test]
476
5
        async fn not_found() -> anyhow::Result<()> {
477
            let harness = $harness::new($crate::test_util::HarnessTest::NotFound).await?;
478
            let db = harness.connect().await?;
479

            
480
            $crate::test_util::not_found_tests(&db).await?;
481
            harness.shutdown().await
482
        }
483

            
484
        #[tokio::test]
485
5
        async fn conflict() -> anyhow::Result<()> {
486
            let harness = $harness::new($crate::test_util::HarnessTest::Conflict).await?;
487
            let db = harness.connect().await?;
488

            
489
            $crate::test_util::conflict_tests(&db).await?;
490
            harness.shutdown().await
491
        }
492

            
493
        #[tokio::test]
494
5
        async fn bad_update() -> anyhow::Result<()> {
495
            let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate).await?;
496
            let db = harness.connect().await?;
497

            
498
            $crate::test_util::bad_update_tests(&db).await?;
499
            harness.shutdown().await
500
        }
501

            
502
        #[tokio::test]
503
5
        async fn no_update() -> anyhow::Result<()> {
504
            let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate).await?;
505
            let db = harness.connect().await?;
506

            
507
            $crate::test_util::no_update_tests(&db).await?;
508
            harness.shutdown().await
509
        }
510

            
511
        #[tokio::test]
512
5
        async fn get_multiple() -> anyhow::Result<()> {
513
            let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple).await?;
514
            let db = harness.connect().await?;
515

            
516
            $crate::test_util::get_multiple_tests(&db).await?;
517
            harness.shutdown().await
518
        }
519

            
520
        #[tokio::test]
521
5
        async fn list() -> anyhow::Result<()> {
522
            let harness = $harness::new($crate::test_util::HarnessTest::List).await?;
523
            let db = harness.connect().await?;
524

            
525
            $crate::test_util::list_tests(&db).await?;
526
            harness.shutdown().await
527
        }
528

            
529
        #[tokio::test]
530
5
        async fn list_transactions() -> anyhow::Result<()> {
531
            let harness = $harness::new($crate::test_util::HarnessTest::ListTransactions).await?;
532
            let db = harness.connect().await?;
533

            
534
            $crate::test_util::list_transactions_tests(&db).await?;
535
            harness.shutdown().await
536
        }
537

            
538
        #[tokio::test]
539
5
        async fn view_query() -> anyhow::Result<()> {
540
            let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery).await?;
541
            let db = harness.connect().await?;
542

            
543
            $crate::test_util::view_query_tests(&db).await?;
544
            harness.shutdown().await
545
        }
546

            
547
        #[tokio::test]
548
5
        async fn unassociated_collection() -> anyhow::Result<()> {
549
            let harness =
550
                $harness::new($crate::test_util::HarnessTest::UnassociatedCollection).await?;
551
            let db = harness.connect().await?;
552

            
553
            $crate::test_util::unassociated_collection_tests(&db).await?;
554
            harness.shutdown().await
555
        }
556

            
557
        #[tokio::test]
558
5
        async fn unimplemented_reduce() -> anyhow::Result<()> {
559
            let harness =
560
                $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce).await?;
561
            let db = harness.connect().await?;
562

            
563
            $crate::test_util::unimplemented_reduce(&db).await?;
564
            harness.shutdown().await
565
        }
566

            
567
        #[tokio::test]
568
5
        async fn view_update() -> anyhow::Result<()> {
569
            let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate).await?;
570
            let db = harness.connect().await?;
571

            
572
            $crate::test_util::view_update_tests(&db).await?;
573
            harness.shutdown().await
574
        }
575

            
576
        #[tokio::test]
577
5
        async fn view_multi_emit() -> anyhow::Result<()> {
578
            let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit).await?;
579
            let db = harness.connect().await?;
580

            
581
            $crate::test_util::view_multi_emit_tests(&db).await?;
582
            harness.shutdown().await
583
        }
584

            
585
        #[tokio::test]
586
5
        async fn view_access_policies() -> anyhow::Result<()> {
587
            let harness = $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies).await?;
588
            let db = harness.connect().await?;
589

            
590
            $crate::test_util::view_access_policy_tests(&db).await?;
591
            harness.shutdown().await
592
        }
593

            
594
        #[tokio::test]
595
5
        async fn unique_views() -> anyhow::Result<()> {
596
            let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews).await?;
597
            let db = harness.connect().await?;
598

            
599
            $crate::test_util::unique_view_tests(&db).await?;
600
            harness.shutdown().await
601
        }
602

            
603
        #[tokio::test]
604
5
        async fn named_collection() -> anyhow::Result<()> {
605
            let harness = $harness::new($crate::test_util::HarnessTest::NamedCollection).await?;
606
            let db = harness.connect().await?;
607

            
608
            $crate::test_util::named_collection_tests(&db).await?;
609
            harness.shutdown().await
610
        }
611

            
612
        #[tokio::test]
613
        #[cfg(any(feature = "multiuser", feature = "local-multiuser", feature = "server"))]
614
4
        async fn user_management() -> anyhow::Result<()> {
615
            let harness = $harness::new($crate::test_util::HarnessTest::UserManagement).await?;
616
            let _db = harness.connect().await?;
617
            let server = harness.server();
618
            let admin = server
619
                .database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)
620
                .await?;
621

            
622
            $crate::test_util::user_management_tests(
623
                &admin,
624
                server.clone(),
625
                $harness::server_name(),
626
            )
627
            .await?;
628
            harness.shutdown().await
629
        }
630

            
631
        #[tokio::test]
632
5
        async fn compaction() -> anyhow::Result<()> {
633
            let harness = $harness::new($crate::test_util::HarnessTest::Compact).await?;
634
            let db = harness.connect().await?;
635

            
636
            $crate::test_util::compaction_tests(&db).await?;
637
            harness.shutdown().await
638
        }
639
    };
640
}
641

            
642
506
pub async fn store_retrieve_update_delete_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
643
506
    let original_value = Basic::new("initial_value");
644
506
    let collection = db.collection::<Basic>();
645
2225
    let header = collection.push(&original_value).await?;
646

            
647
506
    let mut doc = collection
648
1587
        .get(header.id)
649
1586
        .await?
650
506
        .expect("couldn't retrieve stored item");
651
506
    let mut value = doc.contents::<Basic>()?;
652
506
    assert_eq!(original_value, value);
653
506
    let old_revision = doc.header.revision.clone();
654
506

            
655
506
    // Update the value
656
506
    value.value = String::from("updated_value");
657
506
    doc.set_contents(&value)?;
658
1675
    db.update::<Basic, _>(&mut doc).await?;
659

            
660
    // update should cause the revision to be changed
661
506
    assert_ne!(doc.header.revision, old_revision);
662

            
663
    // Check the value in the database to ensure it has the new document
664
506
    let doc = collection
665
1478
        .get(header.id)
666
1477
        .await?
667
506
        .expect("couldn't retrieve stored item");
668
506
    assert_eq!(doc.contents::<Basic>()?, value);
669

            
670
    // These operations should have created two transactions with one change each
671
1467
    let transactions = db.list_executed_transactions(None, None).await?;
672
506
    assert_eq!(transactions.len(), 2);
673
506
    assert!(transactions[0].id < transactions[1].id);
674
1518
    for transaction in &transactions {
675
1012
        let changed_documents = transaction
676
1012
            .changes
677
1012
            .documents()
678
1012
            .expect("incorrect transaction type");
679
1012
        assert_eq!(changed_documents.len(), 1);
680
1012
        assert_eq!(changed_documents[0].collection, Basic::collection_name());
681
1012
        assert_eq!(changed_documents[0].id, header.id);
682
1012
        assert!(!changed_documents[0].deleted);
683
    }
684

            
685
1629
    db.collection::<Basic>().delete(&doc).await?;
686
1485
    assert!(collection.get(header.id).await?.is_none());
687
506
    let transactions = db
688
1539
        .list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)
689
1539
        .await?;
690
506
    assert_eq!(transactions.len(), 1);
691
506
    let transaction = transactions.first().unwrap();
692
506
    let changed_documents = transaction
693
506
        .changes
694
506
        .documents()
695
506
        .expect("incorrect transaction type");
696
506
    assert_eq!(changed_documents.len(), 1);
697
506
    assert_eq!(changed_documents[0].collection, Basic::collection_name());
698
506
    assert_eq!(changed_documents[0].id, header.id);
699
506
    assert!(changed_documents[0].deleted);
700

            
701
    // Use the Collection interface
702
1693
    let mut doc = original_value.clone().push_into(db).await?;
703
506
    doc.contents.category = Some(String::from("updated"));
704
1692
    doc.update(db).await?;
705
1506
    let reloaded = Basic::get(doc.header.id, db).await?.unwrap();
706
506
    assert_eq!(doc.contents, reloaded.contents);
707

            
708
    // Test Connection::insert with a specified id
709
506
    let doc = BorrowedDocument::with_contents(42, &Basic::new("42"))?;
710
506
    let document_42 = db
711
1731
        .insert::<Basic, _>(Some(doc.id), doc.contents.into_vec())
712
1731
        .await?;
713
506
    assert_eq!(document_42.id, 42);
714
1719
    let document_43 = Basic::new("43").insert_into(43, db).await?;
715
506
    assert_eq!(document_43.id, 43);
716

            
717
    // Test that inserting a document with the same ID results in a conflict:
718
506
    let conflict_err = Basic::new("43")
719
1647
        .insert_into(doc.header.id, db)
720
1647
        .await
721
506
        .unwrap_err();
722
506
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));
723

            
724
506
    Ok(())
725
506
}
726

            
727
5
pub async fn not_found_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
728
5
    assert!(db.collection::<Basic>().get(1).await?.is_none());
729

            
730
5
    assert!(db.last_transaction_id().await?.is_none());
731

            
732
5
    Ok(())
733
5
}
734

            
735
5
pub async fn conflict_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
736
5
    let original_value = Basic::new("initial_value");
737
5
    let collection = db.collection::<Basic>();
738
5
    let header = collection.push(&original_value).await?;
739

            
740
5
    let mut doc = collection
741
5
        .get(header.id)
742
5
        .await?
743
5
        .expect("couldn't retrieve stored item");
744
5
    let mut value = doc.contents::<Basic>()?;
745
5
    value.value = String::from("updated_value");
746
5
    doc.set_contents(&value)?;
747
5
    db.update::<Basic, _>(&mut doc).await?;
748

            
749
    // To generate a conflict, let's try to do the same update again by
750
    // reverting the header
751
5
    doc.header = header;
752
5
    match db
753
5
        .update::<Basic, _>(&mut doc)
754
5
        .await
755
5
        .expect_err("conflict should have generated an error")
756
    {
757
5
        Error::DocumentConflict(collection, id) => {
758
5
            assert_eq!(collection, Basic::collection_name());
759
5
            assert_eq!(id, doc.header.id);
760
        }
761
        other => return Err(anyhow::Error::from(other)),
762
    }
763

            
764
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
765
5
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
766
10
    doc.modify(db, |doc| {
767
10
        doc.contents.value = String::from("modify worked");
768
15
    })
769
15
    .await?;
770
5
    assert_eq!(doc.contents.value, "modify worked");
771
5
    let doc = Basic::get(doc.id, db).await?.unwrap();
772
5
    assert_eq!(doc.contents.value, "modify worked");
773

            
774
5
    Ok(())
775
5
}
776

            
777
5
pub async fn bad_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
778
5
    let mut doc = BorrowedDocument::with_contents(1, &Basic::default())?;
779
5
    match db.update::<Basic, _>(&mut doc).await {
780
5
        Err(Error::DocumentNotFound(collection, id)) => {
781
5
            assert_eq!(collection, Basic::collection_name());
782
5
            assert_eq!(id, 1);
783
5
            Ok(())
784
        }
785
        other => panic!("expected DocumentNotFound from update but got: {:?}", other),
786
    }
787
5
}
788

            
789
5
pub async fn no_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
790
5
    let original_value = Basic::new("initial_value");
791
5
    let collection = db.collection::<Basic>();
792
5
    let header = collection.push(&original_value).await?;
793

            
794
5
    let mut doc = collection
795
5
        .get(header.id)
796
4
        .await?
797
5
        .expect("couldn't retrieve stored item");
798
5
    db.update::<Basic, _>(&mut doc).await?;
799

            
800
5
    assert_eq!(doc.header, header);
801

            
802
5
    Ok(())
803
5
}
804

            
805
5
pub async fn get_multiple_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
806
5
    let collection = db.collection::<Basic>();
807
5
    let doc1_value = Basic::new("initial_value");
808
5
    let doc1 = collection.push(&doc1_value).await?;
809

            
810
5
    let doc2_value = Basic::new("second_value");
811
5
    let doc2 = collection.push(&doc2_value).await?;
812

            
813
5
    let both_docs = Basic::get_multiple(&[doc1.id, doc2.id], db).await?;
814
5
    assert_eq!(both_docs.len(), 2);
815

            
816
5
    let out_of_order = Basic::get_multiple(&[doc2.id, doc1.id], db).await?;
817
5
    assert_eq!(out_of_order.len(), 2);
818

            
819
    // The order of get_multiple isn't guaranteed, so these two checks are done
820
    // with iterators instead of direct indexing
821
5
    let doc1 = both_docs
822
5
        .iter()
823
5
        .find(|doc| doc.header.id == doc1.id)
824
5
        .expect("Couldn't find doc1");
825
5
    assert_eq!(doc1.contents.value, doc1_value.value);
826
5
    let doc2 = both_docs
827
5
        .iter()
828
10
        .find(|doc| doc.header.id == doc2.id)
829
5
        .expect("Couldn't find doc2");
830
5
    assert_eq!(doc2.contents.value, doc2_value.value);
831

            
832
5
    Ok(())
833
5
}
834

            
835
5
pub async fn list_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
836
5
    let collection = db.collection::<Basic>();
837
5
    let doc1_value = Basic::new("initial_value");
838
5
    let doc1 = collection.push(&doc1_value).await?;
839

            
840
5
    let doc2_value = Basic::new("second_value");
841
5
    let doc2 = collection.push(&doc2_value).await?;
842

            
843
5
    let all_docs = Basic::all(db).await?;
844
5
    assert_eq!(all_docs.len(), 2);
845

            
846
5
    let both_docs = Basic::list(doc1.id..=doc2.id, db).await?;
847
5
    assert_eq!(both_docs.len(), 2);
848

            
849
5
    assert_eq!(both_docs[0].contents.value, doc1_value.value);
850
5
    assert_eq!(both_docs[1].contents.value, doc2_value.value);
851

            
852
5
    let one_doc = Basic::list(doc1.id..doc2.id, db).await?;
853
5
    assert_eq!(one_doc.len(), 1);
854

            
855
5
    let limited = Basic::list(doc1.id..=doc2.id, db)
856
5
        .limit(1)
857
5
        .descending()
858
5
        .await?;
859
5
    assert_eq!(limited.len(), 1);
860
5
    assert_eq!(limited[0].contents.value, doc2_value.value);
861

            
862
5
    Ok(())
863
5
}
864

            
865
5
pub async fn list_transactions_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
866
5
    let collection = db.collection::<Basic>();
867
5

            
868
5
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
869
5
    // transactions to test the edge cases of `list_transactions`
870
5
    futures::future::join_all(
871
5
        (0..=(LIST_TRANSACTIONS_MAX_RESULTS))
872
5005
            .map(|_| async { collection.push(&Basic::default()).await.unwrap() }),
873
4110
    )
874
4110
    .await;
875

            
876
    // Test defaults
877
5
    let transactions = db.list_executed_transactions(None, None).await?;
878
5
    assert_eq!(transactions.len(), LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT);
879

            
880
    // Test max results limit
881
5
    let transactions = db
882
5
        .list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))
883
5
        .await?;
884
5
    assert_eq!(transactions.len(), LIST_TRANSACTIONS_MAX_RESULTS);
885

            
886
    // Test requesting 0 items
887
5
    let transactions = db.list_executed_transactions(None, Some(0)).await?;
888
5
    assert!(transactions.is_empty());
889

            
890
    // Test doing a loop fetching until we get no more results
891
5
    let mut transactions = Vec::new();
892
5
    let mut starting_id = None;
893
    loop {
894
60
        let chunk = db
895
60
            .list_executed_transactions(starting_id, Some(100))
896
60
            .await?;
897
60
        if chunk.is_empty() {
898
5
            break;
899
55
        }
900
55

            
901
55
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
902
55
        starting_id = Some(max_id + 1);
903
55
        transactions.extend(chunk);
904
    }
905

            
906
5
    assert_eq!(transactions.len(), LIST_TRANSACTIONS_MAX_RESULTS + 1);
907

            
908
5
    Ok(())
909
5
}
910

            
911
5
pub async fn view_query_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
912
5
    let collection = db.collection::<Basic>();
913
5
    let a = collection.push(&Basic::new("A")).await?;
914
5
    let b = collection.push(&Basic::new("B")).await?;
915
5
    let a_child = collection
916
5
        .push(
917
5
            &Basic::new("A.1")
918
5
                .with_parent_id(a.id)
919
5
                .with_category("Alpha"),
920
5
        )
921
5
        .await?;
922
5
    collection
923
5
        .push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))
924
5
        .await?;
925
5
    collection
926
5
        .push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))
927
5
        .await?;
928

            
929
5
    let a_children = db
930
5
        .view::<BasicByParentId>()
931
5
        .with_key(Some(a.id))
932
8
        .query()
933
8
        .await?;
934
5
    assert_eq!(a_children.len(), 1);
935

            
936
5
    let a_children = db
937
5
        .view::<BasicByParentId>()
938
5
        .with_key(Some(a.id))
939
7
        .query_with_collection_docs()
940
7
        .await?;
941
5
    assert_eq!(a_children.len(), 1);
942
5
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);
943

            
944
5
    let b_children = db
945
5
        .view::<BasicByParentId>()
946
5
        .with_key(Some(b.id))
947
5
        .query()
948
2
        .await?;
949
5
    assert_eq!(b_children.len(), 2);
950

            
951
5
    let a_and_b_children = db
952
5
        .view::<BasicByParentId>()
953
5
        .with_keys([Some(a.id), Some(b.id)])
954
5
        .query()
955
2
        .await?;
956
5
    assert_eq!(a_and_b_children.len(), 3);
957

            
958
    // Test out of order keys
959
5
    let a_and_b_children = db
960
5
        .view::<BasicByParentId>()
961
5
        .with_keys([Some(b.id), Some(a.id)])
962
5
        .query()
963
2
        .await?;
964
5
    assert_eq!(a_and_b_children.len(), 3);
965

            
966
5
    let has_parent = db
967
5
        .view::<BasicByParentId>()
968
5
        .with_key_range(Some(0)..=Some(u64::MAX))
969
5
        .query()
970
2
        .await?;
971
5
    assert_eq!(has_parent.len(), 3);
972
    // Verify the result is sorted ascending
973
5
    assert!(has_parent
974
5
        .windows(2)
975
10
        .all(|window| window[0].key <= window[1].key));
976

            
977
    // Test limiting and descending order
978
5
    let last_with_parent = db
979
5
        .view::<BasicByParentId>()
980
5
        .with_key_range(Some(0)..=Some(u64::MAX))
981
5
        .descending()
982
5
        .limit(1)
983
5
        .query()
984
2
        .await?;
985
10
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
986
5
    assert_eq!(last_with_parent[0].key, has_parent[2].key);
987

            
988
10
    let items_with_categories = db.view::<BasicByCategory>().query().await?;
989
5
    assert_eq!(items_with_categories.len(), 3);
990

            
991
    // Test deleting
992
5
    let deleted_count = db
993
5
        .view::<BasicByParentId>()
994
5
        .with_key(Some(b.id))
995
7
        .delete_docs()
996
7
        .await?;
997
5
    assert_eq!(b_children.len() as u64, deleted_count);
998
    assert_eq!(
999
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(b.id))
5
            .query()
5
            .await?
5
            .len(),
        0
    );

            
5
    Ok(())
5
}

            
5
pub async fn unassociated_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
5
    let result = db
5
        .insert::<UnassociatedCollection, _>(None, Vec::new())
4
        .await;
5
    match result {
5
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
5
    Ok(())
5
}

            
5
pub async fn unimplemented_reduce<C: Connection>(db: &C) -> anyhow::Result<()> {
5
    assert!(matches!(
5
        db.view::<UniqueValue>().reduce().await,
        Err(Error::ReduceUnimplemented)
    ));
5
    Ok(())
5
}

            
5
pub async fn view_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
8
        .query()
8
        .await?;
5
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );

            
    // Test inserting a new record and the view being made available
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
5
    let mut doc = db.collection::<Basic>().get(a_child.id).await?.unwrap();
5
    let mut basic = doc.contents::<Basic>()?;
5
    basic.parent_id = None;
5
    doc.set_contents(&basic)?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );
5
    assert_eq!(db.view::<BasicByParentId>().reduce().await?, 2);

            
    // Test deleting a record and ensuring it goes away
5
    db.collection::<Basic>().delete(&doc).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 1);

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,),]
    );

            
5
    Ok(())
5
}

            
5
pub async fn view_multi_emit_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
5
    let mut a = Basic::new("A")
5
        .with_tag("red")
5
        .with_tag("green")
5
        .push_into(db)
5
        .await?;
5
    let mut b = Basic::new("B")
5
        .with_tag("blue")
5
        .with_tag("green")
5
        .push_into(db)
5
        .await?;

            
8
    assert_eq!(db.view::<BasicByTag>().query().await?.len(), 4);

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("green"))
5
            .query()
5
            .await?
5
            .len(),
        2
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("red"))
5
            .query()
2
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("blue"))
5
            .query()
2
            .await?
5
            .len(),
        1
    );

            
    // Change tags
5
    a.contents.tags = vec![String::from("red"), String::from("blue")];
5
    a.update(db).await?;

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("green"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("red"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("blue"))
5
            .query()
2
            .await?
5
            .len(),
        2
    );
5
    b.contents.tags.clear();
5
    b.update(db).await?;

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("green"))
5
            .query()
5
            .await?
5
            .len(),
        0
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("red"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("blue"))
5
            .query()
2
            .await?
5
            .len(),
        1
    );

            
5
    Ok(())
5
}

            
5
pub async fn view_access_policy_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;

            
    // Test inserting a record that should match the view, but ask for it to be
    // NoUpdate. Verify we get no matches.
5
    collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .with_access_policy(AccessPolicy::NoUpdate)
5
        .query()
2
        .await?;
5
    assert_eq!(a_children.len(), 0);

            
5
    tokio::time::sleep(Duration::from_millis(20)).await;

            
    // Verify the view still have no value, but this time ask for it to be
    // updated after returning
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .with_access_policy(AccessPolicy::UpdateAfter)
5
        .query()
2
        .await?;
5
    assert_eq!(a_children.len(), 0);

            
    // Waiting on background jobs can be unreliable in a CI environment
5
    for _ in 0..10_u8 {
5
        tokio::time::sleep(Duration::from_millis(20)).await;

            
        // Now, the view should contain the entry.
5
        let a_children = db
5
            .view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .with_access_policy(AccessPolicy::NoUpdate)
5
            .query()
2
            .await?;
5
        if a_children.len() == 1 {
5
            return Ok(());
        }
    }
    panic!("view never updated")
5
}

            
5
pub async fn unique_view_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
8
    let first_doc = db.collection::<Unique>().push(&Unique::new("1")).await?;

            
    if let Err(Error::UniqueKeyViolation {
5
        view,
5
        existing_document,
5
        conflicting_document,
5
    }) = db.collection::<Unique>().push(&Unique::new("1")).await
    {
5
        assert_eq!(view, UniqueValue.view_name());
5
        assert_eq!(existing_document.id, first_doc.id);
        // We can't predict the conflicting document id since it's generated
        // inside of the transaction, but we can assert that it's different than
        // the document that was previously stored.
5
        assert_ne!(conflicting_document, existing_document);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
5
    let second_doc = db.collection::<Unique>().push(&Unique::new("2")).await?;
5
    let mut second_doc = db.collection::<Unique>().get(second_doc.id).await?.unwrap();
5
    let mut contents = second_doc.contents::<Unique>()?;
5
    contents.value = String::from("1");
5
    second_doc.set_contents(&contents)?;
    if let Err(Error::UniqueKeyViolation {
5
        view,
5
        existing_document,
5
        conflicting_document,
5
    }) = db.update::<Unique, _>(&mut second_doc).await
    {
5
        assert_eq!(view, UniqueValue.view_name());
5
        assert_eq!(existing_document.id, first_doc.id);
5
        assert_eq!(conflicting_document.id, second_doc.header.id);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
5
    Ok(())
5
}

            
5
pub async fn named_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
8
    Unique::new("0").push_into(db).await?;
5
    let original_entry = Unique::entry("1", db)
5
        .update_with(|_existing: &mut Unique| unreachable!())
13
        .or_insert_with(|| Unique::new("1"))
13
        .await?
5
        .expect("Document not inserted");

            
5
    let updated = Unique::entry("1", db)
5
        .update_with(|existing: &mut Unique| {
5
            existing.value = String::from("2");
5
        })
12
        .or_insert_with(|| unreachable!())
12
        .await?
5
        .unwrap();
5
    assert_eq!(original_entry.id, updated.id);
5
    assert_ne!(original_entry.contents.value, updated.contents.value);

            
7
    let retrieved = Unique::entry("2", db).await?.unwrap();
5
    assert_eq!(retrieved.contents.value, updated.contents.value);

            
5
    let conflict = Unique::entry("2", db)
5
        .update_with(|existing: &mut Unique| {
5
            existing.value = String::from("0");
8
        })
8
        .await;
5
    assert!(matches!(conflict, Err(Error::UniqueKeyViolation { .. })));

            
5
    Ok(())
5
}

            
5
pub async fn compaction_tests<C: Connection + KeyValue>(db: &C) -> anyhow::Result<()> {
5
    let original_value = Basic::new("initial_value");
5
    let collection = db.collection::<Basic>();
5
    collection.push(&original_value).await?;

            
    // Test a collection compaction
5
    db.compact_collection::<Basic>().await?;

            
    // Test the key value store compaction
5
    db.set_key("foo", &1_u32).await?;
5
    db.compact_key_value_store().await?;

            
    // Compact everything... again...
5
    db.compact().await?;

            
5
    Ok(())
5
}

            
#[cfg(feature = "multiuser")]
4
pub async fn user_management_tests<C: Connection, S: StorageConnection>(
4
    admin: &C,
4
    server: S,
4
    server_name: &str,
4
) -> anyhow::Result<()> {
4
    let username = format!("user-management-tests-{}", server_name);
6
    let user_id = server.create_user(&username).await?;
    // Test the default created user state.
    {
4
        let user = User::get(user_id, admin)
4
            .await
4
            .unwrap()
4
            .expect("user not found");
4
        assert_eq!(user.contents.username, username);
4
        assert!(user.contents.groups.is_empty());
4
        assert!(user.contents.roles.is_empty());
    }

            
4
    let role = Role::named(format!("role-{}", server_name))
6
        .push_into(admin)
6
        .await?;
4
    let group = PermissionGroup::named(format!("group-{}", server_name))
6
        .push_into(admin)
6
        .await?;

            
    // Add the role and group.
5
    server.add_permission_group_to_user(user_id, &group).await?;
4
    server.add_role_to_user(user_id, &role).await?;

            
    // Test the results
    {
4
        let user = User::get(user_id, admin)
4
            .await
4
            .unwrap()
4
            .expect("user not found");
4
        assert_eq!(user.contents.groups, vec![group.header.id]);
4
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Add the same things again (should not do anything). With names this time.
4
    server
6
        .add_permission_group_to_user(&username, &group)
6
        .await?;
4
    server.add_role_to_user(&username, &role).await?;
    {
        // TODO this is what's failing.
4
        let user = User::load(&username, admin)
3
            .await
4
            .unwrap()
4
            .expect("user not found");
4
        assert_eq!(user.contents.groups, vec![group.header.id]);
4
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Remove the group.
4
    server
4
        .remove_permission_group_from_user(user_id, &group)
4
        .await?;
4
    server.remove_role_from_user(user_id, &role).await?;
    {
4
        let user = User::get(user_id, admin)
3
            .await
4
            .unwrap()
4
            .expect("user not found");
4
        assert!(user.contents.groups.is_empty());
4
        assert!(user.contents.roles.is_empty());
    }

            
    // Removing again shouldn't cause an error.
4
    server
4
        .remove_permission_group_from_user(user_id, &group)
3
        .await?;
4
    server.remove_role_from_user(user_id, &role).await?;

            
4
    Ok(())
4
}

            
/// Defines the `KeyValue` test suite
#[macro_export]
macro_rules! define_kv_test_suite {
    ($harness:ident) => {
        #[tokio::test]
5
        async fn basic_kv_test() -> anyhow::Result<()> {
            use $crate::keyvalue::{KeyStatus, KeyValue};
            let harness = $harness::new($crate::test_util::HarnessTest::KvBasic).await?;
            let db = harness.connect().await?;
            assert_eq!(
                db.set_key("akey", &String::from("avalue")).await?,
                KeyStatus::Inserted
            );
            assert_eq!(
                db.get_key("akey").into().await?,
                Some(String::from("avalue"))
            );
            assert_eq!(
                db.set_key("akey", &String::from("new_value"))
                    .returning_previous_as()
                    .await?,
                Some(String::from("avalue"))
            );
            assert_eq!(
                db.get_key("akey").into().await?,
                Some(String::from("new_value"))
            );
            assert_eq!(
                db.get_key("akey").and_delete().into().await?,
                Some(String::from("new_value"))
            );
            assert_eq!(db.get_key("akey").await?, None);
            assert_eq!(
                db.set_key("akey", &String::from("new_value"))
                    .returning_previous()
                    .await?,
                None
            );
            assert_eq!(db.delete_key("akey").await?, KeyStatus::Deleted);
            assert_eq!(db.delete_key("akey").await?, KeyStatus::NotChanged);

            
            harness.shutdown().await?;

            
            Ok(())
        }

            
        #[tokio::test]
5
        async fn kv_concurrency() -> anyhow::Result<()> {
            use $crate::keyvalue::{KeyStatus, KeyValue};
            const WRITERS: usize = 100;
            const INCREMENTS: usize = 100;
            let harness = $harness::new($crate::test_util::HarnessTest::KvConcurrency).await?;
            let db = harness.connect().await?;

            
            let handles = (0..WRITERS).map(|_| {
                let db = db.clone();
                tokio::task::spawn(async move {
                    for _ in 0..INCREMENTS {
                        db.increment_key_by("concurrency", 1_u64).await.unwrap();
                    }
                })
            });
            futures::future::join_all(handles).await;

            
            assert_eq!(
                db.get_key("concurrency").into_u64().await.unwrap().unwrap(),
                (WRITERS * INCREMENTS) as u64
            );

            
            harness.shutdown().await?;

            
            Ok(())
        }

            
        #[tokio::test]
5
        async fn kv_set_tests() -> anyhow::Result<()> {
            use $crate::keyvalue::{KeyStatus, KeyValue};
            let harness = $harness::new($crate::test_util::HarnessTest::KvSet).await?;
            let db = harness.connect().await?;
            let kv = db.with_key_namespace("set");

            
            assert_eq!(
                kv.set_key("a", &0_u32).only_if_exists().await?,
                KeyStatus::NotChanged
            );
            assert_eq!(
                kv.set_key("a", &0_u32).only_if_vacant().await?,
                KeyStatus::Inserted
            );
            assert_eq!(
                kv.set_key("a", &1_u32).only_if_vacant().await?,
                KeyStatus::NotChanged
            );
            assert_eq!(
                kv.set_key("a", &2_u32).only_if_exists().await?,
                KeyStatus::Updated,
            );
            assert_eq!(
                kv.set_key("a", &3_u32).returning_previous_as().await?,
                Some(2_u32),
            );

            
            harness.shutdown().await?;

            
            Ok(())
        }

            
        #[tokio::test]
5
        async fn kv_increment_decrement_tests() -> anyhow::Result<()> {
            use $crate::keyvalue::{KeyStatus, KeyValue};
            let harness =
                $harness::new($crate::test_util::HarnessTest::KvIncrementDecrement).await?;
            let db = harness.connect().await?;
            let kv = db.with_key_namespace("increment_decrement");

            
            // Empty keys should be equal to 0
            assert_eq!(kv.increment_key_by("i64", 1_i64).await?, 1_i64);
            assert_eq!(kv.get_key("i64").into_i64().await?, Some(1_i64));
            assert_eq!(kv.increment_key_by("u64", 1_u64).await?, 1_u64);
            $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).await?, 1_f64);

            
            // Test float incrementing/decrementing an existing value
            $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).await?, 2_f64);
            $crate::assert_f64_eq!(kv.decrement_key_by("f64", 2_f64).await?, 0_f64);

            
            // Empty keys should be equal to 0
            assert_eq!(kv.decrement_key_by("i64_2", 1_i64).await?, -1_i64);
            assert_eq!(
                kv.decrement_key_by("u64_2", 42_u64)
                    .allow_overflow()
                    .await?,
                u64::MAX - 41
            );
            assert_eq!(kv.decrement_key_by("u64_3", 42_u64).await?, u64::MIN);
            $crate::assert_f64_eq!(kv.decrement_key_by("f64_2", 1_f64).await?, -1_f64);

            
            // Test decrement wrapping with overflow
            kv.set_numeric_key("i64", i64::MIN).await?;
            assert_eq!(
                kv.decrement_key_by("i64", 1_i64).allow_overflow().await?,
                i64::MAX
            );
            assert_eq!(
                kv.decrement_key_by("u64", 2_u64).allow_overflow().await?,
                u64::MAX
            );

            
            // Test increment wrapping with overflow
            assert_eq!(
                kv.increment_key_by("i64", 1_i64).allow_overflow().await?,
                i64::MIN
            );
            assert_eq!(
                kv.increment_key_by("u64", 1_u64).allow_overflow().await?,
                u64::MIN
            );

            
            // Test saturating increments.
            kv.set_numeric_key("i64", i64::MAX - 1).await?;
            kv.set_numeric_key("u64", u64::MAX - 1).await?;
            assert_eq!(kv.increment_key_by("i64", 2_i64).await?, i64::MAX);
            assert_eq!(kv.increment_key_by("u64", 2_u64).await?, u64::MAX);

            
            // Test saturating decrements.
            kv.set_numeric_key("i64", i64::MIN + 1).await?;
            kv.set_numeric_key("u64", u64::MIN + 1).await?;
            assert_eq!(kv.decrement_key_by("i64", 2_i64).await?, i64::MIN);
            assert_eq!(kv.decrement_key_by("u64", 2_u64).await?, u64::MIN);

            
            // Test numerical conversion safety using get
            {
                // For i64 -> f64, the limit is 2^52 + 1 in either posive or
                // negative directions.
                kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS)))
                    .await?;
                $crate::assert_f64_eq!(
                    kv.get_key("i64").into_f64().await?.unwrap(),
                    9_007_199_254_740_992_f64
                );
                kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS)))
                    .await?;
                $crate::assert_f64_eq!(
                    kv.get_key("i64").into_f64().await?.unwrap(),
                    -9_007_199_254_740_992_f64
                );

            
                kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS) + 1))
                    .await?;
                assert!(matches!(kv.get_key("i64").into_f64().await, Err(_)));
                $crate::assert_f64_eq!(
                    kv.get_key("i64").into_f64_lossy().await?.unwrap(),
                    9_007_199_254_740_993_f64
                );
                kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS) + 1))
                    .await?;
                assert!(matches!(kv.get_key("i64").into_f64().await, Err(_)));
                $crate::assert_f64_eq!(
                    kv.get_key("i64").into_f64_lossy().await?.unwrap(),
                    -9_007_199_254_740_993_f64
                );

            
                // For i64 -> u64, the only limit is sign.
                kv.set_numeric_key("i64", -1_i64).await?;
                assert!(matches!(kv.get_key("i64").into_u64().await, Err(_)));
                assert_eq!(
                    kv.get_key("i64").into_u64_lossy(true).await?.unwrap(),
                    0_u64
                );
                assert_eq!(
                    kv.get_key("i64").into_u64_lossy(false).await?.unwrap(),
                    u64::MAX
                );

            
                // For f64 -> i64, the limit is fractional numbers. Saturating isn't tested in this conversion path.
                kv.set_numeric_key("f64", 1.1_f64).await?;
                assert!(matches!(kv.get_key("f64").into_i64().await, Err(_)));
                assert_eq!(
                    kv.get_key("f64").into_i64_lossy(false).await?.unwrap(),
                    1_i64
                );
                kv.set_numeric_key("f64", -1.1_f64).await?;
                assert!(matches!(kv.get_key("f64").into_i64().await, Err(_)));
                assert_eq!(
                    kv.get_key("f64").into_i64_lossy(false).await?.unwrap(),
                    -1_i64
                );

            
                // For f64 -> u64, the limit is fractional numbers or negative numbers. Saturating isn't tested in this conversion path.
                kv.set_numeric_key("f64", 1.1_f64).await?;
                assert!(matches!(kv.get_key("f64").into_u64().await, Err(_)));
                assert_eq!(
                    kv.get_key("f64").into_u64_lossy(false).await?.unwrap(),
                    1_u64
                );
                kv.set_numeric_key("f64", -1.1_f64).await?;
                assert!(matches!(kv.get_key("f64").into_u64().await, Err(_)));
                assert_eq!(
                    kv.get_key("f64").into_u64_lossy(false).await?.unwrap(),
                    0_u64
                );

            
                // For u64 -> i64, the limit is > i64::MAX
                kv.set_numeric_key("u64", i64::MAX as u64 + 1).await?;
                assert!(matches!(kv.get_key("u64").into_i64().await, Err(_)));
                assert_eq!(
                    kv.get_key("u64").into_i64_lossy(true).await?.unwrap(),
                    i64::MAX
                );
                assert_eq!(
                    kv.get_key("u64").into_i64_lossy(false).await?.unwrap(),
                    i64::MIN
                );
            }

            
            // Test that non-numeric keys won't be changed when attempting to incr/decr
            kv.set_key("non-numeric", &String::from("test")).await?;
            assert!(matches!(
                kv.increment_key_by("non-numeric", 1_i64).await,
                Err(_)
            ));
            assert!(matches!(
                kv.decrement_key_by("non-numeric", 1_i64).await,
                Err(_)
            ));
            assert_eq!(
                kv.get_key("non-numeric").into::<String>().await?.unwrap(),
                String::from("test")
            );

            
            // Test that NaN cannot be stored
            kv.set_numeric_key("f64", 0_f64).await?;
            assert!(matches!(
                kv.set_numeric_key("f64", f64::NAN).await,
                Err(bonsaidb_core::Error::NotANumber)
            ));
            // Verify the value was unchanged.
            $crate::assert_f64_eq!(kv.get_key("f64").into_f64().await?.unwrap(), 0.);
            // Try to increment by nan
            assert!(matches!(
                kv.increment_key_by("f64", f64::NAN).await,
                Err(bonsaidb_core::Error::NotANumber)
            ));
            $crate::assert_f64_eq!(kv.get_key("f64").into_f64().await?.unwrap(), 0.);

            
            harness.shutdown().await?;

            
            Ok(())
        }

            
        #[tokio::test]
5
        async fn kv_expiration_tests() -> anyhow::Result<()> {
            use std::time::Duration;

            
            use $crate::keyvalue::{KeyStatus, KeyValue};

            
            let harness = $harness::new($crate::test_util::HarnessTest::KvExpiration).await?;
            let db = harness.connect().await?;

            
            loop {
                let kv = db.with_key_namespace("expiration");

            
                kv.delete_key("a").await?;
                kv.delete_key("b").await?;

            
                // Test that the expiration is updated for key a, but not for key b.
                let timing = $crate::test_util::TimingTest::new(Duration::from_millis(500));
                let (r1, r2) = tokio::join!(
                    kv.set_key("a", &0_u32).expire_in(Duration::from_secs(2)),
                    kv.set_key("b", &0_u32).expire_in(Duration::from_secs(2))
                );
                if timing.elapsed() > Duration::from_millis(500) {
                    println!(
                        "Restarting test {}. Took too long {:?}",
                        line!(),
                        timing.elapsed(),
                    );
                    continue;
                }
                assert_eq!(r1?, KeyStatus::Inserted);
                assert_eq!(r2?, KeyStatus::Inserted);
                let (r1, r2) = tokio::join!(
                    kv.set_key("a", &1_u32).expire_in(Duration::from_secs(4)),
                    kv.set_key("b", &1_u32)
                        .expire_in(Duration::from_secs(100))
                        .keep_existing_expiration()
                );
                if timing.elapsed() > Duration::from_secs(1) {
                    println!(
                        "Restarting test {}. Took too long {:?}",
                        line!(),
                        timing.elapsed(),
                    );
                    continue;
                }

            
                assert_eq!(r1?, KeyStatus::Updated, "a wasn't an update");
                assert_eq!(r2?, KeyStatus::Updated, "b wasn't an update");

            
                let a = kv.get_key("a").into().await?;
                assert_eq!(a, Some(1u32), "a shouldn't have expired yet");

            
                // Before checking the value, make sure we haven't elapsed too
                // much time. If so, just restart the test.
                if !timing.wait_until(Duration::from_secs_f32(3.)).await {
                    println!(
                        "Restarting test {}. Took too long {:?}",
                        line!(),
                        timing.elapsed()
                    );
                    continue;
                }

            
                assert_eq!(kv.get_key("b").await?, None, "b never expired");

            
                timing.wait_until(Duration::from_secs_f32(5.)).await;
                assert_eq!(kv.get_key("a").await?, None, "a never expired");
                break;
            }
            harness.shutdown().await?;

            
            Ok(())
        }

            
        #[tokio::test]
5
        async fn delete_expire_tests() -> anyhow::Result<()> {
            use std::time::Duration;

            
            use $crate::keyvalue::{KeyStatus, KeyValue};

            
            let harness = $harness::new($crate::test_util::HarnessTest::KvDeleteExpire).await?;
            let db = harness.connect().await?;

            
            loop {
                let kv = db.with_key_namespace("delete_expire");

            
                kv.delete_key("a").await?;

            
                let timing = $crate::test_util::TimingTest::new(Duration::from_millis(100));

            
                // Create a key with an expiration. Delete the key. Set a new
                // value at that key with no expiration. Ensure it doesn't
                // expire.
                kv.set_key("a", &0_u32)
                    .expire_in(Duration::from_secs(2))
                    .await?;
                kv.delete_key("a").await?;
                kv.set_key("a", &1_u32).await?;
                if timing.elapsed() > Duration::from_secs(1) {
                    println!(
                        "Restarting test {}. Took too long {:?}",
                        line!(),
                        timing.elapsed(),
                    );
                    continue;
                }
                if !timing.wait_until(Duration::from_secs_f32(2.5)).await {
                    println!(
                        "Restarting test {}. Took too long {:?}",
                        line!(),
                        timing.elapsed()
                    );
                    continue;
                }

            
                assert_eq!(kv.get_key("a").into().await?, Some(1u32));

            
                break;
            }
            harness.shutdown().await?;

            
            Ok(())
        }

            
        #[tokio::test]
5
        async fn kv_transaction_tests() -> anyhow::Result<()> {
            use std::time::Duration;

            
            use $crate::{
                connection::Connection,
                keyvalue::{KeyStatus, KeyValue},
            };
            let harness = $harness::new($crate::test_util::HarnessTest::KvTransactions).await?;
            let db = harness.connect().await?;
            // Generate several transactions that we can validate. Persisting
            // happens in the background, so we delay between each step to give
            // it a moment.
            db.set_key("expires", &0_u32)
                .expire_in(Duration::from_secs(1))
                .await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
            db.set_key("akey", &String::from("avalue")).await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
            db.get_key("akey").and_delete().await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
            db.set_numeric_key("nkey", 0_u64).await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
            db.increment_key_by("nkey", 1_u64).await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
            db.delete_key("nkey").await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
            // Ensure this doesn't generate a transaction.
            db.delete_key("nkey").await?;

            
            tokio::time::sleep(Duration::from_secs(1)).await;

            
            let transactions = Connection::list_executed_transactions(&db, None, None).await?;
            let deleted_keys = transactions
                .iter()
                .filter_map(|tx| tx.changes.keys())
                .flatten()
                .filter(|changed_key| changed_key.deleted)
                .count();
            assert_eq!(deleted_keys, 3);
            let akey_changes = transactions
                .iter()
                .filter_map(|tx| tx.changes.keys())
                .flatten()
                .filter(|changed_key| changed_key.key == "akey")
                .count();
            assert_eq!(akey_changes, 2);
            let nkey_changes = transactions
                .iter()
                .filter_map(|tx| tx.changes.keys())
                .flatten()
                .filter(|changed_key| changed_key.key == "nkey")
                .count();
            assert_eq!(nkey_changes, 3);

            
            harness.shutdown().await?;

            
            Ok(())
        }
    };
}

            
pub struct TimingTest {
    tolerance: Duration,
    start: Instant,
}

            
impl TimingTest {
    #[must_use]
400
    pub fn new(tolerance: Duration) -> Self {
400
        Self {
400
            tolerance,
400
            start: Instant::now(),
400
        }
400
    }

            
600
    pub async fn wait_until(&self, absolute_duration: Duration) -> bool {
24
        let target = self.start + absolute_duration;
24
        let mut now = Instant::now();
24
        if now < target {
24
            tokio::time::sleep_until(target.into()).await;
24
            now = Instant::now();
        }
24
        let amount_past = now.checked_duration_since(target);
24

            
24
        // Return false if we're beyond the tolerance given
24
        amount_past.unwrap_or_default() < self.tolerance
24
    }

            
    #[must_use]
475
    pub fn elapsed(&self) -> Duration {
475
        Instant::now()
475
            .checked_duration_since(self.start)
475
            .unwrap_or_default()
475
    }
}

            
5
pub async fn basic_server_connection_tests<C: StorageConnection>(
5
    server: C,
5
    newdb_name: &str,
5
) -> anyhow::Result<()> {
5
    let mut schemas = server.list_available_schemas().await?;
5
    schemas.sort();
5
    assert!(schemas.contains(&BasicSchema::schema_name()));
5
    assert!(schemas.contains(&SchemaName::new("khonsulabs", "bonsaidb-admin")));

            
5
    let databases = server.list_databases().await?;
41
    assert!(databases.iter().any(|db| db.name == "tests"));

            
5
    server
8
        .create_database::<BasicSchema>(newdb_name, false)
8
        .await?;
8
    server.delete_database(newdb_name).await?;

            
    assert!(matches!(
5
        server.delete_database(newdb_name).await,
        Err(Error::DatabaseNotFound(_))
    ));

            
    assert!(matches!(
5
        server.create_database::<BasicSchema>("tests", false).await,
        Err(Error::DatabaseNameAlreadyTaken(_))
    ));

            
    assert!(matches!(
5
        server.create_database::<BasicSchema>("tests", true).await,
        Ok(_)
    ));

            
    assert!(matches!(
5
        server
5
            .create_database::<BasicSchema>("|invalidname", false)
2
            .await,
        Err(Error::InvalidDatabaseName(_))
    ));

            
    assert!(matches!(
5
        server
5
            .create_database::<UnassociatedCollection>(newdb_name, false)
2
            .await,
        Err(Error::SchemaNotRegistered(_))
    ));

            
5
    Ok(())
5
}