1
#![allow(clippy::missing_panics_doc)]
2

            
3
use std::{
4
    fmt::{Debug, Display},
5
    io::ErrorKind,
6
    ops::Deref,
7
    path::{Path, PathBuf},
8
    sync::{
9
        atomic::{AtomicU32, Ordering},
10
        Arc,
11
    },
12
    time::{Duration, Instant},
13
};
14

            
15
use itertools::Itertools;
16
use serde::{Deserialize, Serialize};
17
use transmog_pot::Pot;
18

            
19
use crate::{
20
    admin::{PermissionGroup, Role, User},
21
    connection::{
22
        AccessPolicy, AsyncConnection, AsyncStorageConnection, Connection, StorageConnection,
23
    },
24
    document::{
25
        BorrowedDocument, CollectionDocument, CollectionHeader, DocumentId, Emit, Header, KeyId,
26
    },
27
    keyvalue::{AsyncKeyValue, KeyValue},
28
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
29
    schema::{
30
        view::{
31
            map::{Mappings, ViewMappedValue},
32
            ReduceResult, ViewSchema,
33
        },
34
        Collection, CollectionName, MappedValue, NamedCollection, Qualified, Schema, SchemaName,
35
        Schematic, SerializedCollection, View, ViewMapResult,
36
    },
37
    Error,
38
};
39

            
40
1293694
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Collection)]
41
// This collection purposely uses names with characters that need
42
// escaping, since it's used in backup/restore.
43
#[collection(name = "_basic", authority = "khonsulabs_", views = [BasicCount, BasicByParentId, BasicByTag, BasicByCategory], core = crate)]
44
#[must_use]
45
pub struct Basic {
46
    pub value: String,
47
    pub category: Option<String>,
48
    pub parent_id: Option<u64>,
49
    pub tags: Vec<String>,
50
}
51

            
52
impl Basic {
53
2700
    pub fn new(value: impl Into<String>) -> Self {
54
2700
        Self {
55
2700
            value: value.into(),
56
2700
            tags: Vec::default(),
57
2700
            category: None,
58
2700
            parent_id: None,
59
2700
        }
60
2700
    }
61

            
62
40
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
63
40
        self.category = Some(category.into());
64
40
        self
65
40
    }
66

            
67
32
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
68
32
        self.tags.push(tag.into());
69
32
        self
70
32
    }
71

            
72
1271
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
73
1271
        self.parent_id = Some(parent_id);
74
1271
        self
75
1271
    }
76
}
77

            
78
2074368
#[derive(Debug, Clone, View)]
79
#[view(collection = Basic, key = (), value = usize, name = "count", core = crate)]
80
pub struct BasicCount;
81

            
82
impl ViewSchema for BasicCount {
83
    type View = Self;
84

            
85
31
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
86
31
        document.header.emit_key_and_value((), 1)
87
31
    }
88

            
89
31
    fn reduce(
90
31
        &self,
91
31
        mappings: &[ViewMappedValue<Self::View>],
92
31
        _rereduce: bool,
93
31
    ) -> ReduceResult<Self::View> {
94
31
        Ok(mappings.iter().map(|map| map.value).sum())
95
31
    }
96
}
97

            
98
2108435
#[derive(Debug, Clone, View)]
99
#[view(collection = Basic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
100
pub struct BasicByParentId;
101

            
102
impl ViewSchema for BasicByParentId {
103
    type View = Self;
104

            
105
2542
    fn version(&self) -> u64 {
106
2542
        1
107
2542
    }
108

            
109
2759
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
110
2759
        let contents = Basic::document_contents(document)?;
111
2759
        document.header.emit_key_and_value(contents.parent_id, 1)
112
2759
    }
113

            
114
3255
    fn reduce(
115
3255
        &self,
116
3255
        mappings: &[ViewMappedValue<Self::View>],
117
3255
        _rereduce: bool,
118
3255
    ) -> ReduceResult<Self::View> {
119
3255
        Ok(mappings.iter().map(|map| map.value).sum())
120
3255
    }
121
}
122

            
123
2075823
#[derive(Debug, Clone, View)]
124
#[view(collection = Basic, key = String, value = usize, name = "by-category", core = crate)]
125
pub struct BasicByCategory;
126

            
127
impl ViewSchema for BasicByCategory {
128
    type View = Self;
129

            
130
1271
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
131
1271
        let contents = Basic::document_contents(document)?;
132
1271
        if let Some(category) = &contents.category {
133
744
            document
134
744
                .header
135
744
                .emit_key_and_value(category.to_lowercase(), 1)
136
        } else {
137
527
            Ok(Mappings::none())
138
        }
139
1271
    }
140

            
141
496
    fn reduce(
142
496
        &self,
143
496
        mappings: &[ViewMappedValue<Self::View>],
144
496
        _rereduce: bool,
145
496
    ) -> ReduceResult<Self::View> {
146
744
        Ok(mappings.iter().map(|map| map.value).sum())
147
496
    }
148
}
149

            
150
2089246
#[derive(Debug, Clone, View)]
151
#[view(collection = Basic, key = String, value = usize, name = "by-tag", core = crate)]
152
pub struct BasicByTag;
153

            
154
impl ViewSchema for BasicByTag {
155
    type View = Self;
156

            
157
1023
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
158
1023
        let contents = Basic::document_contents(document)?;
159
1023
        contents
160
1023
            .tags
161
1023
            .iter()
162
1488
            .map(|tag| document.header.emit_key_and_value(tag.clone(), 1))
163
1023
            .collect()
164
1023
    }
165

            
166
1736
    fn reduce(
167
1736
        &self,
168
1736
        mappings: &[ViewMappedValue<Self::View>],
169
1736
        _rereduce: bool,
170
1736
    ) -> ReduceResult<Self::View> {
171
2232
        Ok(mappings.iter().map(|map| map.value).sum())
172
1736
    }
173
}
174

            
175
341
#[derive(Debug, Clone, View)]
176
#[view(collection = Basic, key = (), value = (), name = "by-parent-id", core = crate)]
177
pub struct BasicByBrokenParentId;
178

            
179
impl ViewSchema for BasicByBrokenParentId {
180
    type View = Self;
181

            
182
31
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
183
31
        document.header.emit()
184
31
    }
185
}
186

            
187
2586764
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Collection)]
188
#[collection(name = "encrypted-basic", authority = "khonsulabs", views = [EncryptedBasicCount, EncryptedBasicByParentId, EncryptedBasicByCategory])]
189
#[collection(encryption_key = Some(KeyId::Master), encryption_optional, core = crate)]
190
#[must_use]
191
pub struct EncryptedBasic {
192
    pub value: String,
193
    pub category: Option<String>,
194
    pub parent_id: Option<u64>,
195
}
196

            
197
impl EncryptedBasic {
198
1
    pub fn new(value: impl Into<String>) -> Self {
199
1
        Self {
200
1
            value: value.into(),
201
1
            category: None,
202
1
            parent_id: None,
203
1
        }
204
1
    }
205

            
206
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
207
        self.category = Some(category.into());
208
        self
209
    }
210

            
211
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
212
        self.parent_id = Some(parent_id);
213
        self
214
    }
215
}
216

            
217
1293692
#[derive(Debug, Clone, View)]
218
#[view(collection = EncryptedBasic, key = (), value = usize, name = "count", core = crate)]
219
pub struct EncryptedBasicCount;
220

            
221
impl ViewSchema for EncryptedBasicCount {
222
    type View = Self;
223

            
224
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
225
        document.header.emit_key_and_value((), 1)
226
    }
227

            
228
    fn reduce(
229
        &self,
230
        mappings: &[ViewMappedValue<Self::View>],
231
        _rereduce: bool,
232
    ) -> ReduceResult<Self::View> {
233
        Ok(mappings.iter().map(|map| map.value).sum())
234
    }
235
}
236

            
237
1293692
#[derive(Debug, Clone, View)]
238
#[view(collection = EncryptedBasic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
239
pub struct EncryptedBasicByParentId;
240

            
241
impl ViewSchema for EncryptedBasicByParentId {
242
    type View = Self;
243

            
244
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
245
        let contents = EncryptedBasic::document_contents(document)?;
246
        document.header.emit_key_and_value(contents.parent_id, 1)
247
    }
248

            
249
    fn reduce(
250
        &self,
251
        mappings: &[ViewMappedValue<Self::View>],
252
        _rereduce: bool,
253
    ) -> ReduceResult<Self::View> {
254
        Ok(mappings.iter().map(|map| map.value).sum())
255
    }
256
}
257

            
258
1293692
#[derive(Debug, Clone, View)]
259
#[view(collection = EncryptedBasic, key = String, value = usize, name = "by-category", core = crate)]
260
pub struct EncryptedBasicByCategory;
261

            
262
impl ViewSchema for EncryptedBasicByCategory {
263
    type View = Self;
264

            
265
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
266
        let contents = EncryptedBasic::document_contents(document)?;
267
        if let Some(category) = &contents.category {
268
            document
269
                .header
270
                .emit_key_and_value(category.to_lowercase(), 1)
271
        } else {
272
            Ok(Mappings::none())
273
        }
274
    }
275

            
276
    fn reduce(
277
        &self,
278
        mappings: &[ViewMappedValue<Self::View>],
279
        _rereduce: bool,
280
    ) -> ReduceResult<Self::View> {
281
        Ok(mappings.iter().map(|map| map.value).sum())
282
    }
283
}
284

            
285
1293382
#[derive(Debug, Schema)]
286
#[schema(name = "basic", collections = [Basic, EncryptedBasic, Unique], core = crate)]
287
pub struct BasicSchema;
288

            
289
1293382
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Default, Collection)]
290
#[collection(name = "unique", authority = "khonsulabs", views = [UniqueValue], core = crate)]
291
pub struct Unique {
292
    pub value: String,
293
}
294

            
295
impl Unique {
296
40
    pub fn new(value: impl Display) -> Self {
297
40
        Self {
298
40
            value: value.to_string(),
299
40
        }
300
40
    }
301
}
302

            
303
1307518
#[derive(Debug, Clone, View)]
304
#[view(collection = Unique, key = String, value = (), name = "unique-value", core = crate)]
305
pub struct UniqueValue;
306

            
307
impl ViewSchema for UniqueValue {
308
    type View = Self;
309

            
310
1300574
    fn unique(&self) -> bool {
311
1300574
        true
312
1300574
    }
313

            
314
1984
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
315
1984
        let entry = Unique::document_contents(document)?;
316
1984
        document.header.emit_key(entry.value)
317
1984
    }
318
}
319

            
320
impl NamedCollection for Unique {
321
    type ByNameView = UniqueValue;
322
}
323

            
324
pub struct TestDirectory(pub PathBuf);
325

            
326
impl TestDirectory {
327
    pub fn absolute<S: AsRef<Path>>(path: S) -> Self {
328
        let path = path.as_ref().to_owned();
329
        if path.exists() {
330
            std::fs::remove_dir_all(&path).expect("error clearing temporary directory");
331
        }
332
        Self(path)
333
    }
334
183
    pub fn new<S: AsRef<Path>>(name: S) -> Self {
335
183
        let path = std::env::temp_dir().join(name);
336
183
        if path.exists() {
337
            std::fs::remove_dir_all(&path).expect("error clearing temporary directory");
338
183
        }
339
183
        Self(path)
340
183
    }
341
}
342

            
343
impl Drop for TestDirectory {
344
    fn drop(&mut self) {
345
5673
        if let Err(err) = std::fs::remove_dir_all(&self.0) {
346
            if err.kind() != ErrorKind::NotFound {
347
                eprintln!("Failed to clean up temporary folder: {:?}", err);
348
            }
349
5673
        }
350
5673
    }
351
}
352

            
353
impl AsRef<Path> for TestDirectory {
354
6138
    fn as_ref(&self) -> &Path {
355
6138
        &self.0
356
6138
    }
357
}
358

            
359
impl Deref for TestDirectory {
360
    type Target = PathBuf;
361

            
362
31
    fn deref(&self) -> &Self::Target {
363
31
        &self.0
364
31
    }
365
}
366

            
367
#[derive(Debug)]
368
pub struct BasicCollectionWithNoViews;
369

            
370
impl Collection for BasicCollectionWithNoViews {
371
    type PrimaryKey = u64;
372

            
373
248
    fn collection_name() -> CollectionName {
374
248
        Basic::collection_name()
375
248
    }
376

            
377
62
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
378
62
        Ok(())
379
62
    }
380
}
381

            
382
impl SerializedCollection for BasicCollectionWithNoViews {
383
    type Contents = Basic;
384
    type Format = Pot;
385

            
386
31
    fn format() -> Self::Format {
387
31
        Pot::default()
388
31
    }
389
}
390

            
391
#[derive(Debug)]
392
pub struct BasicCollectionWithOnlyBrokenParentId;
393

            
394
impl Collection for BasicCollectionWithOnlyBrokenParentId {
395
    type PrimaryKey = u64;
396

            
397
217
    fn collection_name() -> CollectionName {
398
217
        Basic::collection_name()
399
217
    }
400

            
401
62
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
402
62
        schema.define_view(BasicByBrokenParentId)
403
62
    }
404
}
405

            
406
496
#[derive(Serialize, Deserialize, Clone, Debug, Collection)]
407
#[collection(name = "unassociated", authority = "khonsulabs", core = crate)]
408
pub struct UnassociatedCollection;
409

            
410
7657
#[derive(Copy, Clone, Debug)]
411
pub enum HarnessTest {
412
    ServerConnectionTests = 1,
413
    StoreRetrieveUpdate,
414
    NotFound,
415
    Conflict,
416
    BadUpdate,
417
    NoUpdate,
418
    GetMultiple,
419
    List,
420
    ListTransactions,
421
    ViewQuery,
422
    UnassociatedCollection,
423
    Compact,
424
    ViewUpdate,
425
    ViewMultiEmit,
426
    ViewUnimplementedReduce,
427
    ViewAccessPolicies,
428
    Encryption,
429
    UniqueViews,
430
    NamedCollection,
431
    PubSubSimple,
432
    UserManagement,
433
    PubSubMultipleSubscribers,
434
    PubSubDropAndSend,
435
    PubSubUnsubscribe,
436
    PubSubDropCleanup,
437
    PubSubPublishAll,
438
    KvBasic,
439
    KvConcurrency,
440
    KvSet,
441
    KvIncrementDecrement,
442
    KvExpiration,
443
    KvDeleteExpire,
444
    KvTransactions,
445
}
446

            
447
impl HarnessTest {
448
    #[must_use]
449
    pub const fn port(self, base: u16) -> u16 {
450
        base + self as u16
451
    }
452
}
453

            
454
impl Display for HarnessTest {
455
7688
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456
7688
        Debug::fmt(&self, f)
457
7688
    }
458
}
459

            
460
/// Compares two f64's accounting for the epsilon.
461
#[macro_export]
462
macro_rules! assert_f64_eq {
463
    ($a:expr, $b:expr) => {{
464
        let a: f64 = $a;
465
        let b: f64 = $b;
466
        assert!((a - b).abs() <= f64::EPSILON, "{:?} <> {:?}", a, b);
467
    }};
468
}
469

            
470
/// Creates a test suite that tests methods available on [`AsyncConnection`]
471
#[macro_export]
472
macro_rules! define_async_connection_test_suite {
473
    ($harness:ident) => {
474
        #[cfg(test)]
475
        mod r#async_connection {
476
            use super::$harness;
477
            #[tokio::test]
478
5
            async fn server_connection_tests() -> anyhow::Result<()> {
479
                let harness =
480
                    $harness::new($crate::test_util::HarnessTest::ServerConnectionTests).await?;
481
                let db = harness.server();
482
                $crate::test_util::basic_server_connection_tests(
483
                    db.clone(),
484
                    &format!("server-connection-tests-{}", $harness::server_name()),
485
                )
486
                .await?;
487
                harness.shutdown().await
488
            }
489

            
490
            #[tokio::test]
491
5
            async fn store_retrieve_update_delete() -> anyhow::Result<()> {
492
                let harness =
493
                    $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate).await?;
494
                let db = harness.connect().await?;
495
                $crate::test_util::store_retrieve_update_delete_tests(&db).await?;
496
                harness.shutdown().await
497
            }
498

            
499
            #[tokio::test]
500
5
            async fn not_found() -> anyhow::Result<()> {
501
                let harness = $harness::new($crate::test_util::HarnessTest::NotFound).await?;
502
                let db = harness.connect().await?;
503

            
504
                $crate::test_util::not_found_tests(&db).await?;
505
                harness.shutdown().await
506
            }
507

            
508
            #[tokio::test]
509
5
            async fn conflict() -> anyhow::Result<()> {
510
                let harness = $harness::new($crate::test_util::HarnessTest::Conflict).await?;
511
                let db = harness.connect().await?;
512

            
513
                $crate::test_util::conflict_tests(&db).await?;
514
                harness.shutdown().await
515
            }
516

            
517
            #[tokio::test]
518
5
            async fn bad_update() -> anyhow::Result<()> {
519
                let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate).await?;
520
                let db = harness.connect().await?;
521

            
522
                $crate::test_util::bad_update_tests(&db).await?;
523
                harness.shutdown().await
524
            }
525

            
526
            #[tokio::test]
527
5
            async fn no_update() -> anyhow::Result<()> {
528
                let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate).await?;
529
                let db = harness.connect().await?;
530

            
531
                $crate::test_util::no_update_tests(&db).await?;
532
                harness.shutdown().await
533
            }
534

            
535
            #[tokio::test]
536
5
            async fn get_multiple() -> anyhow::Result<()> {
537
                let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple).await?;
538
                let db = harness.connect().await?;
539

            
540
                $crate::test_util::get_multiple_tests(&db).await?;
541
                harness.shutdown().await
542
            }
543

            
544
            #[tokio::test]
545
5
            async fn list() -> anyhow::Result<()> {
546
                let harness = $harness::new($crate::test_util::HarnessTest::List).await?;
547
                let db = harness.connect().await?;
548

            
549
                $crate::test_util::list_tests(&db).await?;
550
                harness.shutdown().await
551
            }
552

            
553
            #[tokio::test]
554
5
            async fn list_transactions() -> anyhow::Result<()> {
555
                let harness =
556
                    $harness::new($crate::test_util::HarnessTest::ListTransactions).await?;
557
                let db = harness.connect().await?;
558

            
559
                $crate::test_util::list_transactions_tests(&db).await?;
560
                harness.shutdown().await
561
            }
562

            
563
            #[tokio::test]
564
5
            async fn view_query() -> anyhow::Result<()> {
565
                let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery).await?;
566
                let db = harness.connect().await?;
567

            
568
                $crate::test_util::view_query_tests(&db).await?;
569
                harness.shutdown().await
570
            }
571

            
572
            #[tokio::test]
573
5
            async fn unassociated_collection() -> anyhow::Result<()> {
574
                let harness =
575
                    $harness::new($crate::test_util::HarnessTest::UnassociatedCollection).await?;
576
                let db = harness.connect().await?;
577

            
578
                $crate::test_util::unassociated_collection_tests(&db).await?;
579
                harness.shutdown().await
580
            }
581

            
582
            #[tokio::test]
583
5
            async fn unimplemented_reduce() -> anyhow::Result<()> {
584
                let harness =
585
                    $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce).await?;
586
                let db = harness.connect().await?;
587

            
588
                $crate::test_util::unimplemented_reduce(&db).await?;
589
                harness.shutdown().await
590
            }
591

            
592
            #[tokio::test]
593
5
            async fn view_update() -> anyhow::Result<()> {
594
                let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate).await?;
595
                let db = harness.connect().await?;
596

            
597
                $crate::test_util::view_update_tests(&db).await?;
598
                harness.shutdown().await
599
            }
600

            
601
            #[tokio::test]
602
5
            async fn view_multi_emit() -> anyhow::Result<()> {
603
                let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit).await?;
604
                let db = harness.connect().await?;
605

            
606
                $crate::test_util::view_multi_emit_tests(&db).await?;
607
                harness.shutdown().await
608
            }
609

            
610
            #[tokio::test]
611
5
            async fn view_access_policies() -> anyhow::Result<()> {
612
                let harness =
613
                    $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies).await?;
614
                let db = harness.connect().await?;
615

            
616
                $crate::test_util::view_access_policy_tests(&db).await?;
617
                harness.shutdown().await
618
            }
619

            
620
            #[tokio::test]
621
5
            async fn unique_views() -> anyhow::Result<()> {
622
                let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews).await?;
623
                let db = harness.connect().await?;
624

            
625
                $crate::test_util::unique_view_tests(&db).await?;
626
                harness.shutdown().await
627
            }
628

            
629
            #[tokio::test]
630
5
            async fn named_collection() -> anyhow::Result<()> {
631
                let harness =
632
                    $harness::new($crate::test_util::HarnessTest::NamedCollection).await?;
633
                let db = harness.connect().await?;
634

            
635
                $crate::test_util::named_collection_tests(&db).await?;
636
                harness.shutdown().await
637
            }
638

            
639
            #[tokio::test]
640
5
            async fn user_management() -> anyhow::Result<()> {
641
                use $crate::connection::AsyncStorageConnection;
642
                let harness = $harness::new($crate::test_util::HarnessTest::UserManagement).await?;
643
                let _db = harness.connect().await?;
644
                let server = harness.server();
645
                let admin = server
646
                    .database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)
647
                    .await?;
648

            
649
                $crate::test_util::user_management_tests(
650
                    &admin,
651
                    server.clone(),
652
                    $harness::server_name(),
653
                )
654
                .await?;
655
                harness.shutdown().await
656
            }
657

            
658
            #[tokio::test]
659
5
            async fn compaction() -> anyhow::Result<()> {
660
                let harness = $harness::new($crate::test_util::HarnessTest::Compact).await?;
661
                let db = harness.connect().await?;
662

            
663
                $crate::test_util::compaction_tests(&db).await?;
664
                harness.shutdown().await
665
            }
666
        }
667
    };
668
}
669

            
670
/// Creates a test suite that tests methods available on [`AsyncConnection`]
671
#[macro_export]
672
macro_rules! define_blocking_connection_test_suite {
673
    ($harness:ident) => {
674
        #[cfg(test)]
675
        mod blocking_connection {
676
            use super::$harness;
677
            #[test]
678
3
            fn server_connection_tests() -> anyhow::Result<()> {
679
3
                let harness = $harness::new($crate::test_util::HarnessTest::ServerConnectionTests)?;
680
3
                let db = harness.server();
681
3
                $crate::test_util::blocking_basic_server_connection_tests(
682
3
                    db,
683
3
                    &format!("server-connection-tests-{}", $harness::server_name()),
684
3
                )?;
685
3
                harness.shutdown()
686
3
            }
687

            
688
            #[test]
689
3
            fn store_retrieve_update_delete() -> anyhow::Result<()> {
690
3
                let harness = $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate)?;
691
3
                let db = harness.connect()?;
692
3
                $crate::test_util::blocking_store_retrieve_update_delete_tests(&db)?;
693
3
                harness.shutdown()
694
3
            }
695

            
696
            #[test]
697
3
            fn not_found() -> anyhow::Result<()> {
698
3
                let harness = $harness::new($crate::test_util::HarnessTest::NotFound)?;
699
3
                let db = harness.connect()?;
700

            
701
3
                $crate::test_util::blocking_not_found_tests(&db)?;
702
3
                harness.shutdown()
703
3
            }
704

            
705
            #[test]
706
3
            fn conflict() -> anyhow::Result<()> {
707
3
                let harness = $harness::new($crate::test_util::HarnessTest::Conflict)?;
708
3
                let db = harness.connect()?;
709

            
710
3
                $crate::test_util::blocking_conflict_tests(&db)?;
711
3
                harness.shutdown()
712
3
            }
713

            
714
            #[test]
715
3
            fn bad_update() -> anyhow::Result<()> {
716
3
                let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate)?;
717
3
                let db = harness.connect()?;
718

            
719
3
                $crate::test_util::blocking_bad_update_tests(&db)?;
720
3
                harness.shutdown()
721
3
            }
722

            
723
            #[test]
724
3
            fn no_update() -> anyhow::Result<()> {
725
3
                let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate)?;
726
3
                let db = harness.connect()?;
727

            
728
3
                $crate::test_util::blocking_no_update_tests(&db)?;
729
3
                harness.shutdown()
730
3
            }
731

            
732
            #[test]
733
3
            fn get_multiple() -> anyhow::Result<()> {
734
3
                let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple)?;
735
3
                let db = harness.connect()?;
736

            
737
3
                $crate::test_util::blocking_get_multiple_tests(&db)?;
738
3
                harness.shutdown()
739
3
            }
740

            
741
            #[test]
742
3
            fn list() -> anyhow::Result<()> {
743
3
                let harness = $harness::new($crate::test_util::HarnessTest::List)?;
744
3
                let db = harness.connect()?;
745

            
746
3
                $crate::test_util::blocking_list_tests(&db)?;
747
3
                harness.shutdown()
748
3
            }
749

            
750
            #[test]
751
3
            fn list_transactions() -> anyhow::Result<()> {
752
3
                let harness = $harness::new($crate::test_util::HarnessTest::ListTransactions)?;
753
3
                let db = harness.connect()?;
754

            
755
3
                $crate::test_util::blocking_list_transactions_tests(&db)?;
756
3
                harness.shutdown()
757
3
            }
758

            
759
            #[test]
760
3
            fn view_query() -> anyhow::Result<()> {
761
3
                let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery)?;
762
3
                let db = harness.connect()?;
763

            
764
3
                $crate::test_util::blocking_view_query_tests(&db)?;
765
3
                harness.shutdown()
766
3
            }
767

            
768
            #[test]
769
3
            fn unassociated_collection() -> anyhow::Result<()> {
770
3
                let harness =
771
3
                    $harness::new($crate::test_util::HarnessTest::UnassociatedCollection)?;
772
3
                let db = harness.connect()?;
773

            
774
3
                $crate::test_util::blocking_unassociated_collection_tests(&db)?;
775
3
                harness.shutdown()
776
3
            }
777

            
778
            #[test]
779
3
            fn unimplemented_reduce() -> anyhow::Result<()> {
780
3
                let harness =
781
3
                    $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce)?;
782
3
                let db = harness.connect()?;
783

            
784
3
                $crate::test_util::blocking_unimplemented_reduce(&db)?;
785
3
                harness.shutdown()
786
3
            }
787

            
788
            #[test]
789
3
            fn view_update() -> anyhow::Result<()> {
790
3
                let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate)?;
791
3
                let db = harness.connect()?;
792

            
793
3
                $crate::test_util::blocking_view_update_tests(&db)?;
794
3
                harness.shutdown()
795
3
            }
796

            
797
            #[test]
798
3
            fn view_multi_emit() -> anyhow::Result<()> {
799
3
                let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit)?;
800
3
                let db = harness.connect()?;
801

            
802
3
                $crate::test_util::blocking_view_multi_emit_tests(&db)?;
803
3
                harness.shutdown()
804
3
            }
805

            
806
            #[test]
807
3
            fn view_access_policies() -> anyhow::Result<()> {
808
3
                let harness = $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies)?;
809
3
                let db = harness.connect()?;
810

            
811
3
                $crate::test_util::blocking_view_access_policy_tests(&db)?;
812
3
                harness.shutdown()
813
3
            }
814

            
815
            #[test]
816
3
            fn unique_views() -> anyhow::Result<()> {
817
3
                let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews)?;
818
3
                let db = harness.connect()?;
819

            
820
3
                $crate::test_util::blocking_unique_view_tests(&db)?;
821
3
                harness.shutdown()
822
3
            }
823

            
824
            #[test]
825
3
            fn named_collection() -> anyhow::Result<()> {
826
3
                let harness = $harness::new($crate::test_util::HarnessTest::NamedCollection)?;
827
3
                let db = harness.connect()?;
828

            
829
3
                $crate::test_util::blocking_named_collection_tests(&db)?;
830
3
                harness.shutdown()
831
3
            }
832

            
833
            #[test]
834
3
            fn user_management() -> anyhow::Result<()> {
835
                use $crate::connection::StorageConnection;
836
3
                let harness = $harness::new($crate::test_util::HarnessTest::UserManagement)?;
837
3
                let _db = harness.connect()?;
838
3
                let server = harness.server();
839
3
                let admin =
840
3
                    server.database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)?;
841

            
842
3
                $crate::test_util::blocking_user_management_tests(
843
3
                    &admin,
844
3
                    server,
845
3
                    $harness::server_name(),
846
3
                )?;
847
3
                harness.shutdown()
848
3
            }
849

            
850
            #[test]
851
3
            fn compaction() -> anyhow::Result<()> {
852
3
                let harness = $harness::new($crate::test_util::HarnessTest::Compact)?;
853
3
                let db = harness.connect()?;
854

            
855
3
                $crate::test_util::blocking_compaction_tests(&db)?;
856
3
                harness.shutdown()
857
3
            }
858
        }
859
    };
860
}
861

            
862
506
pub async fn store_retrieve_update_delete_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
863
506
    let original_value = Basic::new("initial_value");
864
506
    let collection = db.collection::<Basic>();
865
2098
    let header = collection.push(&original_value).await?;
866

            
867
506
    let mut doc = collection
868
1473
        .get(header.id)
869
1473
        .await?
870
506
        .expect("couldn't retrieve stored item");
871
506
    let mut value = Basic::document_contents(&doc)?;
872
506
    assert_eq!(original_value, value);
873
506
    let old_revision = doc.header.revision;
874
506

            
875
506
    // Update the value
876
506
    value.value = String::from("updated_value");
877
506
    Basic::set_document_contents(&mut doc, value.clone())?;
878
1611
    db.update::<Basic, _>(&mut doc).await?;
879

            
880
    // update should cause the revision to be changed
881
506
    assert_ne!(doc.header.revision, old_revision);
882

            
883
    // Check the value in the database to ensure it has the new document
884
506
    let doc = collection
885
1433
        .get(header.id)
886
1433
        .await?
887
506
        .expect("couldn't retrieve stored item");
888
506
    assert_eq!(Basic::document_contents(&doc)?, value);
889

            
890
    // These operations should have created two transactions with one change each
891
1443
    let transactions = db.list_executed_transactions(None, None).await?;
892
506
    assert_eq!(transactions.len(), 2);
893
506
    assert!(transactions[0].id < transactions[1].id);
894
1518
    for transaction in &transactions {
895
1012
        let changes = transaction
896
1012
            .changes
897
1012
            .documents()
898
1012
            .expect("incorrect transaction type");
899
1012
        assert_eq!(changes.documents.len(), 1);
900
1012
        assert_eq!(changes.collections.len(), 1);
901
1012
        assert_eq!(changes.collections[0], Basic::collection_name());
902
1012
        assert_eq!(changes.documents[0].collection, 0);
903
1012
        assert_eq!(header.id, changes.documents[0].id.deserialize()?);
904
1012
        assert!(!changes.documents[0].deleted);
905
    }
906

            
907
1652
    db.collection::<Basic>().delete(&doc).await?;
908
1431
    assert!(collection.get(header.id).await?.is_none());
909
506
    let transactions = db
910
1558
        .list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)
911
1558
        .await?;
912
506
    assert_eq!(transactions.len(), 1);
913
506
    let transaction = transactions.first().unwrap();
914
506
    let changes = transaction
915
506
        .changes
916
506
        .documents()
917
506
        .expect("incorrect transaction type");
918
506
    assert_eq!(changes.documents.len(), 1);
919
506
    assert_eq!(changes.collections[0], Basic::collection_name());
920
506
    assert_eq!(header.id, changes.documents[0].id.deserialize()?);
921
506
    assert!(changes.documents[0].deleted);
922

            
923
    // Use the Collection interface
924
1586
    let mut doc = original_value.clone().push_into_async(db).await?;
925
506
    doc.contents.category = Some(String::from("updated"));
926
1634
    doc.update_async(db).await?;
927
1462
    let reloaded = Basic::get_async(doc.header.id, db).await?.unwrap();
928
506
    assert_eq!(doc.contents, reloaded.contents);
929

            
930
    // Test Connection::insert with a specified id
931
506
    let doc = BorrowedDocument::with_contents::<Basic>(42, &Basic::new("42"))?;
932
506
    let document_42 = db
933
1608
        .insert::<Basic, _, _>(Some(doc.header.id), doc.contents.into_vec())
934
1608
        .await?;
935
506
    assert_eq!(document_42.id, 42);
936
1591
    let document_43 = Basic::new("43").insert_into_async(43, db).await?;
937
506
    assert_eq!(document_43.header.id, 43);
938

            
939
    // Test that inserting a document with the same ID results in a conflict:
940
506
    let conflict_err = Basic::new("43")
941
1517
        .insert_into_async(doc.header.id, db)
942
1517
        .await
943
506
        .unwrap_err();
944
506
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));
945

            
946
    // Test that overwriting works
947
506
    let overwritten = Basic::new("43")
948
1665
        .overwrite_into_async(doc.header.id, db)
949
1665
        .await
950
506
        .unwrap();
951
506
    assert!(overwritten.header.revision.id > doc.header.revision.id);
952

            
953
506
    Ok(())
954
506
}
955

            
956
3
pub fn blocking_store_retrieve_update_delete_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
957
3
    let original_value = Basic::new("initial_value");
958
3
    let collection = db.collection::<Basic>();
959
3
    let header = collection.push(&original_value)?;
960

            
961
3
    let mut doc = collection
962
3
        .get(header.id)?
963
3
        .expect("couldn't retrieve stored item");
964
3
    let mut value = Basic::document_contents(&doc)?;
965
3
    assert_eq!(original_value, value);
966
3
    let old_revision = doc.header.revision;
967
3

            
968
3
    // Update the value
969
3
    value.value = String::from("updated_value");
970
3
    Basic::set_document_contents(&mut doc, value.clone())?;
971
3
    db.update::<Basic, _>(&mut doc)?;
972

            
973
    // update should cause the revision to be changed
974
3
    assert_ne!(doc.header.revision, old_revision);
975

            
976
    // Check the value in the database to ensure it has the new document
977
3
    let doc = collection
978
3
        .get(header.id)?
979
3
        .expect("couldn't retrieve stored item");
980
3
    assert_eq!(Basic::document_contents(&doc)?, value);
981

            
982
    // These operations should have created two transactions with one change each
983
3
    let transactions = db.list_executed_transactions(None, None)?;
984
3
    assert_eq!(transactions.len(), 2);
985
3
    assert!(transactions[0].id < transactions[1].id);
986
9
    for transaction in &transactions {
987
6
        let changes = transaction
988
6
            .changes
989
6
            .documents()
990
6
            .expect("incorrect transaction type");
991
6
        assert_eq!(changes.documents.len(), 1);
992
6
        assert_eq!(changes.collections.len(), 1);
993
6
        assert_eq!(changes.collections[0], Basic::collection_name());
994
6
        assert_eq!(changes.documents[0].collection, 0);
995
6
        assert_eq!(header.id, changes.documents[0].id.deserialize()?);
996
6
        assert!(!changes.documents[0].deleted);
997
    }
998

            
999
3
    db.collection::<Basic>().delete(&doc)?;
3
    assert!(collection.get(header.id)?.is_none());
3
    let transactions =
3
        db.list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)?;
3
    assert_eq!(transactions.len(), 1);
3
    let transaction = transactions.first().unwrap();
3
    let changes = transaction
3
        .changes
3
        .documents()
3
        .expect("incorrect transaction type");
3
    assert_eq!(changes.documents.len(), 1);
3
    assert_eq!(changes.collections[0], Basic::collection_name());
3
    assert_eq!(header.id, changes.documents[0].id.deserialize()?);
3
    assert!(changes.documents[0].deleted);

            
    // Use the Collection interface
3
    let mut doc = original_value.push_into(db)?;
3
    doc.contents.category = Some(String::from("updated"));
3
    doc.update(db)?;
3
    let reloaded = Basic::get(doc.header.id, db)?.unwrap();
3
    assert_eq!(doc.contents, reloaded.contents);

            
    // Test Connection::insert with a specified id
3
    let doc = BorrowedDocument::with_contents::<Basic>(42, &Basic::new("42"))?;
3
    let document_42 = db.insert::<Basic, _, _>(Some(doc.header.id), doc.contents.into_vec())?;
3
    assert_eq!(document_42.id, 42);
3
    let document_43 = Basic::new("43").insert_into(43, db)?;
3
    assert_eq!(document_43.header.id, 43);

            
    // Test that inserting a document with the same ID results in a conflict:
3
    let conflict_err = Basic::new("43").insert_into(doc.header.id, db).unwrap_err();
3
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));

            
    // Test that overwriting works
3
    let overwritten = Basic::new("43").overwrite_into(doc.header.id, db).unwrap();
3
    assert!(overwritten.header.revision.id > doc.header.revision.id);

            
3
    Ok(())
3
}

            
5
pub async fn not_found_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    assert!(db.collection::<Basic>().get(1).await?.is_none());

            
5
    assert!(db.last_transaction_id().await?.is_none());

            
5
    Ok(())
5
}

            
3
pub fn blocking_not_found_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    assert!(db.collection::<Basic>().get(1)?.is_none());

            
3
    assert!(db.last_transaction_id()?.is_none());

            
3
    Ok(())
3
}

            
5
pub async fn conflict_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let original_value = Basic::new("initial_value");
5
    let collection = db.collection::<Basic>();
5
    let header = collection.push(&original_value).await?;

            
5
    let mut doc = collection
5
        .get(header.id)
5
        .await?
5
        .expect("couldn't retrieve stored item");
5
    let mut value = Basic::document_contents(&doc)?;
5
    value.value = String::from("updated_value");
5
    Basic::set_document_contents(&mut doc, value.clone())?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
    // To generate a conflict, let's try to do the same update again by
    // reverting the header
5
    doc.header = Header::try_from(header).unwrap();
5
    match db
5
        .update::<Basic, _>(&mut doc)
5
        .await
5
        .expect_err("conflict should have generated an error")
    {
5
        Error::DocumentConflict(collection, header) => {
5
            assert_eq!(collection, Basic::collection_name());
5
            assert_eq!(header.id, doc.header.id);
        }
        other => return Err(anyhow::Error::from(other)),
    }

            
    // Let's force an update through overwrite. After this succeeds, the header
    // is updated to the new revision.
5
    db.collection::<Basic>().overwrite(&mut doc).await.unwrap();

            
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
5
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
5
    doc.modify_async(db, |doc| {
5
        doc.contents.value = String::from("modify worked");
5
    })
5
    .await?;
5
    assert_eq!(doc.contents.value, "modify worked");
5
    let doc = Basic::get_async(doc.header.id, db).await?.unwrap();
5
    assert_eq!(doc.contents.value, "modify worked");

            
5
    Ok(())
5
}

            
3
pub fn blocking_conflict_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let original_value = Basic::new("initial_value");
3
    let collection = db.collection::<Basic>();
3
    let header = collection.push(&original_value)?;

            
3
    let mut doc = collection
3
        .get(header.id)?
3
        .expect("couldn't retrieve stored item");
3
    let mut value = Basic::document_contents(&doc)?;
3
    value.value = String::from("updated_value");
3
    Basic::set_document_contents(&mut doc, value)?;
3
    db.update::<Basic, _>(&mut doc)?;

            
    // To generate a conflict, let's try to do the same update again by
    // reverting the header
3
    doc.header = Header::try_from(header).unwrap();
3
    match db
3
        .update::<Basic, _>(&mut doc)
3
        .expect_err("conflict should have generated an error")
    {
3
        Error::DocumentConflict(collection, header) => {
3
            assert_eq!(collection, Basic::collection_name());
3
            assert_eq!(header.id, doc.header.id);
        }
        other => return Err(anyhow::Error::from(other)),
    }

            
    // Let's force an update through overwrite. After this succeeds, the header
    // is updated to the new revision.
3
    db.collection::<Basic>().overwrite(&mut doc).unwrap();

            
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
3
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
3
    doc.modify(db, |doc| {
3
        doc.contents.value = String::from("modify worked");
3
    })?;
3
    assert_eq!(doc.contents.value, "modify worked");
3
    let doc = Basic::get(doc.header.id, db)?.unwrap();
3
    assert_eq!(doc.contents.value, "modify worked");

            
3
    Ok(())
3
}

            
5
pub async fn bad_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let mut doc = BorrowedDocument::with_contents::<Basic>(1, &Basic::default())?;
5
    match db.update::<Basic, _>(&mut doc).await {
5
        Err(Error::DocumentNotFound(collection, id)) => {
5
            assert_eq!(collection, Basic::collection_name());
5
            assert_eq!(id.as_ref(), &DocumentId::from_u64(1));
5
            Ok(())
        }
        other => panic!("expected DocumentNotFound from update but got: {:?}", other),
    }
5
}

            
3
pub fn blocking_bad_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let mut doc = BorrowedDocument::with_contents::<Basic>(1, &Basic::default())?;
3
    match db.update::<Basic, _>(&mut doc) {
3
        Err(Error::DocumentNotFound(collection, id)) => {
3
            assert_eq!(collection, Basic::collection_name());
3
            assert_eq!(id.as_ref(), &DocumentId::from_u64(1));
3
            Ok(())
        }
        other => panic!("expected DocumentNotFound from update but got: {:?}", other),
    }
3
}

            
5
pub async fn no_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let original_value = Basic::new("initial_value");
5
    let collection = db.collection::<Basic>();
5
    let header = collection.push(&original_value).await?;

            
5
    let mut doc = collection
5
        .get(header.id)
5
        .await?
5
        .expect("couldn't retrieve stored item");
5
    db.update::<Basic, _>(&mut doc).await?;

            
5
    assert_eq!(CollectionHeader::try_from(doc.header)?, header);

            
5
    Ok(())
5
}

            
3
pub fn blocking_no_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let original_value = Basic::new("initial_value");
3
    let collection = db.collection::<Basic>();
3
    let header = collection.push(&original_value)?;

            
3
    let mut doc = collection
3
        .get(header.id)?
3
        .expect("couldn't retrieve stored item");
3
    db.update::<Basic, _>(&mut doc)?;

            
3
    assert_eq!(CollectionHeader::try_from(doc.header)?, header);

            
3
    Ok(())
3
}

            
5
pub async fn get_multiple_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let doc1_value = Basic::new("initial_value");
5
    let doc1 = collection.push(&doc1_value).await?;

            
5
    let doc2_value = Basic::new("second_value");
5
    let doc2 = collection.push(&doc2_value).await?;

            
5
    let both_docs = Basic::get_multiple_async([doc1.id, doc2.id], db).await?;
5
    assert_eq!(both_docs.len(), 2);

            
5
    let out_of_order = Basic::get_multiple_async([doc2.id, doc1.id], db).await?;
5
    assert_eq!(out_of_order.len(), 2);

            
    // The order of get_multiple isn't guaranteed, so these two checks are done
    // with iterators instead of direct indexing
5
    let doc1 = both_docs
5
        .iter()
5
        .find(|doc| doc.header.id == doc1.id)
5
        .expect("Couldn't find doc1");
5
    assert_eq!(doc1.contents.value, doc1_value.value);
5
    let doc2 = both_docs
5
        .iter()
10
        .find(|doc| doc.header.id == doc2.id)
5
        .expect("Couldn't find doc2");
5
    assert_eq!(doc2.contents.value, doc2_value.value);

            
5
    Ok(())
5
}

            
3
pub fn blocking_get_multiple_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let doc1_value = Basic::new("initial_value");
3
    let doc1 = collection.push(&doc1_value)?;

            
3
    let doc2_value = Basic::new("second_value");
3
    let doc2 = collection.push(&doc2_value)?;

            
3
    let both_docs = Basic::get_multiple([doc1.id, doc2.id], db)?;
3
    assert_eq!(both_docs.len(), 2);

            
3
    let out_of_order = Basic::get_multiple([doc2.id, doc1.id], db)?;
3
    assert_eq!(out_of_order.len(), 2);

            
    // The order of get_multiple isn't guaranteed, so these two checks are done
    // with iterators instead of direct indexing
3
    let doc1 = both_docs
3
        .iter()
3
        .find(|doc| doc.header.id == doc1.id)
3
        .expect("Couldn't find doc1");
3
    assert_eq!(doc1.contents.value, doc1_value.value);
3
    let doc2 = both_docs
3
        .iter()
6
        .find(|doc| doc.header.id == doc2.id)
3
        .expect("Couldn't find doc2");
3
    assert_eq!(doc2.contents.value, doc2_value.value);

            
3
    Ok(())
3
}

            
5
pub async fn list_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let doc1_value = Basic::new("initial_value");
5
    let doc1 = collection.push(&doc1_value).await?;

            
5
    let doc2_value = Basic::new("second_value");
5
    let doc2 = collection.push(&doc2_value).await?;

            
5
    let all_docs = Basic::all_async(db).await?;
5
    assert_eq!(all_docs.len(), 2);
5
    assert_eq!(Basic::all_async(db).count().await?, 2);

            
5
    let both_docs = Basic::list_async(doc1.id..=doc2.id, db).await?;
5
    assert_eq!(both_docs.len(), 2);
5
    assert_eq!(Basic::list_async(doc1.id..=doc2.id, db).count().await?, 2);

            
5
    assert_eq!(both_docs[0].contents.value, doc1_value.value);
5
    assert_eq!(both_docs[1].contents.value, doc2_value.value);

            
5
    let both_headers = Basic::list_async(doc1.id..=doc2.id, db).headers().await?;

            
5
    assert_eq!(both_headers.len(), 2);

            
5
    let one_doc = Basic::list_async(doc1.id..doc2.id, db).await?;
5
    assert_eq!(one_doc.len(), 1);

            
5
    let limited = Basic::list_async(doc1.id..=doc2.id, db)
5
        .limit(1)
5
        .descending()
5
        .await?;
5
    assert_eq!(limited.len(), 1);
5
    assert_eq!(limited[0].contents.value, doc2_value.value);

            
5
    Ok(())
5
}

            
3
pub fn blocking_list_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let doc1_value = Basic::new("initial_value");
3
    let doc1 = collection.push(&doc1_value)?;

            
3
    let doc2_value = Basic::new("second_value");
3
    let doc2 = collection.push(&doc2_value)?;

            
3
    let all_docs = Basic::all(db).query()?;
3
    assert_eq!(all_docs.len(), 2);
3
    assert_eq!(Basic::all(db).count()?, 2);

            
3
    let both_docs = Basic::list(doc1.id..=doc2.id, db).query()?;
3
    assert_eq!(both_docs.len(), 2);
3
    assert_eq!(Basic::list(doc1.id..=doc2.id, db).count()?, 2);

            
3
    assert_eq!(both_docs[0].contents.value, doc1_value.value);
3
    assert_eq!(both_docs[1].contents.value, doc2_value.value);

            
3
    let both_headers = Basic::list(doc1.id..=doc2.id, db).headers()?;

            
3
    assert_eq!(both_headers.len(), 2);

            
3
    let one_doc = Basic::list(doc1.id..doc2.id, db).query()?;
3
    assert_eq!(one_doc.len(), 1);

            
3
    let limited = Basic::list(doc1.id..=doc2.id, db)
3
        .limit(1)
3
        .descending()
3
        .query()?;
3
    assert_eq!(limited.len(), 1);
3
    assert_eq!(limited[0].contents.value, doc2_value.value);

            
3
    Ok(())
3
}

            
5
pub async fn list_transactions_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5

            
5
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
5
    // transactions to test the edge cases of `list_transactions`
5
    futures::future::join_all(
5
        (0..=(LIST_TRANSACTIONS_MAX_RESULTS))
5005
            .map(|_| async { collection.push(&Basic::default()).await.unwrap() }),
4582
    )
4582
    .await;

            
    // Test defaults
5
    let transactions = db.list_executed_transactions(None, None).await?;
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT
5
    );

            
    // Test max results limit
5
    let transactions = db
5
        .list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))
5
        .await?;
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_MAX_RESULTS
5
    );

            
    // Test requesting 0 items
5
    let transactions = db.list_executed_transactions(None, Some(0)).await?;
5
    assert!(transactions.is_empty());

            
    // Test doing a loop fetching until we get no more results
5
    let mut transactions = Vec::new();
5
    let mut starting_id = None;
    loop {
60
        let chunk = db
60
            .list_executed_transactions(starting_id, Some(100))
60
            .await?;
60
        if chunk.is_empty() {
5
            break;
55
        }
55

            
55
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
55
        starting_id = Some(max_id + 1);
55
        transactions.extend(chunk);
    }

            
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_MAX_RESULTS + 1
5
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_list_transactions_tests<C: Connection + Clone + 'static>(
3
    db: &C,
3
) -> anyhow::Result<()> {
3
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
3
    // transactions to test the edge cases of `list_transactions`
3
    let mut threads = Vec::with_capacity(num_cpus::get());
3
    let transaction_counter = Arc::new(AtomicU32::new(0));
6
    for _ in 0..threads.capacity() {
6
        let db = db.clone();
6
        let transaction_counter = transaction_counter.clone();
6
        threads.push(std::thread::spawn(move || {
6
            let collection = db.collection::<Basic>();
3009
            while transaction_counter.fetch_add(1, Ordering::SeqCst)
3009
                <= LIST_TRANSACTIONS_MAX_RESULTS
3003
            {
3003
                collection.push(&Basic::default()).unwrap();
3003
            }
6
        }));
6
    }

            
9
    for thread in threads {
6
        thread.join().unwrap();
6
    }

            
    // Test defaults
3
    let transactions = db.list_executed_transactions(None, None)?;
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT
3
    );

            
    // Test max results limit
3
    let transactions =
3
        db.list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))?;
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_MAX_RESULTS
3
    );

            
    // Test requesting 0 items
3
    let transactions = db.list_executed_transactions(None, Some(0))?;
3
    assert!(transactions.is_empty());

            
    // Test doing a loop fetching until we get no more results
3
    let mut transactions = Vec::new();
3
    let mut starting_id = None;
    loop {
36
        let chunk = db.list_executed_transactions(starting_id, Some(100))?;
36
        if chunk.is_empty() {
3
            break;
33
        }
33

            
33
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
33
        starting_id = Some(max_id + 1);
33
        transactions.extend(chunk);
    }

            
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_MAX_RESULTS + 1
3
    );

            
3
    Ok(())
3
}

            
5
pub async fn view_query_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;
5
    let b = collection.push(&Basic::new("B")).await?;
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;
5
    collection
5
        .push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))
5
        .await?;
5
    collection
5
        .push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
10
        .query_with_collection_docs()
10
        .await?;
5
    assert_eq!(a_children.len(), 1);
5
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);

            
5
    let b_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(b.id))
5
        .query()
5
        .await?;
5
    assert_eq!(b_children.len(), 2);

            
5
    let a_and_b_children = db
5
        .view::<BasicByParentId>()
5
        .with_keys([Some(a.id), Some(b.id)])
5
        .query()
5
        .await?;
5
    assert_eq!(a_and_b_children.len(), 3);

            
    // Test out of order keys
5
    let a_and_b_children = db
5
        .view::<BasicByParentId>()
5
        .with_keys([Some(b.id), Some(a.id)])
5
        .query()
5
        .await?;
5
    assert_eq!(a_and_b_children.len(), 3);

            
5
    let has_parent = db
5
        .view::<BasicByParentId>()
5
        .with_key_range(Some(0)..=Some(u64::MAX))
5
        .query()
5
        .await?;
5
    assert_eq!(has_parent.len(), 3);
    // Verify the result is sorted ascending
5
    assert!(has_parent
5
        .windows(2)
10
        .all(|window| window[0].key <= window[1].key));

            
    // Test limiting and descending order
5
    let last_with_parent = db
5
        .view::<BasicByParentId>()
5
        .with_key_range(Some(0)..=Some(u64::MAX))
5
        .descending()
5
        .limit(1)
5
        .query()
5
        .await?;
10
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
5
    assert_eq!(last_with_parent[0].key, has_parent[2].key);

            
5
    let items_with_categories = db.view::<BasicByCategory>().query().await?;
5
    assert_eq!(items_with_categories.len(), 3);

            
    // Test deleting
5
    let deleted_count = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(b.id))
5
        .delete_docs()
5
        .await?;
5
    assert_eq!(b_children.len() as u64, deleted_count);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(b.id))
5
            .query()
5
            .await?
5
            .len(),
        0
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_query_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;
3
    let b = collection.push(&Basic::new("B"))?;
3
    let a_child = collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;
3
    collection.push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))?;
3
    collection.push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 1);

            
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(Some(a.id))
3
        .query_with_collection_docs()?;
3
    assert_eq!(a_children.len(), 1);
3
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);

            
3
    let b_children = db.view::<BasicByParentId>().with_key(Some(b.id)).query()?;
3
    assert_eq!(b_children.len(), 2);

            
3
    let a_and_b_children = db
3
        .view::<BasicByParentId>()
3
        .with_keys([Some(a.id), Some(b.id)])
3
        .query()?;
3
    assert_eq!(a_and_b_children.len(), 3);

            
    // Test out of order keys
3
    let a_and_b_children = db
3
        .view::<BasicByParentId>()
3
        .with_keys([Some(b.id), Some(a.id)])
3
        .query()?;
3
    assert_eq!(a_and_b_children.len(), 3);

            
3
    let has_parent = db
3
        .view::<BasicByParentId>()
3
        .with_key_range(Some(0)..=Some(u64::MAX))
3
        .query()?;
3
    assert_eq!(has_parent.len(), 3);
    // Verify the result is sorted ascending
3
    assert!(has_parent
3
        .windows(2)
6
        .all(|window| window[0].key <= window[1].key));

            
    // Test limiting and descending order
3
    let last_with_parent = db
3
        .view::<BasicByParentId>()
3
        .with_key_range(Some(0)..=Some(u64::MAX))
3
        .descending()
3
        .limit(1)
3
        .query()?;
6
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
3
    assert_eq!(last_with_parent[0].key, has_parent[2].key);

            
3
    let items_with_categories = db.view::<BasicByCategory>().query()?;
3
    assert_eq!(items_with_categories.len(), 3);

            
    // Test deleting
3
    let deleted_count = db
3
        .view::<BasicByParentId>()
3
        .with_key(Some(b.id))
3
        .delete_docs()?;
3
    assert_eq!(b_children.len() as u64, deleted_count);
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(Some(b.id))
3
            .query()?
3
            .len(),
        0
    );

            
3
    Ok(())
3
}

            
5
pub async fn unassociated_collection_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let result = db
5
        .collection::<UnassociatedCollection>()
5
        .push(&UnassociatedCollection)
5
        .await;
5
    match result {
5
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
5
    Ok(())
5
}

            
3
pub fn blocking_unassociated_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let result = db
3
        .collection::<UnassociatedCollection>()
3
        .push(&UnassociatedCollection);
3
    match result {
3
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
3
    Ok(())
3
}

            
5
pub async fn unimplemented_reduce<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    assert!(matches!(
5
        db.view::<UniqueValue>().reduce().await,
        Err(Error::ReduceUnimplemented)
    ));
5
    Ok(())
5
}

            
3
pub fn blocking_unimplemented_reduce<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    assert!(matches!(
3
        db.view::<UniqueValue>().reduce(),
        Err(Error::ReduceUnimplemented)
    ));
3
    Ok(())
3
}

            
5
pub async fn view_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );

            
    // Test inserting a new record and the view being made available
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
5
    let mut doc = db.collection::<Basic>().get(a_child.id).await?.unwrap();
5
    let mut basic = Basic::document_contents(&doc)?;
5
    basic.parent_id = None;
5
    Basic::set_document_contents(&mut doc, basic)?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );
5
    assert_eq!(db.view::<BasicByParentId>().reduce().await?, 2);

            
    // Test deleting a record and ensuring it goes away
5
    db.collection::<Basic>().delete(&doc).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 1);

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,),]
    );

            
    // Remove the final document, which has a parent id of None. We'll add a new
    // document with None to verify that the mapper handles the edge case of a
    // delete/insert in the same mapping operation.
5
    db.view::<BasicByParentId>().delete_docs().await?;
5
    collection.push(&Basic::new("B")).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 1);

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().with_key(Some(a.id)).reduce()?,
        0
    );

            
    // Test inserting a new record and the view being made available
3
    let a_child = collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 1);
3
    assert_eq!(
3
        db.view::<BasicByParentId>().with_key(Some(a.id)).reduce()?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().reduce_grouped()?,
3
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
3
    let mut doc = db.collection::<Basic>().get(a_child.id)?.unwrap();
3
    let mut basic = Basic::document_contents(&doc)?;
3
    basic.parent_id = None;
3
    Basic::set_document_contents(&mut doc, basic)?;
3
    db.update::<Basic, _>(&mut doc)?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 0);
3
    assert_eq!(
3
        db.view::<BasicByParentId>().with_key(Some(a.id)).reduce()?,
        0
    );
3
    assert_eq!(db.view::<BasicByParentId>().reduce()?, 2);

            
    // Test deleting a record and ensuring it goes away
3
    db.collection::<Basic>().delete(&doc)?;

            
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 1);

            
    // Verify reduce_grouped matches our expectations.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().reduce_grouped()?,
3
        vec![MappedValue::new(None, 1,),]
    );

            
    // Remove the final document, which has a parent id of None. We'll add a new
    // document with None to verify that the mapper handles the edge case of a
    // delete/insert in the same mapping operation.
3
    db.view::<BasicByParentId>().delete_docs()?;
3
    collection.push(&Basic::new("B"))?;

            
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 1);

            
3
    Ok(())
3
}

            
5
pub async fn view_multi_emit_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let mut a = Basic::new("A")
5
        .with_tag("red")
5
        .with_tag("green")
5
        .push_into_async(db)
5
        .await?;
5
    let mut b = Basic::new("B")
5
        .with_tag("blue")
5
        .with_tag("green")
5
        .push_into_async(db)
5
        .await?;

            
5
    assert_eq!(db.view::<BasicByTag>().query().await?.len(), 4);

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("green"))
5
            .query()
5
            .await?
5
            .len(),
        2
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("red"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("blue"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    // Change tags
5
    a.contents.tags = vec![String::from("red"), String::from("blue")];
5
    a.update_async(db).await?;

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("green"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("red"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("blue"))
5
            .query()
5
            .await?
5
            .len(),
        2
    );
5
    b.contents.tags.clear();
5
    b.update_async(db).await?;

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("green"))
5
            .query()
5
            .await?
5
            .len(),
        0
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("red"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
    assert_eq!(
5
        db.view::<BasicByTag>()
5
            .with_key(String::from("blue"))
5
            .query()
5
            .await?
5
            .len(),
        1
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_multi_emit_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let mut a = Basic::new("A")
3
        .with_tag("red")
3
        .with_tag("green")
3
        .push_into(db)?;
3
    let mut b = Basic::new("B")
3
        .with_tag("blue")
3
        .with_tag("green")
3
        .push_into(db)?;

            
3
    assert_eq!(db.view::<BasicByTag>().query()?.len(), 4);

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("green"))
3
            .query()?
3
            .len(),
        2
    );

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("red"))
3
            .query()?
3
            .len(),
        1
    );

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("blue"))
3
            .query()?
3
            .len(),
        1
    );

            
    // Change tags
3
    a.contents.tags = vec![String::from("red"), String::from("blue")];
3
    a.update(db)?;

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("green"))
3
            .query()?
3
            .len(),
        1
    );

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("red"))
3
            .query()?
3
            .len(),
        1
    );

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("blue"))
3
            .query()?
3
            .len(),
        2
    );
3
    b.contents.tags.clear();
3
    b.update(db)?;

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("green"))
3
            .query()?
3
            .len(),
        0
    );

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("red"))
3
            .query()?
3
            .len(),
        1
    );

            
3
    assert_eq!(
3
        db.view::<BasicByTag>()
3
            .with_key(String::from("blue"))
3
            .query()?
3
            .len(),
        1
    );

            
3
    Ok(())
3
}

            
5
pub async fn view_access_policy_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;

            
    // Test inserting a record that should match the view, but ask for it to be
    // NoUpdate. Verify we get no matches.
5
    collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .with_access_policy(AccessPolicy::NoUpdate)
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);

            
5
    tokio::time::sleep(Duration::from_millis(20)).await;

            
    // Verify the view still have no value, but this time ask for it to be
    // updated after returning
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .with_access_policy(AccessPolicy::UpdateAfter)
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);

            
    // Waiting on background jobs can be unreliable in a CI environment
5
    for _ in 0..10_u8 {
5
        tokio::time::sleep(Duration::from_millis(20)).await;

            
        // Now, the view should contain the entry.
5
        let a_children = db
5
            .view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .with_access_policy(AccessPolicy::NoUpdate)
5
            .query()
5
            .await?;
5
        if a_children.len() == 1 {
5
            return Ok(());
        }
    }
    panic!("view never updated")
5
}

            
3
pub fn blocking_view_access_policy_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;

            
    // Test inserting a record that should match the view, but ask for it to be
    // NoUpdate. Verify we get no matches.
3
    collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;

            
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(Some(a.id))
3
        .with_access_policy(AccessPolicy::NoUpdate)
3
        .query()?;
3
    assert_eq!(a_children.len(), 0);

            
3
    std::thread::sleep(Duration::from_millis(20));

            
    // Verify the view still have no value, but this time ask for it to be
    // updated after returning
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(Some(a.id))
3
        .with_access_policy(AccessPolicy::UpdateAfter)
3
        .query()?;
3
    assert_eq!(a_children.len(), 0);

            
    // Waiting on background jobs can be unreliable in a CI environment
3
    for _ in 0..10_u8 {
3
        std::thread::sleep(Duration::from_millis(20));

            
        // Now, the view should contain the entry.
3
        let a_children = db
3
            .view::<BasicByParentId>()
3
            .with_key(Some(a.id))
3
            .with_access_policy(AccessPolicy::NoUpdate)
3
            .query()?;
3
        if a_children.len() == 1 {
3
            return Ok(());
        }
    }
    panic!("view never updated")
3
}

            
5
pub async fn unique_view_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let first_doc = db.collection::<Unique>().push(&Unique::new("1")).await?;

            
    if let Err(Error::UniqueKeyViolation {
5
        view,
5
        existing_document,
5
        conflicting_document,
5
    }) = db.collection::<Unique>().push(&Unique::new("1")).await
    {
5
        assert_eq!(view, UniqueValue.view_name());
5
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
        // We can't predict the conflicting document id since it's generated
        // inside of the transaction, but we can assert that it's different than
        // the document that was previously stored.
5
        assert_ne!(conflicting_document, existing_document);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
5
    let second_doc = db.collection::<Unique>().push(&Unique::new("2")).await?;
5
    let mut second_doc = db.collection::<Unique>().get(second_doc.id).await?.unwrap();
5
    let mut contents = Unique::document_contents(&second_doc)?;
5
    contents.value = String::from("1");
5
    Unique::set_document_contents(&mut second_doc, contents)?;
    if let Err(Error::UniqueKeyViolation {
5
        view,
5
        existing_document,
5
        conflicting_document,
5
    }) = db.update::<Unique, _>(&mut second_doc).await
    {
5
        assert_eq!(view, UniqueValue.view_name());
5
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
5
        assert_eq!(conflicting_document.id, second_doc.header.id);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
5
    Ok(())
5
}

            
3
pub fn blocking_unique_view_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let first_doc = db.collection::<Unique>().push(&Unique::new("1"))?;

            
    if let Err(Error::UniqueKeyViolation {
3
        view,
3
        existing_document,
3
        conflicting_document,
3
    }) = db.collection::<Unique>().push(&Unique::new("1"))
    {
3
        assert_eq!(view, UniqueValue.view_name());
3
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
        // We can't predict the conflicting document id since it's generated
        // inside of the transaction, but we can assert that it's different than
        // the document that was previously stored.
3
        assert_ne!(conflicting_document, existing_document);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
3
    let second_doc = db.collection::<Unique>().push(&Unique::new("2"))?;
3
    let mut second_doc = db.collection::<Unique>().get(second_doc.id)?.unwrap();
3
    let mut contents = Unique::document_contents(&second_doc)?;
3
    contents.value = String::from("1");
3
    Unique::set_document_contents(&mut second_doc, contents)?;
    if let Err(Error::UniqueKeyViolation {
3
        view,
3
        existing_document,
3
        conflicting_document,
3
    }) = db.update::<Unique, _>(&mut second_doc)
    {
3
        assert_eq!(view, UniqueValue.view_name());
3
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
3
        assert_eq!(conflicting_document.id, second_doc.header.id);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
3
    Ok(())
3
}

            
5
pub async fn named_collection_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    Unique::new("0").push_into_async(db).await?;
5
    let original_entry = Unique::entry_async("1", db)
5
        .update_with(|_existing: &mut Unique| unreachable!())
15
        .or_insert_with(|| Unique::new("1"))
15
        .await?
5
        .expect("Document not inserted");

            
5
    let updated = Unique::entry_async("1", db)
5
        .update_with(|existing: &mut Unique| {
5
            existing.value = String::from("2");
5
        })
15
        .or_insert_with(|| unreachable!())
15
        .await?
5
        .unwrap();
5
    assert_eq!(original_entry.header.id, updated.header.id);
5
    assert_ne!(original_entry.contents.value, updated.contents.value);

            
10
    let retrieved = Unique::entry_async("2", db).await?.unwrap();
5
    assert_eq!(retrieved.contents.value, updated.contents.value);

            
5
    let conflict = Unique::entry_async("2", db)
5
        .update_with(|existing: &mut Unique| {
5
            existing.value = String::from("0");
15
        })
15
        .await;
5
    assert!(matches!(conflict, Err(Error::UniqueKeyViolation { .. })));

            
5
    Ok(())
5
}

            
3
pub fn blocking_named_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    Unique::new("0").push_into(db)?;
3
    let original_entry = Unique::entry("1", db)
3
        .update_with(|_existing: &mut Unique| unreachable!())
3
        .or_insert_with(|| Unique::new("1"))
3
        .execute()?
3
        .expect("Document not inserted");

            
3
    let updated = Unique::entry("1", db)
3
        .update_with(|existing: &mut Unique| {
3
            existing.value = String::from("2");
3
        })
3
        .or_insert_with(|| unreachable!())
3
        .execute()?
3
        .unwrap();
3
    assert_eq!(original_entry.header.id, updated.header.id);
3
    assert_ne!(original_entry.contents.value, updated.contents.value);

            
3
    let retrieved = Unique::entry("2", db).execute()?.unwrap();
3
    assert_eq!(retrieved.contents.value, updated.contents.value);

            
3
    let conflict = Unique::entry("2", db)
3
        .update_with(|existing: &mut Unique| {
3
            existing.value = String::from("0");
3
        })
3
        .execute();
3
    assert!(matches!(conflict, Err(Error::UniqueKeyViolation { .. })));

            
3
    Ok(())
3
}

            
5
pub async fn compaction_tests<C: AsyncConnection + AsyncKeyValue>(db: &C) -> anyhow::Result<()> {
5
    let original_value = Basic::new("initial_value");
5
    let collection = db.collection::<Basic>();
5
    collection.push(&original_value).await?;

            
    // Test a collection compaction
5
    db.compact_collection::<Basic>().await?;

            
    // Test the key value store compaction
5
    db.set_key("foo", &1_u32).await?;
5
    db.compact_key_value_store().await?;

            
    // Compact everything... again...
5
    db.compact().await?;

            
5
    Ok(())
5
}

            
3
pub fn blocking_compaction_tests<C: Connection + KeyValue>(db: &C) -> anyhow::Result<()> {
3
    let original_value = Basic::new("initial_value");
3
    let collection = db.collection::<Basic>();
3
    collection.push(&original_value)?;

            
    // Test a collection compaction
3
    db.compact_collection::<Basic>()?;

            
    // Test the key value store compaction
3
    db.set_key("foo", &1_u32).execute()?;
3
    db.compact_key_value_store()?;

            
    // Compact everything... again...
3
    db.compact()?;

            
3
    Ok(())
3
}

            
5
pub async fn user_management_tests<C: AsyncConnection, S: AsyncStorageConnection>(
5
    admin: &C,
5
    server: S,
5
    server_name: &str,
5
) -> anyhow::Result<()> {
5
    let username = format!("user-management-tests-{}", server_name);
5
    let user_id = server.create_user(&username).await?;
    // Test the default created user state.
    {
5
        let user = User::get_async(user_id, admin)
5
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert_eq!(user.contents.username, username);
5
        assert!(user.contents.groups.is_empty());
5
        assert!(user.contents.roles.is_empty());
    }

            
5
    let role = Role::named(format!("role-{}", server_name))
5
        .push_into_async(admin)
5
        .await
5
        .unwrap();
5
    let group = PermissionGroup::named(format!("group-{}", server_name))
5
        .push_into_async(admin)
5
        .await
5
        .unwrap();
5

            
5
    // Add the role and group.
5
    server
5
        .add_permission_group_to_user(user_id, &group)
5
        .await
5
        .unwrap();
5
    server.add_role_to_user(user_id, &role).await.unwrap();

            
    // Test the results
    {
5
        let user = User::get_async(user_id, admin)
5
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert_eq!(user.contents.groups, vec![group.header.id]);
5
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Add the same things again (should not do anything). With names this time.
5
    server
5
        .add_permission_group_to_user(&username, &group)
5
        .await
5
        .unwrap();
5
    server.add_role_to_user(&username, &role).await.unwrap();
    {
10
        let user = User::load_async(&username, admin)
10
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert_eq!(user.contents.groups, vec![group.header.id]);
5
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Remove the group.
5
    server
5
        .remove_permission_group_from_user(user_id, &group)
5
        .await
5
        .unwrap();
5
    server.remove_role_from_user(user_id, &role).await.unwrap();
    {
5
        let user = User::get_async(user_id, admin)
5
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert!(user.contents.groups.is_empty());
5
        assert!(user.contents.roles.is_empty());
    }

            
    // Removing again shouldn't cause an error.
5
    server
5
        .remove_permission_group_from_user(user_id, &group)
5
        .await?;
5
    server.remove_role_from_user(user_id, &role).await?;

            
    // Remove the user
5
    server.delete_user(user_id).await?;
    // Test if user is removed.

            
5
    assert!(User::get_async(user_id, admin).await.unwrap().is_none());

            
5
    Ok(())
5
}

            
3
pub fn blocking_user_management_tests<C: Connection, S: StorageConnection>(
3
    admin: &C,
3
    server: &S,
3
    server_name: &str,
3
) -> anyhow::Result<()> {
3
    let username = format!("user-management-tests-{}", server_name);
3
    let user_id = server.create_user(&username)?;
    // Test the default created user state.
    {
3
        let user = User::get(user_id, admin).unwrap().expect("user not found");
3
        assert_eq!(user.contents.username, username);
3
        assert!(user.contents.groups.is_empty());
3
        assert!(user.contents.roles.is_empty());
    }

            
3
    let role = Role::named(format!("role-{}", server_name))
3
        .push_into(admin)
3
        .unwrap();
3
    let group = PermissionGroup::named(format!("group-{}", server_name))
3
        .push_into(admin)
3
        .unwrap();
3

            
3
    // Add the role and group.
3
    server
3
        .add_permission_group_to_user(user_id, &group)
3
        .unwrap();
3
    server.add_role_to_user(user_id, &role).unwrap();
3

            
3
    // Test the results
3
    {
3
        let user = User::get(user_id, admin).unwrap().expect("user not found");
3
        assert_eq!(user.contents.groups, vec![group.header.id]);
3
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Add the same things again (should not do anything). With names this time.
3
    server
3
        .add_permission_group_to_user(&username, &group)
3
        .unwrap();
3
    server.add_role_to_user(&username, &role).unwrap();
3
    {
3
        let user = User::load(&username, admin)
3
            .unwrap()
3
            .expect("user not found");
3
        assert_eq!(user.contents.groups, vec![group.header.id]);
3
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Remove the group.
3
    server
3
        .remove_permission_group_from_user(user_id, &group)
3
        .unwrap();
3
    server.remove_role_from_user(user_id, &role).unwrap();
3
    {
3
        let user = User::get(user_id, admin).unwrap().expect("user not found");
3
        assert!(user.contents.groups.is_empty());
3
        assert!(user.contents.roles.is_empty());
    }

            
    // Removing again shouldn't cause an error.
3
    server.remove_permission_group_from_user(user_id, &group)?;
3
    server.remove_role_from_user(user_id, &role)?;

            
    // Remove the user
3
    server.delete_user(user_id)?;
    // Test if user is removed.

            
3
    assert!(User::get(user_id, admin).unwrap().is_none());

            
3
    Ok(())
3
}

            
/// Defines the `KeyValue` test suite
#[macro_export]
macro_rules! define_async_kv_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod r#async_kv {
            use super::$harness;
            #[tokio::test]
5
            async fn basic_kv_test() -> anyhow::Result<()> {
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
                let harness = $harness::new($crate::test_util::HarnessTest::KvBasic).await?;
                let db = harness.connect().await?;
                assert_eq!(
                    db.set_key("akey", &String::from("avalue")).await?,
                    KeyStatus::Inserted
                );
                assert_eq!(
                    db.get_key("akey").into().await?,
                    Some(String::from("avalue"))
                );
                assert_eq!(
                    db.set_key("akey", &String::from("new_value"))
                        .returning_previous_as()
                        .await?,
                    Some(String::from("avalue"))
                );
                assert_eq!(
                    db.get_key("akey").into().await?,
                    Some(String::from("new_value"))
                );
                assert_eq!(
                    db.get_key("akey").and_delete().into().await?,
                    Some(String::from("new_value"))
                );
                assert_eq!(db.get_key("akey").await?, None);
                assert_eq!(
                    db.set_key("akey", &String::from("new_value"))
                        .returning_previous()
                        .await?,
                    None
                );
                assert_eq!(db.delete_key("akey").await?, KeyStatus::Deleted);
                assert_eq!(db.delete_key("akey").await?, KeyStatus::NotChanged);

            
                harness.shutdown().await?;

            
                Ok(())
            }

            
            #[tokio::test]
5
            async fn kv_concurrency() -> anyhow::Result<()> {
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
                const WRITERS: usize = 100;
                const INCREMENTS: usize = 100;
                let harness = $harness::new($crate::test_util::HarnessTest::KvConcurrency).await?;
                let db = harness.connect().await?;

            
                let handles = (0..WRITERS).map(|_| {
                    let db = db.clone();
                    tokio::task::spawn(async move {
                        for _ in 0..INCREMENTS {
                            db.increment_key_by("concurrency", 1_u64).await.unwrap();
                        }
                    })
                });
                futures::future::join_all(handles).await;

            
                assert_eq!(
                    db.get_key("concurrency").into_u64().await.unwrap().unwrap(),
                    (WRITERS * INCREMENTS) as u64
                );

            
                harness.shutdown().await?;

            
                Ok(())
            }

            
            #[tokio::test]
5
            async fn kv_set_tests() -> anyhow::Result<()> {
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
                let harness = $harness::new($crate::test_util::HarnessTest::KvSet).await?;
                let db = harness.connect().await?;
                let kv = db.with_key_namespace("set");

            
                assert_eq!(
                    kv.set_key("a", &0_u32).only_if_exists().await?,
                    KeyStatus::NotChanged
                );
                assert_eq!(
                    kv.set_key("a", &0_u32).only_if_vacant().await?,
                    KeyStatus::Inserted
                );
                assert_eq!(
                    kv.set_key("a", &1_u32).only_if_vacant().await?,
                    KeyStatus::NotChanged
                );
                assert_eq!(
                    kv.set_key("a", &2_u32).only_if_exists().await?,
                    KeyStatus::Updated,
                );
                assert_eq!(
                    kv.set_key("a", &3_u32).returning_previous_as().await?,
                    Some(2_u32),
                );

            
                harness.shutdown().await?;

            
                Ok(())
            }

            
            #[tokio::test]
5
            async fn kv_increment_decrement_tests() -> anyhow::Result<()> {
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
                let harness =
                    $harness::new($crate::test_util::HarnessTest::KvIncrementDecrement).await?;
                let db = harness.connect().await?;
                let kv = db.with_key_namespace("increment_decrement");

            
                // Empty keys should be equal to 0
                assert_eq!(kv.increment_key_by("i64", 1_i64).await?, 1_i64);
                assert_eq!(kv.get_key("i64").into_i64().await?, Some(1_i64));
                assert_eq!(kv.increment_key_by("u64", 1_u64).await?, 1_u64);
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).await?, 1_f64);

            
                // Test float incrementing/decrementing an existing value
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).await?, 2_f64);
                $crate::assert_f64_eq!(kv.decrement_key_by("f64", 2_f64).await?, 0_f64);

            
                // Empty keys should be equal to 0
                assert_eq!(kv.decrement_key_by("i64_2", 1_i64).await?, -1_i64);
                assert_eq!(
                    kv.decrement_key_by("u64_2", 42_u64)
                        .allow_overflow()
                        .await?,
                    u64::MAX - 41
                );
                assert_eq!(kv.decrement_key_by("u64_3", 42_u64).await?, u64::MIN);
                $crate::assert_f64_eq!(kv.decrement_key_by("f64_2", 1_f64).await?, -1_f64);

            
                // Test decrement wrapping with overflow
                kv.set_numeric_key("i64", i64::MIN).await?;
                assert_eq!(
                    kv.decrement_key_by("i64", 1_i64).allow_overflow().await?,
                    i64::MAX
                );
                assert_eq!(
                    kv.decrement_key_by("u64", 2_u64).allow_overflow().await?,
                    u64::MAX
                );

            
                // Test increment wrapping with overflow
                assert_eq!(
                    kv.increment_key_by("i64", 1_i64).allow_overflow().await?,
                    i64::MIN
                );
                assert_eq!(
                    kv.increment_key_by("u64", 1_u64).allow_overflow().await?,
                    u64::MIN
                );

            
                // Test saturating increments.
                kv.set_numeric_key("i64", i64::MAX - 1).await?;
                kv.set_numeric_key("u64", u64::MAX - 1).await?;
                assert_eq!(kv.increment_key_by("i64", 2_i64).await?, i64::MAX);
                assert_eq!(kv.increment_key_by("u64", 2_u64).await?, u64::MAX);

            
                // Test saturating decrements.
                kv.set_numeric_key("i64", i64::MIN + 1).await?;
                kv.set_numeric_key("u64", u64::MIN + 1).await?;
                assert_eq!(kv.decrement_key_by("i64", 2_i64).await?, i64::MIN);
                assert_eq!(kv.decrement_key_by("u64", 2_u64).await?, u64::MIN);

            
                // Test numerical conversion safety using get
                {
                    // For i64 -> f64, the limit is 2^52 + 1 in either posive or
                    // negative directions.
                    kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS)))
                        .await?;
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64().await?.unwrap(),
                        9_007_199_254_740_992_f64
                    );
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS)))
                        .await?;
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64().await?.unwrap(),
                        -9_007_199_254_740_992_f64
                    );

            
                    kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS) + 1))
                        .await?;
                    assert!(matches!(kv.get_key("i64").into_f64().await, Err(_)));
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64_lossy().await?.unwrap(),
                        9_007_199_254_740_993_f64
                    );
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS) + 1))
                        .await?;
                    assert!(matches!(kv.get_key("i64").into_f64().await, Err(_)));
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64_lossy().await?.unwrap(),
                        -9_007_199_254_740_993_f64
                    );

            
                    // For i64 -> u64, the only limit is sign.
                    kv.set_numeric_key("i64", -1_i64).await?;
                    assert!(matches!(kv.get_key("i64").into_u64().await, Err(_)));
                    assert_eq!(
                        kv.get_key("i64").into_u64_lossy(true).await?.unwrap(),
                        0_u64
                    );
                    assert_eq!(
                        kv.get_key("i64").into_u64_lossy(false).await?.unwrap(),
                        u64::MAX
                    );

            
                    // For f64 -> i64, the limit is fractional numbers. Saturating isn't tested in this conversion path.
                    kv.set_numeric_key("f64", 1.1_f64).await?;
                    assert!(matches!(kv.get_key("f64").into_i64().await, Err(_)));
                    assert_eq!(
                        kv.get_key("f64").into_i64_lossy(false).await?.unwrap(),
                        1_i64
                    );
                    kv.set_numeric_key("f64", -1.1_f64).await?;
                    assert!(matches!(kv.get_key("f64").into_i64().await, Err(_)));
                    assert_eq!(
                        kv.get_key("f64").into_i64_lossy(false).await?.unwrap(),
                        -1_i64
                    );

            
                    // For f64 -> u64, the limit is fractional numbers or negative numbers. Saturating isn't tested in this conversion path.
                    kv.set_numeric_key("f64", 1.1_f64).await?;
                    assert!(matches!(kv.get_key("f64").into_u64().await, Err(_)));
                    assert_eq!(
                        kv.get_key("f64").into_u64_lossy(false).await?.unwrap(),
                        1_u64
                    );
                    kv.set_numeric_key("f64", -1.1_f64).await?;
                    assert!(matches!(kv.get_key("f64").into_u64().await, Err(_)));
                    assert_eq!(
                        kv.get_key("f64").into_u64_lossy(false).await?.unwrap(),
                        0_u64
                    );

            
                    // For u64 -> i64, the limit is > i64::MAX
                    kv.set_numeric_key("u64", i64::MAX as u64 + 1).await?;
                    assert!(matches!(kv.get_key("u64").into_i64().await, Err(_)));
                    assert_eq!(
                        kv.get_key("u64").into_i64_lossy(true).await?.unwrap(),
                        i64::MAX
                    );
                    assert_eq!(
                        kv.get_key("u64").into_i64_lossy(false).await?.unwrap(),
                        i64::MIN
                    );
                }

            
                // Test that non-numeric keys won't be changed when attempting to incr/decr
                kv.set_key("non-numeric", &String::from("test")).await?;
                assert!(matches!(
                    kv.increment_key_by("non-numeric", 1_i64).await,
                    Err(_)
                ));
                assert!(matches!(
                    kv.decrement_key_by("non-numeric", 1_i64).await,
                    Err(_)
                ));
                assert_eq!(
                    kv.get_key("non-numeric").into::<String>().await?.unwrap(),
                    String::from("test")
                );

            
                // Test that NaN cannot be stored
                kv.set_numeric_key("f64", 0_f64).await?;
                assert!(matches!(
                    kv.set_numeric_key("f64", f64::NAN).await,
                    Err(bonsaidb_core::Error::NotANumber)
                ));
                // Verify the value was unchanged.
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64().await?.unwrap(), 0.);
                // Try to increment by nan
                assert!(matches!(
                    kv.increment_key_by("f64", f64::NAN).await,
                    Err(bonsaidb_core::Error::NotANumber)
                ));
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64().await?.unwrap(), 0.);

            
                harness.shutdown().await?;

            
                Ok(())
            }

            
            #[tokio::test]
5
            async fn kv_expiration_tests() -> anyhow::Result<()> {
                use std::time::Duration;

            
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};

            
                let harness = $harness::new($crate::test_util::HarnessTest::KvExpiration).await?;
                let db = harness.connect().await?;

            
                loop {
                    let kv = db.with_key_namespace("expiration");

            
                    kv.delete_key("a").await?;
                    kv.delete_key("b").await?;

            
                    // Test that the expiration is updated for key a, but not for key b.
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(500));
                    let (r1, r2) = tokio::join!(
                        kv.set_key("a", &0_u32).expire_in(Duration::from_secs(2)),
                        kv.set_key("b", &0_u32).expire_in(Duration::from_secs(2))
                    );
                    if timing.elapsed() > Duration::from_millis(500) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
                    }
                    assert_eq!(r1?, KeyStatus::Inserted);
                    assert_eq!(r2?, KeyStatus::Inserted);
                    let (r1, r2) = tokio::join!(
                        kv.set_key("a", &1_u32).expire_in(Duration::from_secs(4)),
                        kv.set_key("b", &1_u32)
                            .expire_in(Duration::from_secs(100))
                            .keep_existing_expiration()
                    );
                    if timing.elapsed() > Duration::from_secs(1) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
                    }

            
                    assert_eq!(r1?, KeyStatus::Updated, "a wasn't an update");
                    assert_eq!(r2?, KeyStatus::Updated, "b wasn't an update");

            
                    let a = kv.get_key("a").into().await?;
                    assert_eq!(a, Some(1u32), "a shouldn't have expired yet");

            
                    // Before checking the value, make sure we haven't elapsed too
                    // much time. If so, just restart the test.
                    if !timing.wait_until_async(Duration::from_secs_f32(3.)).await {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed()
                        );
                        continue;
                    }

            
                    assert_eq!(kv.get_key("b").await?, None, "b never expired");

            
                    timing.wait_until_async(Duration::from_secs_f32(5.)).await;
                    assert_eq!(kv.get_key("a").await?, None, "a never expired");
                    break;
                }
                harness.shutdown().await?;

            
                Ok(())
            }

            
            #[tokio::test]
5
            async fn delete_expire_tests() -> anyhow::Result<()> {
                use std::time::Duration;

            
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};

            
                let harness = $harness::new($crate::test_util::HarnessTest::KvDeleteExpire).await?;
                let db = harness.connect().await?;

            
                loop {
                    let kv = db.with_key_namespace("delete_expire");

            
                    kv.delete_key("a").await?;

            
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(100));

            
                    // Create a key with an expiration. Delete the key. Set a new
                    // value at that key with no expiration. Ensure it doesn't
                    // expire.
                    kv.set_key("a", &0_u32)
                        .expire_in(Duration::from_secs(2))
                        .await?;
                    kv.delete_key("a").await?;
                    kv.set_key("a", &1_u32).await?;
                    if timing.elapsed() > Duration::from_secs(1) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
                    }
                    if !timing.wait_until_async(Duration::from_secs_f32(2.5)).await {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed()
                        );
                        continue;
                    }

            
                    assert_eq!(kv.get_key("a").into().await?, Some(1u32));

            
                    break;
                }
                harness.shutdown().await?;

            
                Ok(())
            }

            
            #[tokio::test]
            // This test can fail when the machine its running on is under high load or
            // constrained resources. We need a command that persists (and waits until
            // persist) to make this test fully deterministic.
5
            async fn kv_transaction_tests() -> anyhow::Result<()> {
                use std::time::Duration;

            
                use $crate::{
                    connection::AsyncConnection,
                    keyvalue::{AsyncKeyValue, KeyStatus},
                };
                // loop {
                let harness = $harness::new($crate::test_util::HarnessTest::KvTransactions).await?;
                let db = harness.connect().await?;
                // Generate several transactions that we can validate. Persisting
                // happens in the background, so we delay between each step to give
                // it a moment.
                db.set_key("expires", &0_u32)
                    .expire_in(Duration::from_secs(1))
                    .await?;
                db.set_key("akey", &String::from("avalue")).await?;
                db.set_numeric_key("nkey", 0_u64).await?;
                tokio::time::sleep(Duration::from_millis(500)).await;
                db.increment_key_by("nkey", 1_u64).await?;
                tokio::time::sleep(Duration::from_millis(500)).await;
                db.delete_key("nkey").await?;
                db.get_key("akey").and_delete().await?;
                tokio::time::sleep(Duration::from_millis(500)).await;
                // Ensure this doesn't generate a transaction.
                db.delete_key("nkey").await?;

            
                tokio::time::sleep(Duration::from_secs(1)).await;

            
                let transactions =
                    AsyncConnection::list_executed_transactions(&db, None, None).await?;
                let deleted_keys = transactions
                    .iter()
                    .filter_map(|tx| tx.changes.keys())
                    .flatten()
                    .filter(|changed_key| dbg!(changed_key).deleted)
                    .count();
                assert_eq!(deleted_keys, 3, "deleted keys wasn't 3");
                let akey_changes = transactions
                    .iter()
                    .filter_map(|tx| tx.changes.keys())
                    .flatten()
                    .filter(|changed_key| changed_key.key == "akey")
                    .count();
                assert_eq!(akey_changes, 2, "akey changes wasn't 2");
                let nkey_changes = transactions
                    .iter()
                    .filter_map(|tx| tx.changes.keys())
                    .flatten()
                    .filter(|changed_key| changed_key.key == "nkey")
                    .count();
                assert_eq!(nkey_changes, 3, "nkey changes wasn't 3");

            
                harness.shutdown().await?;
                // }

            
                Ok(())
            }
        }
    };
}

            
/// Defines the `KeyValue` test suite
#[macro_export]
macro_rules! define_blocking_kv_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod blocking_kv {
            use super::$harness;

            
            #[test]
3
            fn basic_kv_test() -> anyhow::Result<()> {
                use $crate::keyvalue::{KeyStatus, KeyValue};
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvBasic)?;
3
                let db = harness.connect()?;
3
                assert_eq!(
3
                    db.set_key("akey", &String::from("avalue")).execute()?,
                    KeyStatus::Inserted
                );
3
                assert_eq!(db.get_key("akey").into()?, Some(String::from("avalue")));
3
                assert_eq!(
3
                    db.set_key("akey", &String::from("new_value"))
3
                        .returning_previous_as()?,
3
                    Some(String::from("avalue"))
                );
3
                assert_eq!(db.get_key("akey").into()?, Some(String::from("new_value")));
3
                assert_eq!(
3
                    db.get_key("akey").and_delete().into()?,
3
                    Some(String::from("new_value"))
                );
3
                assert_eq!(db.get_key("akey").query()?, None);
3
                assert_eq!(
3
                    db.set_key("akey", &String::from("new_value"))
3
                        .returning_previous()?,
                    None
                );
3
                assert_eq!(db.delete_key("akey")?, KeyStatus::Deleted);
3
                assert_eq!(db.delete_key("akey")?, KeyStatus::NotChanged);

            
3
                harness.shutdown()?;

            
3
                Ok(())
3
            }

            
            #[test]
3
            fn kv_concurrency() -> anyhow::Result<()> {
                use $crate::keyvalue::{KeyStatus, KeyValue};
                const WRITERS: usize = 100;
                const INCREMENTS: usize = 100;
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvConcurrency)?;
3
                let db = harness.connect()?;

            
3
                let threads = (0..WRITERS)
3
                    .map(|_| {
                        let db = db.clone();
                        std::thread::spawn(move || {
                            for _ in 0..INCREMENTS {
                                db.increment_key_by("concurrency", 1_u64).execute().unwrap();
                            }
                        })
3
                    })
3
                    .collect::<Vec<_>>();
303
                for thread in threads {
300
                    thread.join().unwrap();
300
                }

            
3
                assert_eq!(
3
                    db.get_key("concurrency").into_u64().unwrap().unwrap(),
3
                    (WRITERS * INCREMENTS) as u64
3
                );

            
3
                harness.shutdown()?;

            
3
                Ok(())
3
            }

            
            #[test]
3
            fn kv_set_tests() -> anyhow::Result<()> {
                use $crate::keyvalue::{KeyStatus, KeyValue};
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvSet)?;
3
                let db = harness.connect()?;
3
                let kv = db.with_key_namespace("set");

            
3
                assert_eq!(
3
                    kv.set_key("a", &0_u32).only_if_exists().execute()?,
                    KeyStatus::NotChanged
                );
3
                assert_eq!(
3
                    kv.set_key("a", &0_u32).only_if_vacant().execute()?,
                    KeyStatus::Inserted
                );
3
                assert_eq!(
3
                    kv.set_key("a", &1_u32).only_if_vacant().execute()?,
                    KeyStatus::NotChanged
                );
3
                assert_eq!(
3
                    kv.set_key("a", &2_u32).only_if_exists().execute()?,
                    KeyStatus::Updated,
                );
3
                assert_eq!(
3
                    kv.set_key("a", &3_u32).returning_previous_as()?,
                    Some(2_u32),
                );

            
3
                harness.shutdown()?;

            
3
                Ok(())
3
            }

            
            #[test]
3
            fn kv_increment_decrement_tests() -> anyhow::Result<()> {
                use $crate::keyvalue::{KeyStatus, KeyValue};
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvIncrementDecrement)?;
3
                let db = harness.connect()?;
3
                let kv = db.with_key_namespace("increment_decrement");

            
                // Empty keys should be equal to 0
3
                assert_eq!(kv.increment_key_by("i64", 1_i64).execute()?, 1_i64);
3
                assert_eq!(kv.get_key("i64").into_i64()?, Some(1_i64));
3
                assert_eq!(kv.increment_key_by("u64", 1_u64).execute()?, 1_u64);
3
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).execute()?, 1_f64);

            
                // Test float incrementing/decrementing an existing value
3
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).execute()?, 2_f64);
3
                $crate::assert_f64_eq!(kv.decrement_key_by("f64", 2_f64).execute()?, 0_f64);

            
                // Empty keys should be equal to 0
3
                assert_eq!(kv.decrement_key_by("i64_2", 1_i64).execute()?, -1_i64);
3
                assert_eq!(
3
                    kv.decrement_key_by("u64_2", 42_u64)
3
                        .allow_overflow()
3
                        .execute()?,
3
                    u64::MAX - 41
                );
3
                assert_eq!(kv.decrement_key_by("u64_3", 42_u64).execute()?, u64::MIN);
3
                $crate::assert_f64_eq!(kv.decrement_key_by("f64_2", 1_f64).execute()?, -1_f64);

            
                // Test decrement wrapping with overflow
3
                kv.set_numeric_key("i64", i64::MIN).execute()?;
3
                assert_eq!(
3
                    kv.decrement_key_by("i64", 1_i64)
3
                        .allow_overflow()
3
                        .execute()?,
                    i64::MAX
                );
3
                assert_eq!(
3
                    kv.decrement_key_by("u64", 2_u64)
3
                        .allow_overflow()
3
                        .execute()?,
                    u64::MAX
                );

            
                // Test increment wrapping with overflow
3
                assert_eq!(
3
                    kv.increment_key_by("i64", 1_i64)
3
                        .allow_overflow()
3
                        .execute()?,
                    i64::MIN
                );
3
                assert_eq!(
3
                    kv.increment_key_by("u64", 1_u64)
3
                        .allow_overflow()
3
                        .execute()?,
                    u64::MIN
                );

            
                // Test saturating increments.
3
                kv.set_numeric_key("i64", i64::MAX - 1).execute()?;
3
                kv.set_numeric_key("u64", u64::MAX - 1).execute()?;
3
                assert_eq!(kv.increment_key_by("i64", 2_i64).execute()?, i64::MAX);
3
                assert_eq!(kv.increment_key_by("u64", 2_u64).execute()?, u64::MAX);

            
                // Test saturating decrements.
3
                kv.set_numeric_key("i64", i64::MIN + 1).execute()?;
3
                kv.set_numeric_key("u64", u64::MIN + 1).execute()?;
3
                assert_eq!(kv.decrement_key_by("i64", 2_i64).execute()?, i64::MIN);
3
                assert_eq!(kv.decrement_key_by("u64", 2_u64).execute()?, u64::MIN);

            
                // Test numerical conversion safety using get
                {
                    // For i64 -> f64, the limit is 2^52 + 1 in either posive or
                    // negative directions.
3
                    kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS)))
3
                        .execute()?;
3
                    $crate::assert_f64_eq!(
3
                        kv.get_key("i64").into_f64()?.unwrap(),
3
                        9_007_199_254_740_992_f64
                    );
3
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS)))
3
                        .execute()?;
3
                    $crate::assert_f64_eq!(
3
                        kv.get_key("i64").into_f64()?.unwrap(),
3
                        -9_007_199_254_740_992_f64
                    );

            
3
                    kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS) + 1))
3
                        .execute()?;
3
                    assert!(matches!(kv.get_key("i64").into_f64(), Err(_)));
3
                    $crate::assert_f64_eq!(
3
                        kv.get_key("i64").into_f64_lossy()?.unwrap(),
3
                        9_007_199_254_740_993_f64
                    );
3
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS) + 1))
3
                        .execute()?;
3
                    assert!(matches!(kv.get_key("i64").into_f64(), Err(_)));
3
                    $crate::assert_f64_eq!(
3
                        kv.get_key("i64").into_f64_lossy()?.unwrap(),
3
                        -9_007_199_254_740_993_f64
                    );

            
                    // For i64 -> u64, the only limit is sign.
3
                    kv.set_numeric_key("i64", -1_i64).execute()?;
3
                    assert!(matches!(kv.get_key("i64").into_u64(), Err(_)));
3
                    assert_eq!(kv.get_key("i64").into_u64_lossy(true)?.unwrap(), 0_u64);
3
                    assert_eq!(kv.get_key("i64").into_u64_lossy(false)?.unwrap(), u64::MAX);

            
                    // For f64 -> i64, the limit is fractional numbers. Saturating isn't tested in this conversion path.
3
                    kv.set_numeric_key("f64", 1.1_f64).execute()?;
3
                    assert!(matches!(kv.get_key("f64").into_i64(), Err(_)));
3
                    assert_eq!(kv.get_key("f64").into_i64_lossy(false)?.unwrap(), 1_i64);
3
                    kv.set_numeric_key("f64", -1.1_f64).execute()?;
3
                    assert!(matches!(kv.get_key("f64").into_i64(), Err(_)));
3
                    assert_eq!(kv.get_key("f64").into_i64_lossy(false)?.unwrap(), -1_i64);

            
                    // For f64 -> u64, the limit is fractional numbers or negative numbers. Saturating isn't tested in this conversion path.
3
                    kv.set_numeric_key("f64", 1.1_f64).execute()?;
3
                    assert!(matches!(kv.get_key("f64").into_u64(), Err(_)));
3
                    assert_eq!(kv.get_key("f64").into_u64_lossy(false)?.unwrap(), 1_u64);
3
                    kv.set_numeric_key("f64", -1.1_f64).execute()?;
3
                    assert!(matches!(kv.get_key("f64").into_u64(), Err(_)));
3
                    assert_eq!(kv.get_key("f64").into_u64_lossy(false)?.unwrap(), 0_u64);

            
                    // For u64 -> i64, the limit is > i64::MAX
3
                    kv.set_numeric_key("u64", i64::MAX as u64 + 1).execute()?;
3
                    assert!(matches!(kv.get_key("u64").into_i64(), Err(_)));
3
                    assert_eq!(kv.get_key("u64").into_i64_lossy(true)?.unwrap(), i64::MAX);
3
                    assert_eq!(kv.get_key("u64").into_i64_lossy(false)?.unwrap(), i64::MIN);
                }

            
                // Test that non-numeric keys won't be changed when attempting to incr/decr
3
                kv.set_key("non-numeric", &String::from("test")).execute()?;
3
                assert!(matches!(
3
                    kv.increment_key_by("non-numeric", 1_i64).execute(),
                    Err(_)
                ));
3
                assert!(matches!(
3
                    kv.decrement_key_by("non-numeric", 1_i64).execute(),
                    Err(_)
                ));
3
                assert_eq!(
3
                    kv.get_key("non-numeric").into::<String>()?.unwrap(),
3
                    String::from("test")
                );

            
                // Test that NaN cannot be stored
3
                kv.set_numeric_key("f64", 0_f64).execute()?;
3
                assert!(matches!(
3
                    kv.set_numeric_key("f64", f64::NAN).execute(),
                    Err(bonsaidb_core::Error::NotANumber)
                ));
                // Verify the value was unchanged.
3
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64()?.unwrap(), 0.);
                // Try to increment by nan
3
                assert!(matches!(
3
                    kv.increment_key_by("f64", f64::NAN).execute(),
                    Err(bonsaidb_core::Error::NotANumber)
                ));
3
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64()?.unwrap(), 0.);

            
3
                harness.shutdown()?;

            
3
                Ok(())
3
            }

            
            #[test]
3
            fn kv_expiration_tests() -> anyhow::Result<()> {
                use std::time::Duration;

            
                use $crate::keyvalue::{KeyStatus, KeyValue};

            
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvExpiration)?;
3
                let db = harness.connect()?;

            
3
                loop {
3
                    let kv = db.with_key_namespace("expiration");
3

            
3
                    kv.delete_key("a")?;
3
                    kv.delete_key("b")?;

            
                    // Test that the expiration is updated for key a, but not for key b.
3
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(500));
3
                    let r1 = kv
3
                        .set_key("a", &0_u32)
3
                        .expire_in(Duration::from_secs(2))
3
                        .execute()
3
                        .unwrap();
3
                    let r2 = kv
3
                        .set_key("b", &0_u32)
3
                        .expire_in(Duration::from_secs(2))
3
                        .execute()
3
                        .unwrap();
3
                    assert_eq!(r1, KeyStatus::Inserted);
3
                    assert_eq!(r2, KeyStatus::Inserted);
3
                    let r1 = kv
3
                        .set_key("a", &1_u32)
3
                        .expire_in(Duration::from_secs(4))
3
                        .execute()
3
                        .unwrap();
3
                    let r2 = kv
3
                        .set_key("b", &1_u32)
3
                        .expire_in(Duration::from_secs(100))
3
                        .keep_existing_expiration()
3
                        .execute()
3
                        .unwrap();
3

            
3
                    if timing.elapsed() > Duration::from_secs(1) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
3
                    }
3

            
3
                    assert_eq!(r1, KeyStatus::Updated, "a wasn't an update");
3
                    assert_eq!(r2, KeyStatus::Updated, "b wasn't an update");

            
3
                    let a = kv.get_key("a").into()?;
3
                    assert_eq!(a, Some(1u32), "a shouldn't have expired yet");

            
                    // Before checking the value, make sure we haven't elapsed too
                    // much time. If so, just restart the test.
3
                    if !timing.wait_until(Duration::from_secs_f32(3.)) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed()
                        );
                        continue;
3
                    }

            
3
                    assert_eq!(kv.get_key("b").query()?, None, "b never expired");

            
3
                    timing.wait_until(Duration::from_secs_f32(5.));
3
                    assert_eq!(kv.get_key("a").query()?, None, "a never expired");
3
                    break;
3
                }
3
                harness.shutdown()?;

            
3
                Ok(())
3
            }

            
            #[test]
3
            fn delete_expire_tests() -> anyhow::Result<()> {
                use std::time::Duration;

            
                use $crate::keyvalue::{KeyStatus, KeyValue};

            
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvDeleteExpire)?;
3
                let db = harness.connect()?;

            
3
                loop {
3
                    let kv = db.with_key_namespace("delete_expire");
3

            
3
                    kv.delete_key("a")?;

            
3
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(100));
3

            
3
                    // Create a key with an expiration. Delete the key. Set a new
3
                    // value at that key with no expiration. Ensure it doesn't
3
                    // expire.
3
                    kv.set_key("a", &0_u32)
3
                        .expire_in(Duration::from_secs(2))
3
                        .execute()?;
3
                    kv.delete_key("a")?;
3
                    kv.set_key("a", &1_u32).execute()?;
3
                    if timing.elapsed() > Duration::from_secs(1) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
3
                    }
3
                    if !timing.wait_until(Duration::from_secs_f32(2.5)) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed()
                        );
                        continue;
3
                    }

            
3
                    assert_eq!(kv.get_key("a").into()?, Some(1u32));

            
3
                    break;
3
                }
3
                harness.shutdown()?;

            
3
                Ok(())
3
            }

            
            #[test]
            // This test can fail when the machine its running on is under high load or
            // constrained resources. We need a command that persists (and waits until
            // persist) to make this test fully deterministic.
3
            fn kv_transaction_tests() -> anyhow::Result<()> {
                use std::time::Duration;

            
                use $crate::{
                    connection::Connection,
                    keyvalue::{KeyStatus, KeyValue},
                };
                // loop {
3
                let harness = $harness::new($crate::test_util::HarnessTest::KvTransactions)?;
3
                let db = harness.connect()?;
                // Generate several transactions that we can validate. Persisting
                // happens in the background, so we delay between each step to give
                // it a moment.
3
                db.set_key("expires", &0_u32)
3
                    .expire_in(Duration::from_secs(1))
3
                    .execute()?;
3
                db.set_key("akey", &String::from("avalue")).execute()?;
3
                db.set_numeric_key("nkey", 0_u64).execute()?;
3
                std::thread::sleep(Duration::from_millis(500));
3
                db.increment_key_by("nkey", 1_u64).execute()?;
3
                std::thread::sleep(Duration::from_millis(500));
3
                db.delete_key("nkey")?;
3
                db.get_key("akey").and_delete().query()?;
3
                std::thread::sleep(Duration::from_millis(500));
3
                // Ensure this doesn't generate a transaction.
3
                db.delete_key("nkey")?;

            
3
                std::thread::sleep(Duration::from_secs(1));

            
3
                let transactions = Connection::list_executed_transactions(&db, None, None)?;
3
                let deleted_keys = transactions
3
                    .iter()
3
                    .filter_map(|tx| tx.changes.keys())
3
                    .flatten()
3
                    .filter(|changed_key| dbg!(changed_key).deleted)
3
                    .count();
3
                assert_eq!(deleted_keys, 3, "deleted keys wasn't 3");
3
                let akey_changes = transactions
3
                    .iter()
3
                    .filter_map(|tx| tx.changes.keys())
3
                    .flatten()
3
                    .filter(|changed_key| changed_key.key == "akey")
3
                    .count();
3
                assert_eq!(akey_changes, 2, "akey changes wasn't 2");
3
                let nkey_changes = transactions
3
                    .iter()
3
                    .filter_map(|tx| tx.changes.keys())
3
                    .flatten()
3
                    .filter(|changed_key| changed_key.key == "nkey")
3
                    .count();
3
                assert_eq!(nkey_changes, 3, "nkey changes wasn't 3");

            
3
                harness.shutdown()?;
                // }

            
3
                Ok(())
3
            }
        }
    };
}

            
pub struct TimingTest {
    tolerance: Duration,
    start: Instant,
}

            
impl TimingTest {
    #[must_use]
713
    pub fn new(tolerance: Duration) -> Self {
713
        Self {
713
            tolerance,
713
            start: Instant::now(),
713
        }
713
    }

            
    #[allow(clippy::must_use_candidate)]
558
    pub fn wait_until(&self, absolute_duration: Duration) -> bool {
558
        let target = self.start + absolute_duration;
558
        let mut now = Instant::now();
558
        if let Some(sleep_duration) = target.checked_duration_since(now) {
558
            std::thread::sleep(sleep_duration);
558
            now = Instant::now();
558
        }
558
        let amount_past = now.checked_duration_since(target);
558
        // Return false if we're beyond the tolerance given
558
        amount_past.unwrap_or_default() < self.tolerance
558
    }

            
465
    pub async fn wait_until_async(&self, absolute_duration: Duration) -> bool {
15
        let target = self.start + absolute_duration;
15
        let mut now = Instant::now();
15
        if now < target {
15
            tokio::time::sleep_until(target.into()).await;
15
            now = Instant::now();
        }
15
        let amount_past = now.checked_duration_since(target);
15

            
15
        // Return false if we're beyond the tolerance given
15
        amount_past.unwrap_or_default() < self.tolerance
15
    }

            
    #[must_use]
899
    pub fn elapsed(&self) -> Duration {
899
        Instant::now()
899
            .checked_duration_since(self.start)
899
            .unwrap_or_default()
899
    }
}

            
5
pub async fn basic_server_connection_tests<C: AsyncStorageConnection>(
5
    server: C,
5
    newdb_name: &str,
5
) -> anyhow::Result<()> {
5
    let mut schemas = server.list_available_schemas().await?;
5
    schemas.sort();
5
    assert!(schemas.contains(&BasicSchema::schema_name()));
5
    assert!(schemas.contains(&SchemaName::new("khonsulabs", "bonsaidb-admin")));

            
5
    let databases = server.list_databases().await?;
31
    assert!(databases.iter().any(|db| db.name == "tests"));

            
5
    server
8
        .create_database::<BasicSchema>(newdb_name, false)
8
        .await?;
5
    server.delete_database(newdb_name).await?;

            
    assert!(matches!(
5
        server.delete_database(newdb_name).await,
        Err(Error::DatabaseNotFound(_))
    ));

            
    assert!(matches!(
5
        server.create_database::<BasicSchema>("tests", false).await,
        Err(Error::DatabaseNameAlreadyTaken(_))
    ));

            
    assert!(matches!(
8
        server.create_database::<BasicSchema>("tests", true).await,
        Ok(_)
    ));

            
    assert!(matches!(
5
        server
5
            .create_database::<BasicSchema>("|invalidname", false)
5
            .await,
        Err(Error::InvalidDatabaseName(_))
    ));

            
    assert!(matches!(
5
        server
5
            .create_database::<UnassociatedCollection>(newdb_name, false)
5
            .await,
        Err(Error::SchemaNotRegistered(_))
    ));

            
5
    Ok(())
5
}

            
3
pub fn blocking_basic_server_connection_tests<C: StorageConnection>(
3
    server: &C,
3
    newdb_name: &str,
3
) -> anyhow::Result<()> {
3
    let mut schemas = server.list_available_schemas()?;
3
    schemas.sort();
3
    assert!(schemas.contains(&BasicSchema::schema_name()));
3
    assert!(schemas.contains(&SchemaName::new("khonsulabs", "bonsaidb-admin")));

            
3
    let databases = server.list_databases()?;
19
    assert!(databases.iter().any(|db| db.name == "tests"));

            
3
    server.create_database::<BasicSchema>(newdb_name, false)?;
3
    server.delete_database(newdb_name)?;

            
3
    assert!(matches!(
3
        server.delete_database(newdb_name),
        Err(Error::DatabaseNotFound(_))
    ));

            
3
    assert!(matches!(
3
        server.create_database::<BasicSchema>("tests", false),
        Err(Error::DatabaseNameAlreadyTaken(_))
    ));

            
3
    assert!(matches!(
3
        server.create_database::<BasicSchema>("tests", true),
        Ok(_)
    ));

            
3
    assert!(matches!(
3
        server.create_database::<BasicSchema>("|invalidname", false),
        Err(Error::InvalidDatabaseName(_))
    ));

            
3
    assert!(matches!(
3
        server.create_database::<UnassociatedCollection>(newdb_name, false),
        Err(Error::SchemaNotRegistered(_))
    ));

            
3
    Ok(())
3
}