1
#![allow(clippy::missing_panics_doc)]
2

            
3
use std::borrow::Cow;
4
use std::fmt::{Debug, Display};
5
use std::io::ErrorKind;
6
use std::ops::Deref;
7
use std::path::{Path, PathBuf};
8
use std::sync::atomic::{AtomicU32, Ordering};
9
use std::sync::Arc;
10
use std::time::{Duration, Instant};
11

            
12
use itertools::Itertools;
13
use serde::{Deserialize, Serialize};
14
use transmog_pot::Pot;
15

            
16
use crate::admin::{PermissionGroup, Role, User};
17
use crate::connection::{
18
    AccessPolicy, AsyncConnection, AsyncStorageConnection, Connection, StorageConnection,
19
};
20
use crate::document::{
21
    BorrowedDocument, CollectionDocument, CollectionHeader, DocumentId, Emit, Header, KeyId,
22
};
23
use crate::keyvalue::{AsyncKeyValue, KeyValue};
24
use crate::limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS};
25
use crate::schema::view::map::{Mappings, ViewMappedValue};
26
use crate::schema::view::{MapReduce, ReduceResult, SerializedView, ViewUpdatePolicy};
27
use crate::schema::{
28
    Collection, CollectionName, MappedValue, NamedCollection, Qualified, Schema, SchemaName,
29
    Schematic, SerializedCollection, View, ViewMapResult, ViewSchema,
30
};
31
use crate::transaction::{Operation, OperationResult, Transaction};
32
use crate::Error;
33
#[cfg(feature = "token-authentication")]
34
use crate::{
35
    admin::AuthenticationToken,
36
    connection::{HasSession, Identity, IdentityReference, Session},
37
};
38

            
39
5128210
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Default, Clone, Collection)]
40
// This collection purposely uses names with characters that need
41
// escaping, since it's used in backup/restore.
42
#[collection(name = "_basic", authority = "khonsulabs_", views = [BasicCount, BasicByParentId, BasicByParentIdEager, BasicByTag, BasicByCategory, BasicByCategoryCow], core = crate)]
43
#[must_use]
44
pub struct Basic {
45
    pub value: String,
46
    pub category: Option<String>,
47
    pub parent_id: Option<u64>,
48
    pub tags: Vec<String>,
49
}
50

            
51
impl Basic {
52
3285
    pub fn new(value: impl Into<String>) -> Self {
53
3285
        Self {
54
3285
            value: value.into(),
55
3285
            tags: Vec::default(),
56
3285
            category: None,
57
3285
            parent_id: None,
58
3285
        }
59
3285
    }
60

            
61
43
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
62
43
        self.category = Some(category.into());
63
43
        self
64
43
    }
65

            
66
32
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
67
32
        self.tags.push(tag.into());
68
32
        self
69
32
    }
70

            
71
22520
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
72
22520
        self.parent_id = Some(parent_id);
73
22520
        self
74
22520
    }
75
}
76

            
77
2714003
#[derive(Debug, Clone, View, ViewSchema)]
78
#[view(collection = Basic, key = (), value = usize, name = "count", core = crate)]
79
#[view_schema(core = crate)]
80
pub struct BasicCount;
81

            
82
impl MapReduce for BasicCount {
83
40
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
84
40
        document.header.emit_key_and_value((), 1)
85
40
    }
86

            
87
40
    fn reduce(
88
40
        &self,
89
40
        mappings: &[ViewMappedValue<'_, Self>],
90
40
        _rereduce: bool,
91
40
    ) -> ReduceResult<Self::View> {
92
40
        Ok(mappings.iter().map(|map| map.value).sum())
93
40
    }
94
}
95

            
96
2766721
#[derive(Debug, Clone, View)]
97
#[view(collection = Basic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
98
pub struct BasicByParentId;
99

            
100
impl ViewSchema for BasicByParentId {
101
    type MappedKey<'doc> = <Self::View as View>::Key;
102
    type View = Self;
103

            
104
3920
    fn version(&self) -> u64 {
105
3920
        1
106
3920
    }
107
}
108

            
109
impl MapReduce for BasicByParentId {
110
4200
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
111
4200
        let contents = Basic::document_contents(document)?;
112
4200
        document.header.emit_key_and_value(contents.parent_id, 1)
113
4200
    }
114

            
115
5160
    fn reduce(
116
5160
        &self,
117
5160
        mappings: &[ViewMappedValue<'_, Self>],
118
5160
        _rereduce: bool,
119
5160
    ) -> ReduceResult<Self::View> {
120
5160
        Ok(mappings.iter().map(|map| map.value).sum())
121
5160
    }
122
}
123

            
124
3264001
#[derive(Debug, Clone, View, ViewSchema)]
125
#[view(collection = Basic, key = Option<u64>, value = usize, name = "by-parent-id-eager", core = crate)]
126
#[view_schema(core = crate, version = 1, policy = Eager)]
127
pub struct BasicByParentIdEager;
128

            
129
impl MapReduce for BasicByParentIdEager {
130
495040
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
131
495040
        let contents = Basic::document_contents(document)?;
132
495040
        document.header.emit_key_and_value(contents.parent_id, 1)
133
495040
    }
134

            
135
496000
    fn reduce(
136
496000
        &self,
137
496000
        mappings: &[ViewMappedValue<'_, Self>],
138
496000
        _rereduce: bool,
139
496000
    ) -> ReduceResult<Self::View> {
140
160862800
        Ok(mappings.iter().map(|map| map.value).sum())
141
496000
    }
142
}
143

            
144
2715921
#[derive(Debug, Clone, View, ViewSchema)]
145
#[view(collection = Basic, key = String, value = usize, name = "by-category", core = crate)]
146
#[view_schema(core = crate)]
147
pub struct BasicByCategory;
148

            
149
impl MapReduce for BasicByCategory {
150
1640
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
151
1640
        let contents = Basic::document_contents(document)?;
152
1640
        if let Some(category) = &contents.category {
153
960
            document
154
960
                .header
155
960
                .emit_key_and_value(category.to_lowercase(), 1)
156
        } else {
157
680
            Ok(Mappings::none())
158
        }
159
1640
    }
160

            
161
640
    fn reduce(
162
640
        &self,
163
640
        mappings: &[ViewMappedValue<'_, Self>],
164
640
        _rereduce: bool,
165
640
    ) -> ReduceResult<Self::View> {
166
960
        Ok(mappings.iter().map(|map| map.value).sum())
167
640
    }
168
}
169

            
170
2715441
#[derive(Debug, Clone, View, ViewSchema)]
171
#[view(collection = Basic, key = String, value = usize, name = "by-category-cow", core = crate)]
172
#[view_schema(core = crate, mapped_key = Cow<'doc, str>)]
173
pub struct BasicByCategoryCow;
174

            
175
impl MapReduce for BasicByCategoryCow {
176
160
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
177
960
        #[derive(Deserialize, Debug)]
178
        struct BorrowedBasic<'a> {
179
            category: Option<&'a str>,
180
        }
181
160
        let contents: BorrowedBasic<'_> = pot::from_slice(&document.contents)?;
182
160
        if let Some(category) = &contents.category {
183
120
            document
184
120
                .header
185
120
                .emit_key_and_value(Cow::Borrowed(category), 1)
186
        } else {
187
40
            Ok(Mappings::none())
188
        }
189
160
    }
190

            
191
120
    fn reduce(
192
120
        &self,
193
120
        mappings: &[ViewMappedValue<'_, Self>],
194
120
        _rereduce: bool,
195
120
    ) -> ReduceResult<Self::View> {
196
120
        Ok(mappings
197
120
            .iter()
198
120
            .map(|map| {
199
120
                assert!(matches!(map.key, Cow::Borrowed(_)));
200
120
                map.value
201
120
            })
202
120
            .sum())
203
120
    }
204
}
205

            
206
2733201
#[derive(Debug, Clone, View, ViewSchema)]
207
#[view(collection = Basic, key = String, value = usize, name = "by-tag", core = crate)]
208
#[view_schema(core = crate)]
209
pub struct BasicByTag;
210

            
211
impl MapReduce for BasicByTag {
212
1320
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
213
1320
        let contents = Basic::document_contents(document)?;
214
1320
        contents
215
1320
            .tags
216
1320
            .iter()
217
1920
            .map(|tag| document.header.emit_key_and_value(tag.clone(), 1))
218
1320
            .collect()
219
1320
    }
220

            
221
2240
    fn reduce(
222
2240
        &self,
223
2240
        mappings: &[ViewMappedValue<'_, Self>],
224
2240
        _rereduce: bool,
225
2240
    ) -> ReduceResult<Self::View> {
226
2880
        Ok(mappings.iter().map(|map| map.value).sum())
227
2240
    }
228
}
229

            
230
480
#[derive(Debug, Clone, View, ViewSchema)]
231
#[view(collection = Basic, key = (), value = (), name = "by-parent-id", core = crate)]
232
#[view_schema(core = crate)]
233
pub struct BasicByBrokenParentId;
234

            
235
impl MapReduce for BasicByBrokenParentId {
236
40
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
237
40
        document.header.emit()
238
40
    }
239
}
240

            
241
3348880
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Default, Clone, Collection)]
242
#[collection(name = "encrypted-basic", authority = "khonsulabs", views = [EncryptedBasicCount, EncryptedBasicByParentId, EncryptedBasicByCategory])]
243
#[collection(encryption_key = Some(KeyId::Master), encryption_optional, core = crate)]
244
#[must_use]
245
pub struct EncryptedBasic {
246
    pub value: String,
247
    pub category: Option<String>,
248
    pub parent_id: Option<u64>,
249
}
250

            
251
impl EncryptedBasic {
252
1
    pub fn new(value: impl Into<String>) -> Self {
253
1
        Self {
254
1
            value: value.into(),
255
1
            category: None,
256
1
            parent_id: None,
257
1
        }
258
1
    }
259

            
260
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
261
        self.category = Some(category.into());
262
        self
263
    }
264

            
265
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
266
        self.parent_id = Some(parent_id);
267
        self
268
    }
269
}
270

            
271
1675160
#[derive(Debug, Clone, View, ViewSchema)]
272
#[view(collection = EncryptedBasic, key = (), value = usize, name = "count", core = crate)]
273
#[view_schema(core = crate)]
274
pub struct EncryptedBasicCount;
275

            
276
impl MapReduce for EncryptedBasicCount {
277
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
278
        document.header.emit_key_and_value((), 1)
279
    }
280

            
281
    fn reduce(
282
        &self,
283
        mappings: &[ViewMappedValue<'_, Self>],
284
        _rereduce: bool,
285
    ) -> ReduceResult<Self::View> {
286
        Ok(mappings.iter().map(|map| map.value).sum())
287
    }
288
}
289

            
290
1675160
#[derive(Debug, Clone, View, ViewSchema)]
291
#[view(collection = EncryptedBasic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
292
#[view_schema(core = crate)]
293
pub struct EncryptedBasicByParentId;
294

            
295
impl MapReduce for EncryptedBasicByParentId {
296
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
297
        let contents = EncryptedBasic::document_contents(document)?;
298
        document.header.emit_key_and_value(contents.parent_id, 1)
299
    }
300

            
301
    fn reduce(
302
        &self,
303
        mappings: &[ViewMappedValue<'_, Self>],
304
        _rereduce: bool,
305
    ) -> ReduceResult<Self::View> {
306
        Ok(mappings.iter().map(|map| map.value).sum())
307
    }
308
}
309

            
310
1675160
#[derive(Debug, Clone, View, ViewSchema)]
311
#[view(collection = EncryptedBasic, key = String, value = usize, name = "by-category", core = crate)]
312
#[view_schema(core = crate)]
313
pub struct EncryptedBasicByCategory;
314

            
315
impl MapReduce for EncryptedBasicByCategory {
316
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
317
        let contents = EncryptedBasic::document_contents(document)?;
318
        if let Some(category) = &contents.category {
319
            document
320
                .header
321
                .emit_key_and_value(category.to_lowercase(), 1)
322
        } else {
323
            Ok(Mappings::none())
324
        }
325
    }
326

            
327
    fn reduce(
328
        &self,
329
        mappings: &[ViewMappedValue<'_, Self>],
330
        _rereduce: bool,
331
    ) -> ReduceResult<Self::View> {
332
        Ok(mappings.iter().map(|map| map.value).sum())
333
    }
334
}
335

            
336
1674440
#[derive(Debug, Schema)]
337
#[schema(name = "basic", collections = [Basic, EncryptedBasic, Unique], core = crate)]
338
pub struct BasicSchema;
339

            
340
1674440
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Default, Collection)]
341
#[collection(name = "unique", authority = "khonsulabs", views = [UniqueValue], core = crate)]
342
pub struct Unique {
343
    pub value: String,
344
}
345

            
346
impl Unique {
347
40
    pub fn new(value: impl Display) -> Self {
348
40
        Self {
349
40
            value: value.to_string(),
350
40
        }
351
40
    }
352
}
353

            
354
1693000
#[derive(Debug, Clone, View, ViewSchema)]
355
#[view(collection = Unique, key = String, value = (), name = "unique-value", core = crate)]
356
#[view_schema(core = crate, policy = Unique)]
357
pub struct UniqueValue;
358

            
359
impl MapReduce for UniqueValue {
360
2560
    fn map<'doc>(&self, document: &'doc BorrowedDocument<'_>) -> ViewMapResult<'doc, Self> {
361
2560
        let entry = Unique::document_contents(document)?;
362
2560
        document.header.emit_key(entry.value)
363
2560
    }
364
}
365

            
366
impl NamedCollection for Unique {
367
    type ByNameView = UniqueValue;
368
}
369

            
370
/// Wrapper around the path of a directory used by a test.
///
/// The wrapped path is public so callers can read or replace it directly.
#[derive(Debug)]
pub struct TestDirectory(
    /// Location of the directory on disk.
    pub PathBuf,
);
372

            
373
impl TestDirectory {
374
5
    pub fn absolute<S: AsRef<Path>>(path: S) -> Self {
375
5
        let path = path.as_ref().to_owned();
376
5
        if path.exists() {
377
            std::fs::remove_dir_all(&path).expect("error clearing temporary directory");
378
5
        }
379
5
        Self(path)
380
5
    }
381

            
382
222
    pub fn new<S: AsRef<Path>>(name: S) -> Self {
383
222
        let path = std::env::temp_dir().join(name);
384
222
        if path.exists() {
385
            std::fs::remove_dir_all(&path).expect("error clearing temporary directory");
386
222
        }
387
222
        Self(path)
388
222
    }
389
}
390

            
391
impl Drop for TestDirectory {
392
9000
    fn drop(&mut self) {
393
9000
        if let Err(err) = std::fs::remove_dir_all(&self.0) {
394
            if err.kind() != ErrorKind::NotFound {
395
                eprintln!("Failed to clean up temporary folder: {err:?}");
396
            }
397
9000
        }
398
9000
    }
399
}
400

            
401
impl AsRef<Path> for TestDirectory {
402
9760
    fn as_ref(&self) -> &Path {
403
9760
        &self.0
404
9760
    }
405
}
406

            
407
impl Deref for TestDirectory {
408
    type Target = PathBuf;
409

            
410
40
    fn deref(&self) -> &Self::Target {
411
40
        &self.0
412
40
    }
413
}
414

            
415
/// Collection type that reuses [`Basic`]'s collection name and contents but
/// defines no views.
#[derive(Debug)]
pub struct BasicCollectionWithNoViews;
417

            
418
impl Collection for BasicCollectionWithNoViews {
419
    type PrimaryKey = u64;
420

            
421
320
    fn collection_name() -> CollectionName {
422
320
        Basic::collection_name()
423
320
    }
424

            
425
80
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
426
80
        Ok(())
427
80
    }
428
}
429

            
430
impl SerializedCollection for BasicCollectionWithNoViews {
431
    type Contents = Basic;
432
    type Format = Pot;
433

            
434
40
    fn format() -> Self::Format {
435
40
        Pot::default()
436
40
    }
437
}
438

            
439
/// Collection type that reuses [`Basic`]'s collection name but defines only
/// the [`BasicByBrokenParentId`] view.
#[derive(Debug)]
pub struct BasicCollectionWithOnlyBrokenParentId;
441

            
442
impl Collection for BasicCollectionWithOnlyBrokenParentId {
443
    type PrimaryKey = u64;
444

            
445
280
    fn collection_name() -> CollectionName {
446
280
        Basic::collection_name()
447
280
    }
448

            
449
80
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
450
80
        schema.define_view(BasicByBrokenParentId)
451
80
    }
452
}
453

            
454
640
#[derive(Serialize, Deserialize, Clone, Debug, Collection)]
455
#[collection(name = "unassociated", authority = "khonsulabs", core = crate)]
456
pub struct UnassociatedCollection;
457

            
458
11320
/// Identifies one test of the shared suites.
///
/// Discriminants start at 1 and increase by one per variant, so the variant
/// order is significant: [`HarnessTest::port`] adds the discriminant to a base
/// port number.
#[derive(Copy, Clone, Debug)]
pub enum HarnessTest {
    // Connection and document tests.
    ServerConnectionTests = 1,
    StoreRetrieveUpdate,
    Overwrite,
    NotFound,
    Conflict,
    BadUpdate,
    NoUpdate,
    GetMultiple,
    List,
    ListTransactions,
    Transactions,
    TransactionCheck,
    // View and collection tests.
    ViewQuery,
    UnassociatedCollection,
    Compact,
    ViewUpdate,
    ViewMultiEmit,
    ViewUnimplementedReduce,
    ViewAccessPolicies,
    ViewCow,
    Encryption,
    UniqueViews,
    NamedCollection,
    PubSubSimple,
    // User and authentication tests.
    UserManagement,
    TokenAuthentication,
    // Remaining PubSub tests.
    PubSubMultipleSubscribers,
    PubSubDropAndSend,
    PubSubUnsubscribe,
    PubSubDropCleanup,
    PubSubPublishAll,
    // Key-value tests.
    KvBasic,
    KvConcurrency,
    KvSet,
    KvIncrementDecrement,
    KvExpiration,
    KvDeleteExpire,
    KvTransactions,
}
499

            
500
impl HarnessTest {
501
    #[must_use]
502
    pub const fn port(self, base: u16) -> u16 {
503
        base + self as u16
504
    }
505
}
506

            
507
impl Display for HarnessTest {
508
11320
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
509
11320
        Debug::fmt(&self, f)
510
11320
    }
511
}
512

            
513
/// Asserts that two `f64` values are equal within [`f64::EPSILON`].
///
/// Both arguments are evaluated exactly once. Values that compare exactly
/// equal (including two infinities of the same sign) always pass; otherwise
/// the absolute difference must not exceed `f64::EPSILON`. A `NaN` on either
/// side always fails. A trailing comma after the second argument is accepted.
///
/// # Panics
///
/// Panics with the message `"{a:?} <> {b:?}"` when the values differ.
#[macro_export]
macro_rules! assert_f64_eq {
    ($a:expr, $b:expr $(,)?) => {{
        let a: f64 = $a;
        let b: f64 = $b;
        // `inf - inf` is NaN, so equal infinities must be detected by direct
        // comparison before falling back to the epsilon test.
        #[allow(clippy::float_cmp)]
        let exactly_equal = a == b;
        assert!(
            exactly_equal || (a - b).abs() <= f64::EPSILON,
            "{:?} <> {:?}",
            a,
            b
        );
    }};
}
522

            
523
/// Creates a test suite that tests methods available on [`AsyncConnection`]
///
/// `$harness` names a type in the invoking module. The generated tests call
/// `$harness::new(HarnessTest)`, `connect()` and `shutdown()` (all awaited),
/// plus `server()` and `$harness::server_name()`. Every test builds its own
/// harness, runs one shared test function from `test_util`, and finishes by
/// shutting the harness down.
#[macro_export]
macro_rules! define_async_connection_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod r#async_connection {
            use super::$harness;

            // Runs against the server itself rather than a single database.
            #[tokio::test]
            async fn server_connection_tests() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ServerConnectionTests).await?;
                let db = harness.server();
                $crate::test_util::basic_server_connection_tests(
                    db.clone(),
                    &format!("server-connection-tests-{}", $harness::server_name()),
                )
                .await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn store_retrieve_update_delete() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate).await?;
                let db = harness.connect().await?;
                $crate::test_util::store_retrieve_update_delete_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn overwrite() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Overwrite).await?;
                let db = harness.connect().await?;
                $crate::test_util::overwrite_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn not_found() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NotFound).await?;
                let db = harness.connect().await?;
                $crate::test_util::not_found_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn conflict() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Conflict).await?;
                let db = harness.connect().await?;
                $crate::test_util::conflict_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn bad_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate).await?;
                let db = harness.connect().await?;
                $crate::test_util::bad_update_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn no_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate).await?;
                let db = harness.connect().await?;
                $crate::test_util::no_update_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn get_multiple() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple).await?;
                let db = harness.connect().await?;
                $crate::test_util::get_multiple_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn list() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::List).await?;
                let db = harness.connect().await?;
                $crate::test_util::list_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn list_transactions() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ListTransactions).await?;
                let db = harness.connect().await?;
                $crate::test_util::list_transactions_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn transactions() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Transactions).await?;
                let db = harness.connect().await?;
                $crate::test_util::transaction_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn transaction_check() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::TransactionCheck).await?;
                let db = harness.connect().await?;
                $crate::test_util::transaction_check_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_query() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery).await?;
                let db = harness.connect().await?;
                $crate::test_util::view_query_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn unassociated_collection() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::UnassociatedCollection).await?;
                let db = harness.connect().await?;
                $crate::test_util::unassociated_collection_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn unimplemented_reduce() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce).await?;
                let db = harness.connect().await?;
                $crate::test_util::unimplemented_reduce(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate).await?;
                let db = harness.connect().await?;
                $crate::test_util::view_update_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_multi_emit() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit).await?;
                let db = harness.connect().await?;
                $crate::test_util::view_multi_emit_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_access_policies() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies).await?;
                let db = harness.connect().await?;
                $crate::test_util::view_access_policy_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn unique_views() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews).await?;
                let db = harness.connect().await?;
                $crate::test_util::unique_view_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn named_collection() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::NamedCollection).await?;
                let db = harness.connect().await?;
                $crate::test_util::named_collection_tests(&db).await?;
                harness.shutdown().await
            }

            // Connects first (result unused), then works on the admin database
            // obtained from the server.
            #[tokio::test]
            async fn user_management() -> anyhow::Result<()> {
                use $crate::connection::AsyncStorageConnection;
                let harness = $harness::new($crate::test_util::HarnessTest::UserManagement).await?;
                let _db = harness.connect().await?;
                let server = harness.server();
                let admin = server
                    .database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)
                    .await?;

                $crate::test_util::user_management_tests(
                    &admin,
                    server.clone(),
                    $harness::server_name(),
                )
                .await?;
                harness.shutdown().await
            }

            // Only compiled when the `token-authentication` feature is enabled.
            #[tokio::test]
            #[cfg(feature = "token-authentication")]
            async fn token_authentication() -> anyhow::Result<()> {
                use $crate::connection::AsyncStorageConnection;
                let harness =
                    $harness::new($crate::test_util::HarnessTest::TokenAuthentication).await?;
                let _db = harness.connect().await?;
                let server = harness.server();
                let admin = server
                    .database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)
                    .await?;

                $crate::test_util::token_authentication_tests(
                    &admin,
                    server,
                    $harness::server_name(),
                )
                .await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn compaction() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Compact).await?;
                let db = harness.connect().await?;
                $crate::test_util::compaction_tests(&db).await?;
                harness.shutdown().await
            }
        }
    };
}
770

            
771
/// Creates a test suite that tests methods available on [`Connection`]
///
/// The generated tests live in a `blocking_connection` module and are only
/// compiled under `cfg(test)`. `$harness` must name a type in the invoking
/// scope that provides:
///
/// - `new(HarnessTest) -> Result<Self, _>` to set up an isolated test,
/// - `connect()` to obtain a database connection,
/// - `server()` to obtain the storage connection,
/// - `server_name()` (associated function) to label the server under test,
/// - `shutdown()` returning the test's `anyhow::Result<()>`.
#[macro_export]
macro_rules! define_blocking_connection_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod blocking_connection {
            use super::$harness;
            #[test]
            fn server_connection_tests() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ServerConnectionTests)?;
                let db = harness.server();
                $crate::test_util::blocking_basic_server_connection_tests(
                    db,
                    &format!("server-connection-tests-{}", $harness::server_name()),
                )?;
                harness.shutdown()
            }

            #[test]
            fn store_retrieve_update_delete() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate)?;
                let db = harness.connect()?;
                $crate::test_util::blocking_store_retrieve_update_delete_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn overwrite() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Overwrite)?;
                let db = harness.connect()?;
                $crate::test_util::blocking_overwrite_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn not_found() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NotFound)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_not_found_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn conflict() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Conflict)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_conflict_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn bad_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_bad_update_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn no_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_no_update_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn get_multiple() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_get_multiple_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn list() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::List)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_list_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn list_transactions() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ListTransactions)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_list_transactions_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn transaction_check() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::TransactionCheck)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_transaction_check_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn transactions() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Transactions)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_transaction_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_query() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_query_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn unassociated_collection() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::UnassociatedCollection)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_unassociated_collection_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn unimplemented_reduce() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_unimplemented_reduce(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_update_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_multi_emit() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_multi_emit_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_access_policies() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_access_policy_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn unique_views() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_unique_view_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn cow_views() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewCow)?;
                let db = harness.connect()?;
                $crate::test_util::blocking_cow_views(&db)?;
                harness.shutdown()
            }

            #[test]
            fn named_collection() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NamedCollection)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_named_collection_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn user_management() -> anyhow::Result<()> {
                use $crate::connection::StorageConnection;
                let harness = $harness::new($crate::test_util::HarnessTest::UserManagement)?;
                // The connection itself is unused; the test works against the
                // admin database obtained from the server below.
                let _db = harness.connect()?;
                let server = harness.server();
                let admin =
                    server.database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)?;

                $crate::test_util::blocking_user_management_tests(
                    &admin,
                    server,
                    $harness::server_name(),
                )?;
                harness.shutdown()
            }

            #[test]
            #[cfg(feature = "token-authentication")]
            fn token_authentication() -> anyhow::Result<()> {
                use $crate::connection::StorageConnection;
                let harness = $harness::new($crate::test_util::HarnessTest::TokenAuthentication)?;
                // The connection itself is unused; the test works against the
                // admin database obtained from the server below.
                let _db = harness.connect()?;
                let server = harness.server();
                let admin =
                    server.database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)?;

                $crate::test_util::blocking_token_authentication_tests(
                    &admin,
                    server,
                    $harness::server_name(),
                )?;
                harness.shutdown()
            }

            #[test]
            fn compaction() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Compact)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_compaction_tests(&db)?;
                harness.shutdown()
            }
        }
    };
}

            
506
pub async fn store_retrieve_update_delete_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
506
    let original_value = Basic::new("initial_value");
506
    let collection = db.collection::<Basic>();
3827
    let header = collection.push(&original_value).await?;

            
506
    let mut doc = collection
506
        .get(&header.id)
1990
        .await?
506
        .expect("couldn't retrieve stored item");
506
    let mut value = Basic::document_contents(&doc)?;
506
    assert_eq!(original_value, value);
506
    let old_revision = doc.header.revision;
506

            
506
    // Update the value
506
    value.value = String::from("updated_value");
506
    Basic::set_document_contents(&mut doc, value.clone())?;
3166
    db.update::<Basic, _>(&mut doc).await?;

            
    // update should cause the revision to be changed
506
    assert_ne!(doc.header.revision, old_revision);

            
    // Check the value in the database to ensure it has the new document
506
    let doc = collection
506
        .get(&header.id)
1829
        .await?
506
        .expect("couldn't retrieve stored item");
506
    assert_eq!(Basic::document_contents(&doc)?, value);

            
    // These operations should have created two transactions with one change each
1920
    let transactions = db.list_executed_transactions(None, None).await?;
506
    assert_eq!(transactions.len(), 2);
506
    assert!(transactions[0].id < transactions[1].id);
1518
    for transaction in &transactions {
1012
        let changes = transaction
1012
            .changes
1012
            .documents()
1012
            .expect("incorrect transaction type");
1012
        assert_eq!(changes.documents.len(), 1);
1012
        assert_eq!(changes.collections.len(), 1);
1012
        assert_eq!(changes.collections[0], Basic::collection_name());
1012
        assert_eq!(changes.documents[0].collection, 0);
1012
        assert_eq!(header.id, changes.documents[0].id.deserialize()?);
1012
        assert!(!changes.documents[0].deleted);
    }

            
2978
    db.collection::<Basic>().delete(&doc).await?;
1914
    assert!(collection.get(&header.id).await?.is_none());
506
    let transactions = db
506
        .list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)
1978
        .await?;
506
    assert_eq!(transactions.len(), 1);
506
    let transaction = transactions.first().unwrap();
506
    let changes = transaction
506
        .changes
506
        .documents()
506
        .expect("incorrect transaction type");
506
    assert_eq!(changes.documents.len(), 1);
506
    assert_eq!(changes.collections[0], Basic::collection_name());
506
    assert_eq!(header.id, changes.documents[0].id.deserialize()?);
506
    assert!(changes.documents[0].deleted);

            
    // Use the Collection interface
2992
    let mut doc = original_value.clone().push_into_async(db).await?;
506
    doc.contents.category = Some(String::from("updated"));
3118
    doc.update_async(db).await?;
1877
    let reloaded = Basic::get_async(&doc.header.id, db).await?.unwrap();
506
    assert_eq!(doc.contents, reloaded.contents);

            
    // Test Connection::insert with a specified id
506
    let doc = BorrowedDocument::with_contents::<Basic, _>(&42, &Basic::new("42"))?;
506
    let document_42 = db
506
        .insert::<Basic, _, _>(Some(&doc.header.id), doc.contents.into_vec())
3170
        .await?;
506
    assert_eq!(document_42.id, 42);
3127
    let document_43 = Basic::new("43").insert_into_async(&43, db).await?;
506
    assert_eq!(document_43.header.id, 43);

            
    // Test that inserting a document with the same ID results in a conflict:
506
    let conflict_err = Basic::new("43")
506
        .with_parent_id(document_42.id)
506
        .insert_into_async(&doc.header.id, db)
2108
        .await
506
        .unwrap_err();
506
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));

            
    // Test bulk insert
506
    let docs = Basic::push_all_async([Basic::new("44"), Basic::new("45")], db)
3988
        .await
506
        .unwrap();
506
    assert_eq!(docs[0].contents.value, "44");
506
    assert_eq!(docs[1].contents.value, "45");

            
506
    Ok(())
506
}

            
#[allow(clippy::cognitive_complexity)] // TODO should be split into multiple tests
3
pub fn blocking_store_retrieve_update_delete_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let original_value = Basic::new("initial_value");
3
    let collection = db.collection::<Basic>();
3
    let header = collection.push(&original_value)?;

            
3
    let mut doc = collection
3
        .get(&header.id)?
3
        .expect("couldn't retrieve stored item");
3
    let mut value = Basic::document_contents(&doc)?;
3
    assert_eq!(original_value, value);
3
    let old_revision = doc.header.revision;
3

            
3
    // Update the value
3
    value.value = String::from("updated_value");
3
    Basic::set_document_contents(&mut doc, value.clone())?;
3
    db.update::<Basic, _>(&mut doc)?;

            
    // update should cause the revision to be changed
3
    assert_ne!(doc.header.revision, old_revision);

            
    // Check the value in the database to ensure it has the new document
3
    let doc = collection
3
        .get(&header.id)?
3
        .expect("couldn't retrieve stored item");
3
    assert_eq!(Basic::document_contents(&doc)?, value);

            
    // These operations should have created two transactions with one change each
3
    let transactions = db.list_executed_transactions(None, None)?;
3
    assert_eq!(transactions.len(), 2);
3
    assert!(transactions[0].id < transactions[1].id);
9
    for transaction in &transactions {
6
        let changes = transaction
6
            .changes
6
            .documents()
6
            .expect("incorrect transaction type");
6
        assert_eq!(changes.documents.len(), 1);
6
        assert_eq!(changes.collections.len(), 1);
6
        assert_eq!(changes.collections[0], Basic::collection_name());
6
        assert_eq!(changes.documents[0].collection, 0);
6
        assert_eq!(header.id, changes.documents[0].id.deserialize()?);
6
        assert!(!changes.documents[0].deleted);
    }

            
3
    db.collection::<Basic>().delete(&doc)?;
3
    assert!(collection.get(&header.id)?.is_none());
3
    let transactions =
3
        db.list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)?;
3
    assert_eq!(transactions.len(), 1);
3
    let transaction = transactions.first().unwrap();
3
    let changes = transaction
3
        .changes
3
        .documents()
3
        .expect("incorrect transaction type");
3
    assert_eq!(changes.documents.len(), 1);
3
    assert_eq!(changes.collections[0], Basic::collection_name());
3
    assert_eq!(header.id, changes.documents[0].id.deserialize()?);
3
    assert!(changes.documents[0].deleted);

            
    // Use the Collection interface
3
    let mut doc = original_value.push_into(db)?;
3
    doc.contents.category = Some(String::from("updated"));
3
    doc.update(db)?;
3
    let reloaded = Basic::get(&doc.header.id, db)?.unwrap();
3
    assert_eq!(doc.contents, reloaded.contents);

            
    // Test Connection::insert with a specified id
3
    let doc = BorrowedDocument::with_contents::<Basic, _>(&42, &Basic::new("42"))?;
3
    let document_42 = db.insert::<Basic, _, _>(Some(&doc.header.id), doc.contents.into_vec())?;
3
    assert_eq!(document_42.id, 42);
3
    let document_43 = Basic::new("43").insert_into(&43, db)?;
3
    assert_eq!(document_43.header.id, 43);

            
    // Test that inserting a document with the same ID results in a conflict:
3
    let conflict_err = Basic::new("43")
3
        .insert_into(&doc.header.id, db)
3
        .unwrap_err();
3
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));

            
    // Test bulk insert
3
    let docs = Basic::push_all([Basic::new("44"), Basic::new("45")], db).unwrap();
3
    assert_eq!(docs[0].contents.value, "44");
3
    assert_eq!(docs[1].contents.value, "45");

            
3
    Ok(())
3
}

            
5
pub async fn overwrite_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    // Test Connection::insert with a specified id
5
    let doc = BorrowedDocument::with_contents::<Basic, _>(&42, &Basic::new("42"))?;
5
    let document_42 = db
5
        .insert::<Basic, _, _>(Some(&doc.header.id), doc.contents.into_vec())
5
        .await?;
5
    assert_eq!(document_42.id, 42);
5
    let document_43 = Basic::new("43").insert_into_async(&43, db).await?;
5
    assert_eq!(document_43.header.id, 43);

            
    // Test that overwriting works
5
    let overwritten = Basic::new("43")
5
        .with_parent_id(document_42.id)
5
        .overwrite_into_async(&doc.header.id, db)
5
        .await
5
        .unwrap();
5
    assert!(overwritten.header.revision.id > doc.header.revision.id);
5
    let children = db
5
        .view::<BasicByParentIdEager>()
5
        .with_access_policy(AccessPolicy::NoUpdate)
5
        .with_key(&Some(document_42.id))
5
        .query()
5
        .await?;
5
    assert_eq!(children[0].source.revision, overwritten.header.revision);

            
    // Verify that unique/eager views are updated when overwriting into a non-existant key
5
    let insert_via_overwrite = Basic::new("1000")
5
        .with_parent_id(document_43.header.id)
5
        .overwrite_into_async(&1_000, db)
5
        .await
5
        .unwrap();
5
    let children = db
5
        .view::<BasicByParentIdEager>()
5
        .with_access_policy(AccessPolicy::NoUpdate)
5
        .with_key(&Some(document_43.header.id))
5
        .query()
5
        .await?;
5
    assert_eq!(
5
        children[0].source.revision,
5
        insert_via_overwrite.header.revision
5
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_overwrite_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    // Test Connection::insert with a specified id
3
    let doc = BorrowedDocument::with_contents::<Basic, _>(&42, &Basic::new("42"))?;
3
    let document_42 = db.insert::<Basic, _, _>(Some(&doc.header.id), doc.contents.into_vec())?;
3
    assert_eq!(document_42.id, 42);
3
    let document_43 = Basic::new("43").insert_into(&43, db)?;
3
    assert_eq!(document_43.header.id, 43);

            
    // Test that overwriting works
3
    let overwritten = Basic::new("43")
3
        .with_parent_id(document_42.id)
3
        .overwrite_into(&doc.header.id, db)
3
        .unwrap();
3
    assert!(overwritten.header.revision.id > doc.header.revision.id);
3
    let children = db
3
        .view::<BasicByParentIdEager>()
3
        .with_access_policy(AccessPolicy::NoUpdate)
3
        .with_key(&Some(document_42.id))
3
        .query()?;
3
    assert_eq!(children[0].source.revision, overwritten.header.revision);

            
    // Verify that unique/eager views are updated when overwriting into a non-existant key
3
    let insert_via_overwrite = Basic::new("1000")
3
        .with_parent_id(document_43.header.id)
3
        .overwrite_into(&1_000, db)
3
        .unwrap();
3
    let children = db
3
        .view::<BasicByParentIdEager>()
3
        .with_access_policy(AccessPolicy::NoUpdate)
3
        .with_key(&Some(document_43.header.id))
3
        .query()?;
3
    assert_eq!(
3
        children[0].source.revision,
3
        insert_via_overwrite.header.revision
3
    );

            
3
    Ok(())
3
}

            
5
/// On a database with no writes, fetching `Basic` document 1 and asking for
/// the last transaction id must both yield `None`.
pub async fn not_found_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let missing_doc = db.collection::<Basic>().get(&1).await?;
    assert!(missing_doc.is_none());

    let latest_transaction = db.last_transaction_id().await?;
    assert!(latest_transaction.is_none());

    Ok(())
}

            
3
/// Blocking variant: on a database with no writes, fetching `Basic` document
/// 1 and asking for the last transaction id must both yield `None`.
pub fn blocking_not_found_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let missing_doc = db.collection::<Basic>().get(&1)?;
    assert!(missing_doc.is_none());

    let latest_transaction = db.last_transaction_id()?;
    assert!(latest_transaction.is_none());

    Ok(())
}

            
5
pub async fn conflict_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let original_value = Basic::new("initial_value");
5
    let collection = db.collection::<Basic>();
5
    let header = collection.push(&original_value).await?;

            
5
    let mut doc = collection
5
        .get(&header.id)
5
        .await?
5
        .expect("couldn't retrieve stored item");
5
    let mut value = Basic::document_contents(&doc)?;
5
    value.value = String::from("updated_value");
5
    Basic::set_document_contents(&mut doc, value.clone())?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
    // To generate a conflict, let's try to do the same update again by
    // reverting the header
5
    doc.header = Header::try_from(header).unwrap();
5
    let conflicting_header = db
5
        .update::<Basic, _>(&mut doc)
5
        .await
5
        .expect_err("conflict should have generated an error")
5
        .conflicting_document::<Basic>()
5
        .expect("conflict not detected");
5
    assert_eq!(conflicting_header.id, doc.header.id);

            
    // Let's force an update through overwrite. After this succeeds, the header
    // is updated to the new revision.
5
    db.collection::<Basic>().overwrite(&mut doc).await.unwrap();

            
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
5
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
5
    doc.modify_async(db, |doc| {
5
        doc.contents.value = String::from("modify worked");
5
    })
5
    .await?;
5
    assert_eq!(doc.contents.value, "modify worked");
5
    let doc = Basic::get_async(&doc.header.id, db).await?.unwrap();
5
    assert_eq!(doc.contents.value, "modify worked");

            
5
    Ok(())
5
}

            
3
pub fn blocking_conflict_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let original_value = Basic::new("initial_value");
3
    let collection = db.collection::<Basic>();
3
    let header = collection.push(&original_value)?;

            
3
    let mut doc = collection
3
        .get(&header.id)?
3
        .expect("couldn't retrieve stored item");
3
    let mut value = Basic::document_contents(&doc)?;
3
    value.value = String::from("updated_value");
3
    Basic::set_document_contents(&mut doc, value)?;
3
    db.update::<Basic, _>(&mut doc)?;

            
    // To generate a conflict, let's try to do the same update again by
    // reverting the header
3
    doc.header = Header::try_from(header).unwrap();
3
    let conflicting_header = db
3
        .update::<Basic, _>(&mut doc)
3
        .expect_err("conflict should have generated an error")
3
        .conflicting_document::<Basic>()
3
        .expect("conflict not detected");
3
    assert_eq!(conflicting_header.id, doc.header.id);

            
    // Let's force an update through overwrite. After this succeeds, the header
    // is updated to the new revision.
3
    db.collection::<Basic>().overwrite(&mut doc).unwrap();

            
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
3
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
3
    doc.modify(db, |doc| {
3
        doc.contents.value = String::from("modify worked");
3
    })?;
3
    assert_eq!(doc.contents.value, "modify worked");
3
    let doc = Basic::get(&doc.header.id, db)?.unwrap();
3
    assert_eq!(doc.contents.value, "modify worked");

            
3
    Ok(())
3
}

            
5
pub async fn bad_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let mut doc = BorrowedDocument::with_contents::<Basic, _>(&1, &Basic::default())?;
5
    match db.update::<Basic, _>(&mut doc).await {
5
        Err(Error::DocumentNotFound(collection, id)) => {
5
            assert_eq!(collection, Basic::collection_name());
5
            assert_eq!(id.as_ref(), &DocumentId::from_u64(1));
5
            Ok(())
        }
        other => panic!("expected DocumentNotFound from update but got: {other:?}"),
    }
5
}

            
3
pub fn blocking_bad_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let mut doc = BorrowedDocument::with_contents::<Basic, _>(&1, &Basic::default())?;
3
    match db.update::<Basic, _>(&mut doc) {
3
        Err(Error::DocumentNotFound(collection, id)) => {
3
            assert_eq!(collection, Basic::collection_name());
3
            assert_eq!(id.as_ref(), &DocumentId::from_u64(1));
3
            Ok(())
        }
        other => panic!("expected DocumentNotFound from update but got: {other:?}"),
    }
3
}

            
5
/// Calling `update` with unmodified contents must leave the document's
/// header equal to the header returned when it was first pushed.
pub async fn no_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let initial = Basic::new("initial_value");
    let basics = db.collection::<Basic>();
    let pushed = basics.push(&initial).await?;

    let fetched = basics.get(&pushed.id).await?;
    let mut stored = fetched.expect("couldn't retrieve stored item");
    // Write the document back exactly as it was read.
    db.update::<Basic, _>(&mut stored).await?;

    let header_after = CollectionHeader::try_from(stored.header)?;
    assert_eq!(header_after, pushed);

    Ok(())
}

            
3
/// Blocking variant: calling `update` with unmodified contents must leave
/// the document's header equal to the header returned when it was pushed.
pub fn blocking_no_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let initial = Basic::new("initial_value");
    let basics = db.collection::<Basic>();
    let pushed = basics.push(&initial)?;

    let fetched = basics.get(&pushed.id)?;
    let mut stored = fetched.expect("couldn't retrieve stored item");
    // Write the document back exactly as it was read.
    db.update::<Basic, _>(&mut stored)?;

    let header_after = CollectionHeader::try_from(stored.header)?;
    assert_eq!(header_after, pushed);

    Ok(())
}

            
5
/// Pushes two `Basic` documents and fetches them with `get_multiple_async`,
/// requesting the ids in both orders. Each request must return two
/// documents, and the first result set must contain both pushed values.
pub async fn get_multiple_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first_header = basics.push(&first_value).await?;

    let second_value = Basic::new("second_value");
    let second_header = basics.push(&second_value).await?;

    let in_order = Basic::get_multiple_async([&first_header.id, &second_header.id], db).await?;
    assert_eq!(in_order.len(), 2);

    let reversed = Basic::get_multiple_async([&second_header.id, &first_header.id], db).await?;
    assert_eq!(reversed.len(), 2);

    // Result order is not guaranteed, so locate each document by id rather
    // than by position.
    let first_found = in_order
        .iter()
        .find(|candidate| candidate.header.id == first_header.id)
        .expect("Couldn't find doc1");
    assert_eq!(first_found.contents.value, first_value.value);
    let second_found = in_order
        .iter()
        .find(|candidate| candidate.header.id == second_header.id)
        .expect("Couldn't find doc2");
    assert_eq!(second_found.contents.value, second_value.value);

    Ok(())
}

            
3
/// Blocking variant: pushes two `Basic` documents and fetches them with
/// `get_multiple`, requesting the ids in both orders. Each request must
/// return two documents, and the first result set must contain both values.
pub fn blocking_get_multiple_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first_header = basics.push(&first_value)?;

    let second_value = Basic::new("second_value");
    let second_header = basics.push(&second_value)?;

    let in_order = Basic::get_multiple([&first_header.id, &second_header.id], db)?;
    assert_eq!(in_order.len(), 2);

    let reversed = Basic::get_multiple([&second_header.id, &first_header.id], db)?;
    assert_eq!(reversed.len(), 2);

    // Result order is not guaranteed, so locate each document by id rather
    // than by position.
    let first_found = in_order
        .iter()
        .find(|candidate| candidate.header.id == first_header.id)
        .expect("Couldn't find doc1");
    assert_eq!(first_found.contents.value, first_value.value);
    let second_found = in_order
        .iter()
        .find(|candidate| candidate.header.id == second_header.id)
        .expect("Couldn't find doc2");
    assert_eq!(second_found.contents.value, second_value.value);

    Ok(())
}

            
5
/// Exercises `all_async` and `list_async` on two pushed `Basic` documents:
/// full listing, counting, header-only listing, an exclusive range, and a
/// descending query limited to a single result.
pub async fn list_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first = basics.push(&first_value).await?;

    let second_value = Basic::new("second_value");
    let second = basics.push(&second_value).await?;

    // Everything in the collection.
    let everything = Basic::all_async(db).await?;
    assert_eq!(everything.len(), 2);
    assert_eq!(Basic::all_async(db).count().await?, 2);

    // Inclusive id range covering both documents.
    let inclusive = Basic::list_async(first.id..=second.id, db).await?;
    assert_eq!(inclusive.len(), 2);
    assert_eq!(Basic::list_async(first.id..=second.id, db).count().await?, 2);

    assert_eq!(inclusive[0].contents.value, first_value.value);
    assert_eq!(inclusive[1].contents.value, second_value.value);

    // Headers only.
    let headers = Basic::list_async(first.id..=second.id, db)
        .headers()
        .await?;

    assert_eq!(headers.len(), 2);

    // Exclusive upper bound leaves out the second document.
    let exclusive = Basic::list_async(first.id..second.id, db).await?;
    assert_eq!(exclusive.len(), 1);

    // Descending with a limit of one must yield the second document.
    let newest_only = Basic::list_async(first.id..=second.id, db)
        .limit(1)
        .descending()
        .await?;
    assert_eq!(newest_only.len(), 1);
    assert_eq!(newest_only[0].contents.value, second_value.value);

    Ok(())
}

            
3
/// Exercises the blocking `all`/`list` helpers of `Basic`: full listing,
/// inclusive and exclusive id ranges, header-only listing, counting, and a
/// limited descending listing.
///
/// # Errors
///
/// Propagates any error returned by the connection. Panics if an expectation
/// is not met.
pub fn blocking_list_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();

    let first_contents = Basic::new("initial_value");
    let first = basics.push(&first_contents)?;
    let second_contents = Basic::new("second_value");
    let second = basics.push(&second_contents)?;

    // Listing everything sees both documents, as does counting.
    let everything = Basic::all(db).query()?;
    assert_eq!(everything.len(), 2);
    assert_eq!(Basic::all(db).count()?, 2);

    // The inclusive range spans both documents; the first pushed comes first.
    let inclusive = Basic::list(first.id..=second.id, db).query()?;
    assert_eq!(inclusive.len(), 2);
    assert_eq!(Basic::list(first.id..=second.id, db).count()?, 2);
    assert_eq!(inclusive[0].contents.value, first_contents.value);
    assert_eq!(inclusive[1].contents.value, second_contents.value);

    // Header-only listing over the same range.
    let headers = Basic::list(first.id..=second.id, db).headers()?;
    assert_eq!(headers.len(), 2);

    // The half-open range excludes the second document.
    let exclusive = Basic::list(first.id..second.id, db).query()?;
    assert_eq!(exclusive.len(), 1);

    // Descending with a limit of one yields only the second document.
    let newest = Basic::list(first.id..=second.id, db)
        .limit(1)
        .descending()
        .query()?;
    assert_eq!(newest.len(), 1);
    assert_eq!(newest[0].contents.value, second_contents.value);

    Ok(())
}

            
5
pub async fn list_transactions_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5

            
5
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
5
    // transactions to test the edge cases of `list_transactions`
5
    futures::future::join_all(
5
        (0..=(LIST_TRANSACTIONS_MAX_RESULTS))
5005
            .map(|_| async { collection.push(&Basic::default()).await.unwrap() }),
5
    )
4957
    .await;

            
    // Test defaults
5
    let transactions = db.list_executed_transactions(None, None).await?;
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT
5
    );

            
    // Test max results limit
5
    let transactions = db
5
        .list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))
5
        .await?;
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_MAX_RESULTS
5
    );

            
    // Test requesting 0 items
5
    let transactions = db.list_executed_transactions(None, Some(0)).await?;
5
    assert!(transactions.is_empty());

            
    // Test doing a loop fetching until we get no more results
5
    let mut transactions = Vec::new();
5
    let mut starting_id = None;
    loop {
60
        let chunk = db
60
            .list_executed_transactions(starting_id, Some(100))
60
            .await?;
60
        if chunk.is_empty() {
5
            break;
55
        }
55

            
55
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
55
        starting_id = Some(max_id + 1);
55
        transactions.extend(chunk);
    }

            
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_MAX_RESULTS + 1
5
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_list_transactions_tests<C: Connection + Clone + 'static>(
3
    db: &C,
3
) -> anyhow::Result<()> {
3
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
3
    // transactions to test the edge cases of `list_transactions`
3
    let mut threads = Vec::with_capacity(num_cpus::get());
3
    let transaction_counter = Arc::new(AtomicU32::new(0));
12
    for _ in 0..threads.capacity() {
12
        let db = db.clone();
12
        let transaction_counter = transaction_counter.clone();
12
        threads.push(std::thread::spawn(move || {
12
            let collection = db.collection::<Basic>();
3015
            while transaction_counter.fetch_add(1, Ordering::SeqCst)
3015
                <= LIST_TRANSACTIONS_MAX_RESULTS
3003
            {
3003
                collection.push(&Basic::default()).unwrap();
3003
            }
12
        }));
12
    }

            
15
    for thread in threads {
12
        thread.join().unwrap();
12
    }

            
    // Test defaults
3
    let transactions = db.list_executed_transactions(None, None)?;
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT
3
    );

            
    // Test max results limit
3
    let transactions =
3
        db.list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))?;
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_MAX_RESULTS
3
    );

            
    // Test requesting 0 items
3
    let transactions = db.list_executed_transactions(None, Some(0))?;
3
    assert!(transactions.is_empty());

            
    // Test doing a loop fetching until we get no more results
3
    let mut transactions = Vec::new();
3
    let mut starting_id = None;
    loop {
36
        let chunk = db.list_executed_transactions(starting_id, Some(100))?;
36
        if chunk.is_empty() {
3
            break;
33
        }
33

            
33
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
33
        starting_id = Some(max_id + 1);
33
        transactions.extend(chunk);
    }

            
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_MAX_RESULTS + 1
3
    );

            
3
    Ok(())
3
}

            
5
/// Walks a single `Basic` document through insert, update, overwrite, and
/// delete, each applied through its own `Transaction` on an async connection.
///
/// # Errors
///
/// Propagates any error returned by the connection or by building an
/// operation. Panics if an expectation is not met.
pub async fn transaction_tests<C: AsyncConnection + 'static>(db: &C) -> anyhow::Result<()> {
    // Insert the document and recover its id from the transaction result.
    let mut insert_tx = Transaction::new();
    Basic::new("test").push_in_transaction(&mut insert_tx)?;
    let insert_results = insert_tx.apply_async(db).await?;
    let OperationResult::DocumentUpdated { header, .. } = &insert_results[0] else {
        unreachable!("unexpected tx result")
    };
    let id: u64 = header.id.deserialize()?;

    // Set a category through a transaction and confirm it was stored.
    let mut stored = Basic::get_async(&id, db).await?.expect("doc not found");
    stored.contents.category = Some(String::from("cat"));
    let mut update_tx = Transaction::new();
    stored.update_in_transaction(&mut update_tx)?;
    update_tx.apply_async(db).await?;
    let updated = Basic::get_async(&id, db).await?.expect("doc not found");
    assert_eq!(updated.contents.category.as_deref(), Some("cat"));

    // Overwriting with a fresh value drops the category again.
    let mut overwrite_tx = Transaction::new();
    Basic::new("test").overwrite_in_transaction(&id, &mut overwrite_tx)?;
    overwrite_tx.apply_async(db).await?;
    let overwritten = Basic::get_async(&id, db).await?.expect("doc not found");
    assert_eq!(overwritten.contents.category, None);

    // Finally, delete the document.
    let mut delete_tx = Transaction::new();
    overwritten.delete_in_transaction(&mut delete_tx)?;
    delete_tx.apply_async(db).await?;

    Ok(())
}

            
3
/// Walks a single `Basic` document through insert, update, overwrite, and
/// delete, each applied through its own `Transaction` on a blocking
/// connection.
///
/// # Errors
///
/// Propagates any error returned by the connection or by building an
/// operation. Panics if an expectation is not met.
pub fn blocking_transaction_tests<C: Connection + 'static>(db: &C) -> anyhow::Result<()> {
    // Insert the document and recover its id from the transaction result.
    let mut insert_tx = Transaction::new();
    Basic::new("test").push_in_transaction(&mut insert_tx)?;
    let insert_results = insert_tx.apply(db)?;
    let OperationResult::DocumentUpdated { header, .. } = &insert_results[0] else {
        unreachable!("unexpected tx result")
    };
    let id: u64 = header.id.deserialize()?;

    // Set a category through a transaction and confirm it was stored.
    let mut stored = Basic::get(&id, db)?.expect("doc not found");
    stored.contents.category = Some(String::from("cat"));
    let mut update_tx = Transaction::new();
    stored.update_in_transaction(&mut update_tx)?;
    update_tx.apply(db)?;
    let updated = Basic::get(&id, db)?.expect("doc not found");
    assert_eq!(updated.contents.category.as_deref(), Some("cat"));

    // Overwriting with a fresh value drops the category again.
    let mut overwrite_tx = Transaction::new();
    Basic::new("test").overwrite_in_transaction(&id, &mut overwrite_tx)?;
    overwrite_tx.apply(db)?;
    let overwritten = Basic::get(&id, db)?.expect("doc not found");
    assert_eq!(overwritten.contents.category, None);

    // Finally, delete the document.
    let mut delete_tx = Transaction::new();
    overwritten.delete_in_transaction(&mut delete_tx)?;
    delete_tx.apply(db)?;

    Ok(())
}

            
5
pub async fn transaction_check_tests<C: AsyncConnection + 'static>(db: &C) -> anyhow::Result<()> {
5
    let mut doc = Basic::new("test").push_into_async(db).await?;
5
    let mut refreshable = doc.clone();
5
    let initial_header = doc.header;
5
    doc.contents.value = String::from("updated");
5
    doc.update_async(db).await?;

            
5
    refreshable.refresh_async(db).await?;
5
    assert_eq!(refreshable.contents.value, "updated");

            
    // Positive flow, check id, as well as id + header.
5
    let mut tx = Transaction::new();
5
    tx.push(Operation::check_document_exists::<Basic>(&doc.header.id)?);
5
    tx.push(Operation::check_document_is_current::<Basic, _>(&doc)?);
5
    tx.push(Operation::insert_serialized::<Basic>(
5
        None,
5
        &Basic::new("new doc"),
5
    )?);
5
    tx.apply_async(db).await?;

            
    // Error flows. Ensure the first violation is the error returned.
5
    let mut tx = Transaction::new();
5
    tx.push(Operation::check_document_is_current::<Basic, _>(
5
        &initial_header,
5
    )?);
5
    tx.push(Operation::check_document_exists::<Basic>(&42)?);
5
    let result = tx.apply_async(db).await.unwrap_err();
5
    assert!(matches!(result, Error::DocumentConflict(_, _)));

            
5
    let mut tx = Transaction::new();
5
    tx.push(Operation::check_document_exists::<Basic>(&42)?);
5
    tx.push(Operation::check_document_is_current::<Basic, _>(
5
        &initial_header,
5
    )?);
5
    let result = tx.apply_async(db).await.unwrap_err();
5
    assert!(matches!(result, Error::DocumentNotFound(_, _)));

            
5
    Ok(())
5
}

            
3
pub fn blocking_transaction_check_tests<C: Connection + 'static>(db: &C) -> anyhow::Result<()> {
3
    let mut doc = Basic::new("test").push_into(db)?;
3
    let mut refreshable = doc.clone();
3
    let initial_header = doc.header;
3
    doc.contents.value = String::from("updated");
3
    doc.update(db)?;

            
3
    refreshable.refresh(db)?;
3
    assert_eq!(refreshable.contents.value, "updated");

            
    // Positive flow, check id, as well as id + header.
3
    let mut tx = Transaction::new();
3
    tx.push(Operation::check_document_exists::<Basic>(&doc.header.id)?);
3
    tx.push(Operation::check_document_is_current::<Basic, _>(&doc)?);
3
    tx.push(Operation::insert_serialized::<Basic>(
3
        None,
3
        &Basic::new("new doc"),
3
    )?);
3
    tx.apply(db)?;

            
    // Error flows. Ensure the first violation is the error returned.
3
    let mut tx = Transaction::new();
3
    tx.push(Operation::check_document_is_current::<Basic, _>(
3
        &initial_header,
3
    )?);
3
    tx.push(Operation::check_document_exists::<Basic>(&42)?);
3
    let result = tx.apply(db).unwrap_err();
3
    assert!(matches!(result, Error::DocumentConflict(_, _)));

            
3
    let mut tx = Transaction::new();
3
    tx.push(Operation::check_document_exists::<Basic>(&42)?);
3
    tx.push(Operation::check_document_is_current::<Basic, _>(
3
        &initial_header,
3
    )?);
3
    let result = tx.apply(db).unwrap_err();
3
    assert!(matches!(result, Error::DocumentNotFound(_, _)));

            
3
    Ok(())
3
}

            
5
pub async fn view_query_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;
5
    let b = collection.push(&Basic::new("B")).await?;
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;
5
    collection
5
        .push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))
5
        .await?;
5
    collection
5
        .push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .query_with_collection_docs()
10
        .await?;
5
    assert_eq!(a_children.len(), 1);
5
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);

            
5
    let b_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(b.id))
5
        .query()
5
        .await?;
5
    assert_eq!(b_children.len(), 2);

            
5
    let a_and_b_children = db
5
        .view::<BasicByParentId>()
5
        .with_keys([&Some(a.id), &Some(b.id)])
5
        .query()
5
        .await?;
5
    assert_eq!(a_and_b_children.len(), 3);

            
    // Test out of order keys
5
    let a_and_b_children = db
5
        .view::<BasicByParentId>()
5
        .with_keys([&Some(b.id), &Some(a.id)])
5
        .query()
5
        .await?;
5
    assert_eq!(a_and_b_children.len(), 3);

            
5
    let has_parent = db
5
        .view::<BasicByParentId>()
5
        .with_key_range(Some(0)..=Some(u64::MAX))
5
        .query()
5
        .await?;
5
    assert_eq!(has_parent.len(), 3);
    // Verify the result is sorted ascending
5
    assert!(has_parent
5
        .windows(2)
10
        .all(|window| window[0].key <= window[1].key));

            
    // Test limiting and descending order
5
    let last_with_parent = db
5
        .view::<BasicByParentId>()
5
        .with_key_range(Some(0)..=Some(u64::MAX))
5
        .descending()
5
        .limit(1)
5
        .query()
5
        .await?;
10
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
5
    assert_eq!(last_with_parent[0].key, has_parent[2].key);

            
5
    let items_with_categories = db.view::<BasicByCategory>().query().await?;
5
    assert_eq!(items_with_categories.len(), 3);

            
    // Test deleting
5
    let deleted_count = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(b.id))
5
        .delete_docs()
5
        .await?;
5
    assert_eq!(b_children.len() as u64, deleted_count);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(&Some(b.id))
5
            .query()
5
            .await?
5
            .len(),
        0
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_query_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;
3
    let b = collection.push(&Basic::new("B"))?;
3
    let a_child = collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;
3
    collection.push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))?;
3
    collection.push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(&Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 1);

            
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(&Some(a.id))
3
        .query_with_collection_docs()?;
3
    assert_eq!(a_children.len(), 1);
3
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);

            
3
    let b_children = db.view::<BasicByParentId>().with_key(&Some(b.id)).query()?;
3
    assert_eq!(b_children.len(), 2);

            
3
    let a_and_b_children = db
3
        .view::<BasicByParentId>()
3
        .with_keys(&[Some(a.id), Some(b.id)])
3
        .query()?;
3
    assert_eq!(a_and_b_children.len(), 3);

            
    // Test out of order keys
3
    let a_and_b_children = db
3
        .view::<BasicByParentId>()
3
        .with_keys(&[Some(b.id), Some(a.id)])
3
        .query()?;
3
    assert_eq!(a_and_b_children.len(), 3);

            
3
    let has_parent = db
3
        .view::<BasicByParentId>()
3
        .with_key_range(Some(0)..=Some(u64::MAX))
3
        .query()?;
3
    assert_eq!(has_parent.len(), 3);
    // Verify the result is sorted ascending
3
    assert!(has_parent
3
        .windows(2)
6
        .all(|window| window[0].key <= window[1].key));

            
    // Test limiting and descending order
3
    let last_with_parent = db
3
        .view::<BasicByParentId>()
3
        .with_key_range(Some(0)..=Some(u64::MAX))
3
        .descending()
3
        .limit(1)
3
        .query()?;
6
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
3
    assert_eq!(last_with_parent[0].key, has_parent[2].key);

            
3
    let items_with_categories = db.view::<BasicByCategory>().query()?;
3
    assert_eq!(items_with_categories.len(), 3);

            
    // Test deleting
3
    let deleted_count = db
3
        .view::<BasicByParentId>()
3
        .with_key(&Some(b.id))
3
        .delete_docs()?;
3
    assert_eq!(b_children.len() as u64, deleted_count);
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(&Some(b.id))
3
            .query()?
3
            .len(),
        0
    );

            
3
    Ok(())
3
}

            
5
pub async fn unassociated_collection_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let result = db
5
        .collection::<UnassociatedCollection>()
5
        .push(&UnassociatedCollection)
5
        .await;
5
    match result {
5
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
5
    Ok(())
5
}

            
3
pub fn blocking_unassociated_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let result = db
3
        .collection::<UnassociatedCollection>()
3
        .push(&UnassociatedCollection);
3
    match result {
3
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
3
    Ok(())
3
}

            
5
pub async fn unimplemented_reduce<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    assert!(matches!(
5
        db.view::<UniqueValue>().reduce().await,
        Err(Error::ReduceUnimplemented)
    ));
5
    Ok(())
5
}

            
3
pub fn blocking_unimplemented_reduce<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    assert!(matches!(
3
        db.view::<UniqueValue>().reduce(),
        Err(Error::ReduceUnimplemented)
    ));
3
    Ok(())
3
}

            
#[allow(clippy::too_many_lines)] // The non-async version is less than the limit :)
5
pub async fn view_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(&Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );

            
    // Verify the eager view is available without any updates
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .with_access_policy(AccessPolicy::NoUpdate)
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(&Some(a.id))
5
            .with_access_policy(AccessPolicy::NoUpdate)
5
            .reduce()
5
            .await?,
        0
    );

            
    // Test inserting a new record and the view being made available
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(&Some(a.id))
5
            .reduce()
5
            .await?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
5
    let b = collection.push(&Basic::new("B")).await?;
5
    let mut doc = db.collection::<Basic>().get(&a_child.id).await?.unwrap();
5
    let mut basic = Basic::document_contents(&doc)?;
5
    basic.parent_id = Some(b.id);
5
    Basic::set_document_contents(&mut doc, basic)?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
5
    let b_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(b.id))
5
        .query()
5
        .await?;
5
    assert_eq!(b_children.len(), 1);
5
    assert_eq!(
5
        b_children[0].source,
5
        CollectionHeader::try_from(&doc.header)?
    );
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(&Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );
5
    assert_eq!(db.view::<BasicByParentId>().reduce().await?, 3);

            
    // Update the record, but don't change its mapping. Ensure the source's
    // header is updated.
5
    let mut basic = Basic::document_contents(&doc)?;
5
    basic.value = String::from("new value");
5
    Basic::set_document_contents(&mut doc, basic)?;
5
    db.update::<Basic, _>(&mut doc).await?;
5
    let b_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(b.id))
5
        .query()
5
        .await?;
5
    assert_eq!(b_children.len(), 1);
5
    assert_eq!(
5
        b_children[0].source,
5
        CollectionHeader::try_from(&doc.header)?
    );

            
    // Test deleting a record and ensuring it goes away
5
    db.collection::<Basic>().delete(&doc).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 2);

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 2,),]
    );

            
    // Remove the final document, which has a parent id of None. We'll add a new
    // document with None to verify that the mapper handles the edge case of a
    // delete/insert in the same mapping operation.
5
    db.view::<BasicByParentId>().delete_docs().await?;
5
    collection.push(&Basic::new("B")).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 1);

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(&Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(&Some(a.id))
3
            .reduce()?,
        0
    );

            
    // Verify the eager view is available without any updates
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(&Some(a.id))
3
        .with_access_policy(AccessPolicy::NoUpdate)
3
        .query()?;
3
    assert_eq!(a_children.len(), 0);
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(&Some(a.id))
3
            .with_access_policy(AccessPolicy::NoUpdate)
3
            .reduce()?,
        0
    );

            
    // Test inserting a new record and the view being made available
3
    let a_child = collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(&Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 1);
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(&Some(a.id))
3
            .reduce()?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().reduce_grouped()?,
3
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
3
    let b = collection.push(&Basic::new("B"))?;
3
    let mut doc = db.collection::<Basic>().get(&a_child.id)?.unwrap();
3
    let mut basic = Basic::document_contents(&doc)?;
3
    basic.parent_id = Some(b.id);
3
    Basic::set_document_contents(&mut doc, basic)?;
3
    db.update::<Basic, _>(&mut doc)?;

            
3
    let b_children = db.view::<BasicByParentId>().with_key(&Some(b.id)).query()?;
3
    assert_eq!(b_children.len(), 1);
3
    assert_eq!(
3
        b_children[0].source,
3
        CollectionHeader::try_from(&doc.header)?
    );
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(&Some(a.id))
3
            .reduce()?,
        0
    );
3
    assert_eq!(db.view::<BasicByParentId>().reduce()?, 3);

            
    // Update the record, but don't change its mapping. Ensure the source's
    // header is updated.
3
    let mut basic = Basic::document_contents(&doc)?;
3
    basic.value = String::from("new value");
3
    Basic::set_document_contents(&mut doc, basic)?;
3
    db.update::<Basic, _>(&mut doc)?;
3
    let b_children = db.view::<BasicByParentId>().with_key(&Some(b.id)).query()?;
3
    assert_eq!(b_children.len(), 1);
3
    assert_eq!(
3
        b_children[0].source,
3
        CollectionHeader::try_from(&doc.header)?
    );

            
    // Test deleting a record and ensuring it goes away
3
    db.collection::<Basic>().delete(&doc)?;

            
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 2);

            
    // Verify reduce_grouped matches our expectations.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().reduce_grouped()?,
3
        vec![MappedValue::new(None, 2,),]
    );

            
    // Remove the final document, which has a parent id of None. We'll add a new
    // document with None to verify that the mapper handles the edge case of a
    // delete/insert in the same mapping operation.
3
    db.view::<BasicByParentId>().delete_docs()?;
3
    collection.push(&Basic::new("B"))?;

            
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 1);

            
3
    Ok(())
3
}

            
5
/// Verifies the `BasicByTag` view on an async connection for documents that
/// carry several tags: the per-tag entry counts must follow the documents as
/// their tag lists are replaced and cleared.
///
/// # Errors
///
/// Propagates any error returned by the connection. Panics if an expectation
/// is not met.
pub async fn view_multi_emit_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let mut a = Basic::new("A")
        .with_tag("red")
        .with_tag("green")
        .push_into_async(db)
        .await?;
    let mut b = Basic::new("B")
        .with_tag("blue")
        .with_tag("green")
        .push_into_async(db)
        .await?;

    // Two documents with two tags each: four entries in total.
    let total = db.view::<BasicByTag>().query().await?.len();
    assert_eq!(total, 4);

    let green = db
        .view::<BasicByTag>()
        .with_key("green")
        .query()
        .await?
        .len();
    assert_eq!(green, 2);

    let red = db.view::<BasicByTag>().with_key("red").query().await?.len();
    assert_eq!(red, 1);

    let blue = db
        .view::<BasicByTag>()
        .with_key("blue")
        .query()
        .await?
        .len();
    assert_eq!(blue, 1);

    // Replace A's tags: it keeps "red", drops "green", and gains "blue".
    a.contents.tags = vec![String::from("red"), String::from("blue")];
    a.update_async(db).await?;

    let green = db
        .view::<BasicByTag>()
        .with_key("green")
        .query()
        .await?
        .len();
    assert_eq!(green, 1);

    let red = db.view::<BasicByTag>().with_key("red").query().await?.len();
    assert_eq!(red, 1);

    let blue = db
        .view::<BasicByTag>()
        .with_key("blue")
        .query()
        .await?
        .len();
    assert_eq!(blue, 2);

    // Clear B's tags: only A's entries remain.
    b.contents.tags.clear();
    b.update_async(db).await?;

    let green = db
        .view::<BasicByTag>()
        .with_key("green")
        .query()
        .await?
        .len();
    assert_eq!(green, 0);

    let red = db.view::<BasicByTag>().with_key("red").query().await?.len();
    assert_eq!(red, 1);

    let blue = db
        .view::<BasicByTag>()
        .with_key("blue")
        .query()
        .await?
        .len();
    assert_eq!(blue, 1);

    Ok(())
}

            
3
pub fn blocking_view_multi_emit_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let mut a = Basic::new("A")
3
        .with_tag("red")
3
        .with_tag("green")
3
        .push_into(db)?;
3
    let mut b = Basic::new("B")
3
        .with_tag("blue")
3
        .with_tag("green")
3
        .push_into(db)?;

            
3
    assert_eq!(db.view::<BasicByTag>().query()?.len(), 4);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("green").query()?.len(), 2);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("red").query()?.len(), 1);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("blue").query()?.len(), 1);

            
    // Change tags
3
    a.contents.tags = vec![String::from("red"), String::from("blue")];
3
    a.update(db)?;

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("green").query()?.len(), 1);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("red").query()?.len(), 1);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("blue").query()?.len(), 2);
3
    b.contents.tags.clear();
3
    b.update(db)?;

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("green").query()?.len(), 0);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("red").query()?.len(), 1);

            
3
    assert_eq!(db.view::<BasicByTag>().with_key("blue").query()?.len(), 1);

            
3
    Ok(())
3
}

            
5
pub async fn view_access_policy_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();

            
    // Query the view to get it into a consistent, mapped state. If we don't
    // query it, the integrity checker can end up spawning a mapping job. This
    // job would run in the background and can include the next documents being
    // inserted.
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 0);

            
5
    let a = collection.push(&Basic::new("A")).await?;

            
    // Test inserting a record that should match the view, but ask for it to be
    // NoUpdate. Verify we get no matches.
5
    collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .with_access_policy(AccessPolicy::NoUpdate)
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);

            
5
    tokio::time::sleep(Duration::from_millis(20)).await;

            
    // Verify the view still have no value, but this time ask for it to be
    // updated after returning
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(&Some(a.id))
5
        .with_access_policy(AccessPolicy::UpdateAfter)
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);

            
    // Waiting on background jobs can be unreliable in a CI environment
5
    for _ in 0..10_u8 {
5
        tokio::time::sleep(Duration::from_millis(20)).await;

            
        // Now, the view should contain the entry.
5
        let a_children = db
5
            .view::<BasicByParentId>()
5
            .with_key(&Some(a.id))
5
            .with_access_policy(AccessPolicy::NoUpdate)
5
            .query()
5
            .await?;
5
        if a_children.len() == 1 {
5
            return Ok(());
        }
    }
    panic!("view never updated")
5
}

            
3
pub fn blocking_view_access_policy_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    // Query the view to get it into a consistent, mapped state. If we don't
    // query it, the integrity checker can end up spawning a mapping job. This
    // job would run in the background and can include the next documents being
    // inserted.
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 0);

            
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;

            
    // Test inserting a record that should match the view, but ask for it to be
    // NoUpdate. Verify we get no matches.
3
    collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;

            
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(&Some(a.id))
3
        .with_access_policy(AccessPolicy::NoUpdate)
3
        .query()?;
3
    assert_eq!(a_children.len(), 0);

            
3
    std::thread::sleep(Duration::from_millis(20));

            
    // Verify the view still have no value, but this time ask for it to be
    // updated after returning
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(&Some(a.id))
3
        .with_access_policy(AccessPolicy::UpdateAfter)
3
        .query()?;
3
    assert_eq!(a_children.len(), 0);

            
    // Waiting on background jobs can be unreliable in a CI environment
3
    for _ in 0..10_u8 {
3
        std::thread::sleep(Duration::from_millis(20));

            
        // Now, the view should contain the entry.
3
        let a_children = db
3
            .view::<BasicByParentId>()
3
            .with_key(&Some(a.id))
3
            .with_access_policy(AccessPolicy::NoUpdate)
3
            .query()?;
3
        if a_children.len() == 1 {
3
            return Ok(());
        }
    }
    panic!("view never updated")
3
}

            
5
pub async fn unique_view_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let first_doc = db.collection::<Unique>().push(&Unique::new("1")).await?;

            
    if let Err(Error::UniqueKeyViolation {
5
        view,
5
        existing_document,
5
        conflicting_document,
5
    }) = db.collection::<Unique>().push(&Unique::new("1")).await
    {
5
        assert_eq!(view, UniqueValue.view_name());
5
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
        // We can't predict the conflicting document id since it's generated
        // inside of the transaction, but we can assert that it's different than
        // the document that was previously stored.
5
        assert_ne!(conflicting_document, existing_document);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
5
    let second_doc = db.collection::<Unique>().push(&Unique::new("2")).await?;
5
    let mut second_doc = db
5
        .collection::<Unique>()
5
        .get(&second_doc.id)
5
        .await?
5
        .unwrap();
5
    let mut contents = Unique::document_contents(&second_doc)?;
5
    contents.value = String::from("1");
5
    Unique::set_document_contents(&mut second_doc, contents)?;
    if let Err(Error::UniqueKeyViolation {
5
        view,
5
        existing_document,
5
        conflicting_document,
5
    }) = db.update::<Unique, _>(&mut second_doc).await
    {
5
        assert_eq!(view, UniqueValue.view_name());
5
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
5
        assert_eq!(conflicting_document.id, second_doc.header.id);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
5
    Ok(())
5
}

            
3
pub fn blocking_unique_view_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let first_doc = db.collection::<Unique>().push(&Unique::new("1"))?;

            
    if let Err(Error::UniqueKeyViolation {
3
        view,
3
        existing_document,
3
        conflicting_document,
3
    }) = db.collection::<Unique>().push(&Unique::new("1"))
    {
3
        assert_eq!(view, UniqueValue.view_name());
3
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
        // We can't predict the conflicting document id since it's generated
        // inside of the transaction, but we can assert that it's different than
        // the document that was previously stored.
3
        assert_ne!(conflicting_document, existing_document);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
3
    let second_doc = db.collection::<Unique>().push(&Unique::new("2"))?;
3
    let mut second_doc = db.collection::<Unique>().get(&second_doc.id)?.unwrap();
3
    let mut contents = Unique::document_contents(&second_doc)?;
3
    contents.value = String::from("1");
3
    Unique::set_document_contents(&mut second_doc, contents)?;
    if let Err(Error::UniqueKeyViolation {
3
        view,
3
        existing_document,
3
        conflicting_document,
3
    }) = db.update::<Unique, _>(&mut second_doc)
    {
3
        assert_eq!(view, UniqueValue.view_name());
3
        assert_eq!(first_doc.id, existing_document.id.deserialize()?);
3
        assert_eq!(conflicting_document.id, second_doc.header.id);
    } else {
        unreachable!("unique key violation not triggered");
    }

            
3
    Ok(())
3
}

            
3
pub fn blocking_cow_views<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    db.collection::<Basic>()
3
        .push(&Basic::new("test").with_category("category"))?;

            
3
    let mapping = BasicByCategoryCow::entries(db)
3
        .query()?
3
        .into_iter()
3
        .next()
3
        .expect("mapping not found");
3
    assert_eq!(mapping.key, "category");
3
    assert_eq!(BasicByCategoryCow::entries(db).reduce()?, 1);

            
3
    Ok(())
3
}

            
5
pub async fn named_collection_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    Unique::new("0").push_into_async(db).await?;
5
    let original_entry = Unique::entry_async("1", db)
5
        .update_with(|_existing: &mut Unique| unreachable!())
5
        .or_insert_with(|| Unique::new("1"))
15
        .await?
5
        .expect("Document not inserted");

            
5
    let updated = Unique::entry_async("1", db)
5
        .update_with(|existing: &mut Unique| {
5
            existing.value = String::from("2");
5
        })
5
        .or_insert_with(|| unreachable!())
15
        .await?
5
        .unwrap();
5
    assert_eq!(original_entry.header.id, updated.header.id);
5
    assert_ne!(original_entry.contents.value, updated.contents.value);

            
10
    let retrieved = Unique::entry_async("2", db).await?.unwrap();
5
    assert_eq!(retrieved.contents.value, updated.contents.value);

            
5
    let conflict = Unique::entry_async("2", db)
5
        .update_with(|existing: &mut Unique| {
5
            existing.value = String::from("0");
5
        })
15
        .await;
5
    assert!(matches!(conflict, Err(Error::UniqueKeyViolation { .. })));

            
5
    Ok(())
5
}

            
3
pub fn blocking_named_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    Unique::new("0").push_into(db)?;
3
    let original_entry = Unique::entry("1", db)
3
        .update_with(|_existing: &mut Unique| unreachable!())
3
        .or_insert_with(|| Unique::new("1"))
3
        .execute()?
3
        .expect("Document not inserted");

            
3
    let updated = Unique::entry("1", db)
3
        .update_with(|existing: &mut Unique| {
3
            existing.value = String::from("2");
3
        })
3
        .or_insert_with(|| unreachable!())
3
        .execute()?
3
        .unwrap();
3
    assert_eq!(original_entry.header.id, updated.header.id);
3
    assert_ne!(original_entry.contents.value, updated.contents.value);

            
3
    let retrieved = Unique::entry("2", db).execute()?.unwrap();
3
    assert_eq!(retrieved.contents.value, updated.contents.value);

            
3
    let conflict = Unique::entry("2", db)
3
        .update_with(|existing: &mut Unique| {
3
            existing.value = String::from("0");
3
        })
3
        .execute();
3
    assert!(matches!(conflict, Err(Error::UniqueKeyViolation { .. })));

            
3
    Ok(())
3
}

            
5
/// Smoke-tests the compaction entry points over an async connection:
/// a single collection, the key-value store, and finally the whole database.
///
/// Only successful completion of each call is checked.
pub async fn compaction_tests<C: AsyncConnection + AsyncKeyValue>(db: &C) -> anyhow::Result<()> {
    // Store one document so the collection has data to compact.
    let seed = Basic::new("initial_value");
    db.collection::<Basic>().push(&seed).await?;

    // Compact just the `Basic` collection.
    db.compact_collection::<Basic>().await?;

    // Write a key, then compact the key-value store.
    db.set_key("foo", &1_u32).await?;
    db.compact_key_value_store().await?;

    // Finally, compact the entire database.
    db.compact().await?;

    Ok(())
}

            
3
/// Smoke-tests the compaction entry points over a blocking connection:
/// a single collection, the key-value store, and finally the whole database.
///
/// Only successful completion of each call is checked.
pub fn blocking_compaction_tests<C: Connection + KeyValue>(db: &C) -> anyhow::Result<()> {
    // Store one document so the collection has data to compact.
    let seed = Basic::new("initial_value");
    db.collection::<Basic>().push(&seed)?;

    // Compact just the `Basic` collection.
    db.compact_collection::<Basic>()?;

    // Write a key, then compact the key-value store.
    db.set_key("foo", &1_u32).execute()?;
    db.compact_key_value_store()?;

    // Finally, compact the entire database.
    db.compact()?;

    Ok(())
}

            
5
pub async fn user_management_tests<C: AsyncConnection, S: AsyncStorageConnection>(
5
    admin: &C,
5
    server: S,
5
    server_name: &str,
5
) -> anyhow::Result<()> {
5
    let username = format!("user-management-tests-{server_name}");
5
    let user_id = server.create_user(&username).await?;
    // Test the default created user state.
    {
5
        let user = User::get_async(&user_id, admin)
5
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert_eq!(user.contents.username, username);
5
        assert!(user.contents.groups.is_empty());
5
        assert!(user.contents.roles.is_empty());
    }

            
5
    let role = Role::named(format!("role-{server_name}"))
5
        .push_into_async(admin)
5
        .await
5
        .unwrap();
5
    let group = PermissionGroup::named(format!("group-{server_name}"))
5
        .push_into_async(admin)
5
        .await
5
        .unwrap();
5

            
5
    // Add the role and group.
5
    server
5
        .add_permission_group_to_user(user_id, &group)
5
        .await
5
        .unwrap();
5
    server.add_role_to_user(user_id, &role).await.unwrap();

            
    // Test the results
    {
5
        let user = User::get_async(&user_id, admin)
5
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert_eq!(user.contents.groups, vec![group.header.id]);
5
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Add the same things again (should not do anything). With names this time.
5
    server
5
        .add_permission_group_to_user(&username, &group)
5
        .await
5
        .unwrap();
5
    server.add_role_to_user(&username, &role).await.unwrap();
    {
5
        let user = User::load_async(&username, admin)
10
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert_eq!(user.contents.groups, vec![group.header.id]);
5
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Remove the group.
5
    server
5
        .remove_permission_group_from_user(user_id, &group)
5
        .await
5
        .unwrap();
5
    server.remove_role_from_user(user_id, &role).await.unwrap();
    {
5
        let user = User::get_async(&user_id, admin)
5
            .await
5
            .unwrap()
5
            .expect("user not found");
5
        assert!(user.contents.groups.is_empty());
5
        assert!(user.contents.roles.is_empty());
    }

            
    // Removing again shouldn't cause an error.
5
    server
5
        .remove_permission_group_from_user(user_id, &group)
5
        .await?;
5
    server.remove_role_from_user(user_id, &role).await?;

            
    // Remove the user
5
    server.delete_user(user_id).await?;
    // Test if user is removed.

            
5
    assert!(User::get_async(&user_id, admin).await.unwrap().is_none());

            
5
    Ok(())
5
}

            
3
pub fn blocking_user_management_tests<C: Connection, S: StorageConnection>(
3
    admin: &C,
3
    server: &S,
3
    server_name: &str,
3
) -> anyhow::Result<()> {
3
    let username = format!("user-management-tests-{server_name}");
3
    let user_id = server.create_user(&username)?;
    // Test the default created user state.
    {
3
        let user = User::get(&user_id, admin).unwrap().expect("user not found");
3
        assert_eq!(user.contents.username, username);
3
        assert!(user.contents.groups.is_empty());
3
        assert!(user.contents.roles.is_empty());
    }

            
3
    let role = Role::named(format!("role-{server_name}"))
3
        .push_into(admin)
3
        .unwrap();
3
    let group = PermissionGroup::named(format!("group-{server_name}"))
3
        .push_into(admin)
3
        .unwrap();
3

            
3
    // Add the role and group.
3
    server
3
        .add_permission_group_to_user(user_id, &group)
3
        .unwrap();
3
    server.add_role_to_user(user_id, &role).unwrap();
3

            
3
    // Test the results
3
    {
3
        let user = User::get(&user_id, admin).unwrap().expect("user not found");
3
        assert_eq!(user.contents.groups, vec![group.header.id]);
3
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Add the same things again (should not do anything). With names this time.
3
    server
3
        .add_permission_group_to_user(&username, &group)
3
        .unwrap();
3
    server.add_role_to_user(&username, &role).unwrap();
3
    {
3
        let user = User::load(&username, admin)
3
            .unwrap()
3
            .expect("user not found");
3
        assert_eq!(user.contents.groups, vec![group.header.id]);
3
        assert_eq!(user.contents.roles, vec![role.header.id]);
    }

            
    // Remove the group.
3
    server
3
        .remove_permission_group_from_user(user_id, &group)
3
        .unwrap();
3
    server.remove_role_from_user(user_id, &role).unwrap();
3
    {
3
        let user = User::get(&user_id, admin).unwrap().expect("user not found");
3
        assert!(user.contents.groups.is_empty());
3
        assert!(user.contents.roles.is_empty());
    }

            
    // Removing again shouldn't cause an error.
3
    server.remove_permission_group_from_user(user_id, &group)?;
3
    server.remove_role_from_user(user_id, &role)?;

            
    // Remove the user
3
    server.delete_user(user_id)?;
    // Test if user is removed.

            
3
    assert!(User::get(&user_id, admin).unwrap().is_none());

            
3
    Ok(())
3
}

            
/// Verifies token authentication for both a user and a role through an async
/// storage connection.
///
/// For each identity, an `AuthenticationToken` is created via `admin`, the
/// token is used to authenticate against `server`, and the identity of the
/// resulting session is compared with the identity the token was issued for.
///
/// * `admin` - connection used to create the tokens and the role.
/// * `server` - storage connection that creates the user and authenticates.
/// * `server_name` - suffix that keeps the created names distinct per harness.
///
/// Panics if the authenticated session has no identity, carries an identity of
/// the wrong kind, or carries an unexpected id.
#[cfg(feature = "token-authentication")]
pub async fn token_authentication_tests<C: AsyncConnection, S: AsyncStorageConnection>(
    admin: &C,
    server: &S,
    server_name: &str,
) -> anyhow::Result<()> {
    let username = format!("token-authentication-tests-{server_name}");
    let user_id = server.create_user(&username).await?;
    let user_token =
        AuthenticationToken::create_async(IdentityReference::user(&username)?, admin).await?;

    let as_user = server
        .authenticate_with_token(user_token.header.id, &user_token.contents.token)
        .await?;
    // A missing or non-user identity is a failure; it must not be silently
    // skipped, otherwise the test could pass without checking anything.
    match as_user.session().and_then(Session::identity) {
        Some(Identity::User { id, .. }) => assert_eq!(*id, user_id),
        _ => panic!("token authentication did not yield a user identity"),
    }

    let role = Role::named(format!("token-role-{server_name}"))
        .push_into_async(admin)
        .await
        .unwrap();
    let role_token =
        AuthenticationToken::create_async(IdentityReference::role(role.header.id)?, admin).await?;

    let as_role = server
        .authenticate_with_token(role_token.header.id, &role_token.contents.token)
        .await?;
    // Same strictness for the role: the session must carry a role identity.
    match as_role.session().and_then(Session::identity) {
        Some(Identity::Role { id, .. }) => assert_eq!(*id, role.header.id),
        _ => panic!("token authentication did not yield a role identity"),
    }

    Ok(())
}

            
/// Verifies token authentication for both a user and a role through a
/// blocking storage connection.
///
/// For each identity, an `AuthenticationToken` is created via `admin`, the
/// token is used to authenticate against `server`, and the identity of the
/// resulting session is compared with the identity the token was issued for.
///
/// * `admin` - connection used to create the tokens and the role.
/// * `server` - storage connection that creates the user and authenticates.
/// * `server_name` - suffix that keeps the created names distinct per harness.
///
/// Panics if the authenticated session has no identity, carries an identity of
/// the wrong kind, or carries an unexpected id.
#[cfg(feature = "token-authentication")]
pub fn blocking_token_authentication_tests<C: Connection, S: StorageConnection>(
    admin: &C,
    server: &S,
    server_name: &str,
) -> anyhow::Result<()> {
    let username = format!("blocking-token-authentication-tests-{server_name}");
    let user_id = server.create_user(&username)?;
    let user_token = AuthenticationToken::create(&IdentityReference::user(&username)?, admin)?;

    let as_user =
        server.authenticate_with_token(user_token.header.id, &user_token.contents.token)?;
    // A missing or non-user identity is a failure; it must not be silently
    // skipped, otherwise the test could pass without checking anything.
    match as_user.session().and_then(Session::identity) {
        Some(Identity::User { id, .. }) => assert_eq!(*id, user_id),
        _ => panic!("token authentication did not yield a user identity"),
    }

    let role = Role::named(format!("token-role-{server_name}"))
        .push_into(admin)
        .unwrap();
    let role_token = AuthenticationToken::create(&IdentityReference::role(role.header.id)?, admin)?;

    let as_role =
        server.authenticate_with_token(role_token.header.id, &role_token.contents.token)?;
    // Same strictness for the role: the session must carry a role identity.
    match as_role.session().and_then(Session::identity) {
        Some(Identity::Role { id, .. }) => assert_eq!(*id, role.header.id),
        _ => panic!("token authentication did not yield a role identity"),
    }

    Ok(())
}

            
/// Defines the `KeyValue` test suite
#[macro_export]
macro_rules! define_async_kv_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod r#async_kv {
            use super::$harness;
            #[tokio::test]
5
            async fn basic_kv_test() -> anyhow::Result<()> {
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5
                let harness = $harness::new($crate::test_util::HarnessTest::KvBasic).await?;
5
                let db = harness.connect().await?;
5
                assert_eq!(
5
                    db.set_key("akey", &String::from("avalue")).await?,
5
                    KeyStatus::Inserted
5
                );
5
                assert_eq!(
5
                    db.get_key("akey").into().await?,
5
                    Some(String::from("avalue"))
5
                );
5
                assert_eq!(
5
                    db.set_key("akey", &String::from("new_value"))
5
                        .returning_previous_as()
5
                        .await?,
5
                    Some(String::from("avalue"))
5
                );
5
                assert_eq!(
5
                    db.get_key("akey").into().await?,
5
                    Some(String::from("new_value"))
5
                );
5
                assert_eq!(
5
                    db.get_key("akey").and_delete().into().await?,
5
                    Some(String::from("new_value"))
5
                );
5
                assert_eq!(db.get_key("akey").await?, None);
5
                assert_eq!(
5
                    db.set_key("akey", &String::from("new_value"))
5
                        .returning_previous()
5
                        .await?,
5
                    None
5
                );
5
                assert_eq!(db.delete_key("akey").await?, KeyStatus::Deleted);
5
                assert_eq!(db.delete_key("akey").await?, KeyStatus::NotChanged);
5

            
5
                harness.shutdown().await?;
5

            
5
                Ok(())
5
            }

            
            #[tokio::test]
5
            async fn kv_concurrency() -> anyhow::Result<()> {
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5
                const WRITERS: usize = 100;
5
                const INCREMENTS: usize = 100;
5
                let harness = $harness::new($crate::test_util::HarnessTest::KvConcurrency).await?;
5
                let db = harness.connect().await?;
5

            
5
                let handles = (0..WRITERS).map(|_| {
5
                    let db = db.clone();
5
                    tokio::task::spawn(async move {
5
                        for _ in 0..INCREMENTS {
5
                            db.increment_key_by("concurrency", 1_u64).await.unwrap();
5
                        }
5
                    })
5
                });
5
                futures::future::join_all(handles).await;
5

            
5
                assert_eq!(
5
                    db.get_key("concurrency").into_u64().await.unwrap().unwrap(),
5
                    (WRITERS * INCREMENTS) as u64
5
                );
5

            
5
                harness.shutdown().await?;
5

            
5
                Ok(())
5
            }

            
            #[tokio::test]
5
            async fn kv_set_tests() -> anyhow::Result<()> {
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5
                let harness = $harness::new($crate::test_util::HarnessTest::KvSet).await?;
5
                let db = harness.connect().await?;
5
                let kv = db.with_key_namespace("set");
5

            
5
                assert_eq!(
5
                    kv.set_key("a", &0_u32).only_if_exists().await?,
5
                    KeyStatus::NotChanged
5
                );
5
                assert_eq!(
5
                    kv.set_key("a", &0_u32).only_if_vacant().await?,
5
                    KeyStatus::Inserted
5
                );
5
                assert_eq!(
5
                    kv.set_key("a", &1_u32).only_if_vacant().await?,
5
                    KeyStatus::NotChanged
5
                );
5
                assert_eq!(
5
                    kv.set_key("a", &2_u32).only_if_exists().await?,
5
                    KeyStatus::Updated,
5
                );
5
                assert_eq!(
5
                    kv.set_key("a", &3_u32).returning_previous_as().await?,
5
                    Some(2_u32),
5
                );
5

            
5
                harness.shutdown().await?;
5

            
5
                Ok(())
5
            }

            
            #[tokio::test]
5
            async fn kv_increment_decrement_tests() -> anyhow::Result<()> {
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5
                let harness =
5
                    $harness::new($crate::test_util::HarnessTest::KvIncrementDecrement).await?;
5
                let db = harness.connect().await?;
5
                let kv = db.with_key_namespace("increment_decrement");
5

            
5
                // Empty keys should be equal to 0
5
                assert_eq!(kv.increment_key_by("i64", 1_i64).await?, 1_i64);
5
                assert_eq!(kv.get_key("i64").into_i64().await?, Some(1_i64));
5
                assert_eq!(kv.increment_key_by("u64", 1_u64).await?, 1_u64);
5
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).await?, 1_f64);
5

            
5
                // Test float incrementing/decrementing an existing value
5
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).await?, 2_f64);
5
                $crate::assert_f64_eq!(kv.decrement_key_by("f64", 2_f64).await?, 0_f64);
5

            
5
                // Empty keys should be equal to 0
5
                assert_eq!(kv.decrement_key_by("i64_2", 1_i64).await?, -1_i64);
5
                assert_eq!(
5
                    kv.decrement_key_by("u64_2", 42_u64)
5
                        .allow_overflow()
5
                        .await?,
5
                    u64::MAX - 41
5
                );
5
                assert_eq!(kv.decrement_key_by("u64_3", 42_u64).await?, u64::MIN);
5
                $crate::assert_f64_eq!(kv.decrement_key_by("f64_2", 1_f64).await?, -1_f64);
5

            
5
                // Test decrement wrapping with overflow
5
                kv.set_numeric_key("i64", i64::MIN).await?;
5
                assert_eq!(
5
                    kv.decrement_key_by("i64", 1_i64).allow_overflow().await?,
5
                    i64::MAX
5
                );
5
                assert_eq!(
5
                    kv.decrement_key_by("u64", 2_u64).allow_overflow().await?,
5
                    u64::MAX
5
                );
5

            
5
                // Test increment wrapping with overflow
5
                assert_eq!(
5
                    kv.increment_key_by("i64", 1_i64).allow_overflow().await?,
5
                    i64::MIN
5
                );
5
                assert_eq!(
5
                    kv.increment_key_by("u64", 1_u64).allow_overflow().await?,
5
                    u64::MIN
5
                );
5

            
5
                // Test saturating increments.
5
                kv.set_numeric_key("i64", i64::MAX - 1).await?;
5
                kv.set_numeric_key("u64", u64::MAX - 1).await?;
5
                assert_eq!(kv.increment_key_by("i64", 2_i64).await?, i64::MAX);
5
                assert_eq!(kv.increment_key_by("u64", 2_u64).await?, u64::MAX);
5

            
5
                // Test saturating decrements.
5
                kv.set_numeric_key("i64", i64::MIN + 1).await?;
5
                kv.set_numeric_key("u64", u64::MIN + 1).await?;
5
                assert_eq!(kv.decrement_key_by("i64", 2_i64).await?, i64::MIN);
5
                assert_eq!(kv.decrement_key_by("u64", 2_u64).await?, u64::MIN);
5

            
5
                // Test numerical conversion safety using get
5
                {
5
                    // For i64 -> f64, the limit is 2^52 + 1 in either posive or
5
                    // negative directions.
5
                    kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS)))
5
                        .await?;
5
                    $crate::assert_f64_eq!(
5
                        kv.get_key("i64").into_f64().await?.unwrap(),
5
                        9_007_199_254_740_992_f64
5
                    );
5
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS)))
5
                        .await?;
5
                    $crate::assert_f64_eq!(
5
                        kv.get_key("i64").into_f64().await?.unwrap(),
5
                        -9_007_199_254_740_992_f64
5
                    );
5

            
5
                    kv.set_numeric_key("i64", (2_i64.pow(f64::MANTISSA_DIGITS) + 1))
5
                        .await?;
5
                    assert!(kv.get_key("i64").into_f64().await.is_err());
5
                    $crate::assert_f64_eq!(
5
                        kv.get_key("i64").into_f64_lossy().await?.unwrap(),
5
                        9_007_199_254_740_993_f64
5
                    );
5
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS) + 1))
5
                        .await?;
5
                    assert!(kv.get_key("i64").into_f64().await.is_err());
5
                    $crate::assert_f64_eq!(
5
                        kv.get_key("i64").into_f64_lossy().await?.unwrap(),
5
                        -9_007_199_254_740_993_f64
5
                    );
5

            
5
                    // For i64 -> u64, the only limit is sign.
5
                    kv.set_numeric_key("i64", -1_i64).await?;
5
                    assert!(kv.get_key("i64").into_u64().await.is_err());
5
                    assert_eq!(
5
                        kv.get_key("i64").into_u64_lossy(true).await?.unwrap(),
5
                        0_u64
5
                    );
5
                    assert_eq!(
5
                        kv.get_key("i64").into_u64_lossy(false).await?.unwrap(),
5
                        u64::MAX
5
                    );
5

            
5
                    // For f64 -> i64, the limit is fractional numbers. Saturating isn't tested in this conversion path.
5
                    kv.set_numeric_key("f64", 1.1_f64).await?;
5
                    assert!(kv.get_key("f64").into_i64().await.is_err());
5
                    assert_eq!(
5
                        kv.get_key("f64").into_i64_lossy(false).await?.unwrap(),
5
                        1_i64
5
                    );
5
                    kv.set_numeric_key("f64", -1.1_f64).await?;
5
                    assert!(kv.get_key("f64").into_i64().await.is_err());
5
                    assert_eq!(
5
                        kv.get_key("f64").into_i64_lossy(false).await?.unwrap(),
5
                        -1_i64
5
                    );
5

            
5
                    // For f64 -> u64, the limit is fractional numbers or negative numbers. Saturating isn't tested in this conversion path.
5
                    kv.set_numeric_key("f64", 1.1_f64).await?;
5
                    assert!(kv.get_key("f64").into_u64().await.is_err());
5
                    assert_eq!(
5
                        kv.get_key("f64").into_u64_lossy(false).await?.unwrap(),
5
                        1_u64
5
                    );
5
                    kv.set_numeric_key("f64", -1.1_f64).await?;
5
                    assert!(kv.get_key("f64").into_u64().await.is_err());
5
                    assert_eq!(
5
                        kv.get_key("f64").into_u64_lossy(false).await?.unwrap(),
5
                        0_u64
5
                    );
5

            
5
                    // For u64 -> i64, the limit is > i64::MAX
5
                    kv.set_numeric_key("u64", i64::MAX as u64 + 1).await?;
5
                    assert!(kv.get_key("u64").into_i64().await.is_err());
5
                    assert_eq!(
5
                        kv.get_key("u64").into_i64_lossy(true).await?.unwrap(),
5
                        i64::MAX
5
                    );
5
                    assert_eq!(
5
                        kv.get_key("u64").into_i64_lossy(false).await?.unwrap(),
5
                        i64::MIN
5
                    );
5
                }
5

            
5
                // Test that non-numeric keys won't be changed when attempting to incr/decr
5
                kv.set_key("non-numeric", &String::from("test")).await?;
5
                assert!(kv.increment_key_by("non-numeric", 1_i64).await.is_err());
5
                assert!(kv.decrement_key_by("non-numeric", 1_i64).await.is_err());
5
                assert_eq!(
5
                    kv.get_key("non-numeric").into::<String>().await?.unwrap(),
5
                    String::from("test")
5
                );
5

            
5
                // Test that NaN cannot be stored
5
                kv.set_numeric_key("f64", 0_f64).await?;
5
                assert!(matches!(
5
                    kv.set_numeric_key("f64", f64::NAN).await,
5
                    Err(bonsaidb_core::Error::NotANumber)
5
                ));
5
                // Verify the value was unchanged.
5
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64().await?.unwrap(), 0.);
5
                // Try to increment by nan
5
                assert!(matches!(
5
                    kv.increment_key_by("f64", f64::NAN).await,
5
                    Err(bonsaidb_core::Error::NotANumber)
5
                ));
5
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64().await?.unwrap(), 0.);
5

            
5
                harness.shutdown().await?;
5

            
5
                Ok(())
5
            }

            
            #[tokio::test]
5
            async fn kv_expiration_tests() -> anyhow::Result<()> {
5
                use std::time::Duration;
5

            
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5

            
5
                let harness = $harness::new($crate::test_util::HarnessTest::KvExpiration).await?;
5
                let db = harness.connect().await?;
5

            
5
                loop {
5
                    let kv = db.with_key_namespace("expiration");
5

            
5
                    kv.delete_key("a").await?;
5
                    kv.delete_key("b").await?;
5

            
5
                    // Test that the expiration is updated for key a, but not for key b.
5
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(500));
5
                    let (r1, r2) = tokio::join!(
5
                        kv.set_key("a", &0_u32).expire_in(Duration::from_secs(2)),
5
                        kv.set_key("b", &0_u32).expire_in(Duration::from_secs(2))
5
                    );
5
                    if timing.elapsed() > Duration::from_millis(500) {
5
                        println!(
5
                            "Restarting test {}. Took too long {:?}",
5
                            line!(),
5
                            timing.elapsed(),
5
                        );
5
                        continue;
5
                    }
5
                    assert_eq!(r1?, KeyStatus::Inserted);
5
                    assert_eq!(r2?, KeyStatus::Inserted);
5
                    let (r1, r2) = tokio::join!(
5
                        kv.set_key("a", &1_u32).expire_in(Duration::from_secs(4)),
5
                        kv.set_key("b", &1_u32)
5
                            .expire_in(Duration::from_secs(100))
5
                            .keep_existing_expiration()
5
                    );
5
                    if timing.elapsed() > Duration::from_secs(1) {
5
                        println!(
5
                            "Restarting test {}. Took too long {:?}",
5
                            line!(),
5
                            timing.elapsed(),
5
                        );
5
                        continue;
5
                    }
5

            
5
                    assert_eq!(r1?, KeyStatus::Updated, "a wasn't an update");
5
                    assert_eq!(r2?, KeyStatus::Updated, "b wasn't an update");
5

            
5
                    let a = kv.get_key("a").into().await?;
5
                    assert_eq!(a, Some(1u32), "a shouldn't have expired yet");
5

            
5
                    // Before checking the value, make sure we haven't elapsed too
5
                    // much time. If so, just restart the test.
5
                    if !timing.wait_until_async(Duration::from_secs_f32(3.)).await {
5
                        println!(
5
                            "Restarting test {}. Took too long {:?}",
5
                            line!(),
5
                            timing.elapsed()
5
                        );
5
                        continue;
5
                    }
5

            
5
                    assert_eq!(kv.get_key("b").await?, None, "b never expired");
5

            
5
                    timing.wait_until_async(Duration::from_secs_f32(5.)).await;
5
                    assert_eq!(kv.get_key("a").await?, None, "a never expired");
5
                    break;
5
                }
5
                harness.shutdown().await?;
5

            
5
                Ok(())
5
            }

            
            #[tokio::test]
5
            async fn delete_expire_tests() -> anyhow::Result<()> {
5
                use std::time::Duration;
5

            
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5

            
5
                let harness = $harness::new($crate::test_util::HarnessTest::KvDeleteExpire).await?;
5
                let db = harness.connect().await?;
5

            
5
                loop {
5
                    let kv = db.with_key_namespace("delete_expire");
5

            
5
                    kv.delete_key("a").await?;
5

            
5
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(100));
5

            
5
                    // Create a key with an expiration. Delete the key. Set a new
5
                    // value at that key with no expiration. Ensure it doesn't
5
                    // expire.
5
                    kv.set_key("a", &0_u32)
5
                        .expire_in(Duration::from_secs(2))
5
                        .await?;
5
                    kv.delete_key("a").await?;
5
                    kv.set_key("a", &1_u32).await?;
5
                    if timing.elapsed() > Duration::from_secs(1) {
5
                        println!(
5
                            "Restarting test {}. Took too long {:?}",
5
                            line!(),
5
                            timing.elapsed(),
5
                        );
5
                        continue;
5
                    }
5
                    if !timing.wait_until_async(Duration::from_secs_f32(2.5)).await {
5
                        println!(
5
                            "Restarting test {}. Took too long {:?}",
5
                            line!(),
5
                            timing.elapsed()
5
                        );
5
                        continue;
5
                    }
5

            
5
                    assert_eq!(kv.get_key("a").into().await?, Some(1u32));
5

            
5
                    break;
5
                }
5
                harness.shutdown().await?;
5

            
5
                Ok(())
5
            }

            
            #[tokio::test]
            // This test can fail when the machine it's running on is under high load or
            // constrained resources. We need a command that persists (and waits until
            // it has persisted) to make this test fully deterministic.
5
            async fn kv_transaction_tests() -> anyhow::Result<()> {
5
                use std::time::Duration;
5

            
5
                use $crate::connection::AsyncConnection;
5
                use $crate::keyvalue::{AsyncKeyValue, KeyStatus};
5
                // loop {
5
                let harness = $harness::new($crate::test_util::HarnessTest::KvTransactions).await?;
5
                let db = harness.connect().await?;
5
                // Generate several transactions that we can validate. Persisting
5
                // happens in the background, so we delay between each step to give
5
                // it a moment.
5
                db.set_key("expires", &0_u32)
5
                    .expire_in(Duration::from_secs(1))
5
                    .await?;
5
                db.set_key("akey", &String::from("avalue")).await?;
5
                db.set_numeric_key("nkey", 0_u64).await?;
5
                tokio::time::sleep(Duration::from_millis(500)).await;
5
                db.increment_key_by("nkey", 1_u64).await?;
5
                tokio::time::sleep(Duration::from_millis(500)).await;
5
                db.delete_key("nkey").await?;
5
                db.get_key("akey").and_delete().await?;
5
                tokio::time::sleep(Duration::from_millis(500)).await;
5
                // Ensure this doesn't generate a transaction.
5
                db.delete_key("nkey").await?;
5

            
5
                tokio::time::sleep(Duration::from_secs(1)).await;
5

            
5
                let transactions =
5
                    AsyncConnection::list_executed_transactions(&db, None, None).await?;
5
                let deleted_keys = transactions
5
                    .iter()
5
                    .filter_map(|tx| tx.changes.keys())
5
                    .flatten()
5
                    .filter(|changed_key| dbg!(changed_key).deleted)
5
                    .count();
5
                assert_eq!(deleted_keys, 3, "deleted keys wasn't 3");
5
                let akey_changes = transactions
5
                    .iter()
5
                    .filter_map(|tx| tx.changes.keys())
5
                    .flatten()
5
                    .filter(|changed_key| changed_key.key == "akey")
5
                    .count();
5
                assert_eq!(akey_changes, 2, "akey changes wasn't 2");
5
                let nkey_changes = transactions
5
                    .iter()
5
                    .filter_map(|tx| tx.changes.keys())
5
                    .flatten()
5
                    .filter(|changed_key| changed_key.key == "nkey")
5
                    .count();
5
                assert_eq!(nkey_changes, 3, "nkey changes wasn't 3");
5

            
5
                harness.shutdown().await?;
5
                // }
5

            
5
                Ok(())
5
            }
        }
    };
}

            
/// Defines the blocking `KeyValue` test suite.
///
/// Invoking this macro with the identifier of a test harness type expands to
/// a `#[cfg(test)] mod blocking_kv` containing one `#[test]` function per
/// key-value scenario. Each test builds the harness with
/// `$harness::new(HarnessTest::…)`, obtains a database through
/// `harness.connect()`, exercises the blocking `KeyValue` API, and finally
/// calls `harness.shutdown()`.
#[macro_export]
macro_rules! define_blocking_kv_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod blocking_kv {
            use super::$harness;

            /// Covers the basic set / get / replace / delete round trip on a
            /// single key.
            #[test]
            fn basic_kv_test() -> anyhow::Result<()> {
                use $crate::keyvalue::{KeyStatus, KeyValue};
                let harness = $harness::new($crate::test_util::HarnessTest::KvBasic)?;
                let db = harness.connect()?;
                assert_eq!(
                    db.set_key("akey", &String::from("avalue")).execute()?,
                    KeyStatus::Inserted
                );
                assert_eq!(db.get_key("akey").into()?, Some(String::from("avalue")));
                assert_eq!(
                    db.set_key("akey", &String::from("new_value"))
                        .returning_previous_as()?,
                    Some(String::from("avalue"))
                );
                assert_eq!(db.get_key("akey").into()?, Some(String::from("new_value")));
                assert_eq!(
                    db.get_key("akey").and_delete().into()?,
                    Some(String::from("new_value"))
                );
                assert_eq!(db.get_key("akey").query()?, None);
                assert_eq!(
                    db.set_key("akey", &String::from("new_value"))
                        .returning_previous()?,
                    None
                );
                assert_eq!(db.delete_key("akey")?, KeyStatus::Deleted);
                assert_eq!(db.delete_key("akey")?, KeyStatus::NotChanged);

                harness.shutdown()?;

                Ok(())
            }

            /// Increments one key from many threads and checks that no
            /// increment is lost.
            #[test]
            fn kv_concurrency() -> anyhow::Result<()> {
                use $crate::keyvalue::KeyValue;
                const WRITERS: usize = 100;
                const INCREMENTS: usize = 100;
                let harness = $harness::new($crate::test_util::HarnessTest::KvConcurrency)?;
                let db = harness.connect()?;

                let threads = (0..WRITERS)
                    .map(|_| {
                        let db = db.clone();
                        std::thread::spawn(move || {
                            for _ in 0..INCREMENTS {
                                db.increment_key_by("concurrency", 1_u64).execute().unwrap();
                            }
                        })
                    })
                    .collect::<Vec<_>>();
                for thread in threads {
                    thread.join().unwrap();
                }

                assert_eq!(
                    db.get_key("concurrency").into_u64()?.unwrap(),
                    (WRITERS * INCREMENTS) as u64
                );

                harness.shutdown()?;

                Ok(())
            }

            /// Covers the `only_if_exists` / `only_if_vacant` set modifiers.
            #[test]
            fn kv_set_tests() -> anyhow::Result<()> {
                use $crate::keyvalue::{KeyStatus, KeyValue};
                let harness = $harness::new($crate::test_util::HarnessTest::KvSet)?;
                let db = harness.connect()?;
                let kv = db.with_key_namespace("set");

                assert_eq!(
                    kv.set_key("a", &0_u32).only_if_exists().execute()?,
                    KeyStatus::NotChanged
                );
                assert_eq!(
                    kv.set_key("a", &0_u32).only_if_vacant().execute()?,
                    KeyStatus::Inserted
                );
                assert_eq!(
                    kv.set_key("a", &1_u32).only_if_vacant().execute()?,
                    KeyStatus::NotChanged
                );
                assert_eq!(
                    kv.set_key("a", &2_u32).only_if_exists().execute()?,
                    KeyStatus::Updated,
                );
                assert_eq!(
                    kv.set_key("a", &3_u32).returning_previous_as()?,
                    Some(2_u32),
                );

                harness.shutdown()?;

                Ok(())
            }

            /// Covers numeric increments and decrements (saturating and
            /// wrapping), numeric conversions on read, and rejection of
            /// non-numeric values and NaN.
            #[test]
            fn kv_increment_decrement_tests() -> anyhow::Result<()> {
                use $crate::keyvalue::KeyValue;
                let harness = $harness::new($crate::test_util::HarnessTest::KvIncrementDecrement)?;
                let db = harness.connect()?;
                let kv = db.with_key_namespace("increment_decrement");

                // Empty keys should be equal to 0
                assert_eq!(kv.increment_key_by("i64", 1_i64).execute()?, 1_i64);
                assert_eq!(kv.get_key("i64").into_i64()?, Some(1_i64));
                assert_eq!(kv.increment_key_by("u64", 1_u64).execute()?, 1_u64);
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).execute()?, 1_f64);

                // Test float incrementing/decrementing an existing value
                $crate::assert_f64_eq!(kv.increment_key_by("f64", 1_f64).execute()?, 2_f64);
                $crate::assert_f64_eq!(kv.decrement_key_by("f64", 2_f64).execute()?, 0_f64);

                // Empty keys should be equal to 0
                assert_eq!(kv.decrement_key_by("i64_2", 1_i64).execute()?, -1_i64);
                assert_eq!(
                    kv.decrement_key_by("u64_2", 42_u64)
                        .allow_overflow()
                        .execute()?,
                    u64::MAX - 41
                );
                assert_eq!(kv.decrement_key_by("u64_3", 42_u64).execute()?, u64::MIN);
                $crate::assert_f64_eq!(kv.decrement_key_by("f64_2", 1_f64).execute()?, -1_f64);

                // Test decrement wrapping with overflow
                kv.set_numeric_key("i64", i64::MIN).execute()?;
                assert_eq!(
                    kv.decrement_key_by("i64", 1_i64)
                        .allow_overflow()
                        .execute()?,
                    i64::MAX
                );
                assert_eq!(
                    kv.decrement_key_by("u64", 2_u64)
                        .allow_overflow()
                        .execute()?,
                    u64::MAX
                );

                // Test increment wrapping with overflow
                assert_eq!(
                    kv.increment_key_by("i64", 1_i64)
                        .allow_overflow()
                        .execute()?,
                    i64::MIN
                );
                assert_eq!(
                    kv.increment_key_by("u64", 1_u64)
                        .allow_overflow()
                        .execute()?,
                    u64::MIN
                );

                // Test saturating increments.
                kv.set_numeric_key("i64", i64::MAX - 1).execute()?;
                kv.set_numeric_key("u64", u64::MAX - 1).execute()?;
                assert_eq!(kv.increment_key_by("i64", 2_i64).execute()?, i64::MAX);
                assert_eq!(kv.increment_key_by("u64", 2_u64).execute()?, u64::MAX);

                // Test saturating decrements.
                kv.set_numeric_key("i64", i64::MIN + 1).execute()?;
                kv.set_numeric_key("u64", u64::MIN + 1).execute()?;
                assert_eq!(kv.decrement_key_by("i64", 2_i64).execute()?, i64::MIN);
                assert_eq!(kv.decrement_key_by("u64", 2_u64).execute()?, u64::MIN);

                // Test numerical conversion safety using get
                {
                    // For i64 -> f64, 2^53 (`f64::MANTISSA_DIGITS` bits) is the
                    // largest magnitude expected to convert without error, in
                    // either the positive or negative direction; 2^53 + 1 is
                    // expected to be rejected unless a lossy conversion is
                    // requested.
                    kv.set_numeric_key("i64", 2_i64.pow(f64::MANTISSA_DIGITS))
                        .execute()?;
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64()?.unwrap(),
                        9_007_199_254_740_992_f64
                    );
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS)))
                        .execute()?;
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64()?.unwrap(),
                        -9_007_199_254_740_992_f64
                    );

                    kv.set_numeric_key("i64", 2_i64.pow(f64::MANTISSA_DIGITS) + 1)
                        .execute()?;
                    assert!(kv.get_key("i64").into_f64().is_err());
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64_lossy()?.unwrap(),
                        9_007_199_254_740_993_f64
                    );
                    kv.set_numeric_key("i64", -(2_i64.pow(f64::MANTISSA_DIGITS) + 1))
                        .execute()?;
                    assert!(kv.get_key("i64").into_f64().is_err());
                    $crate::assert_f64_eq!(
                        kv.get_key("i64").into_f64_lossy()?.unwrap(),
                        -9_007_199_254_740_993_f64
                    );

                    // For i64 -> u64, the only limit is sign.
                    kv.set_numeric_key("i64", -1_i64).execute()?;
                    assert!(kv.get_key("i64").into_u64().is_err());
                    assert_eq!(kv.get_key("i64").into_u64_lossy(true)?.unwrap(), 0_u64);
                    assert_eq!(kv.get_key("i64").into_u64_lossy(false)?.unwrap(), u64::MAX);

                    // For f64 -> i64, the limit is fractional numbers. Saturating
                    // isn't tested in this conversion path.
                    kv.set_numeric_key("f64", 1.1_f64).execute()?;
                    assert!(kv.get_key("f64").into_i64().is_err());
                    assert_eq!(kv.get_key("f64").into_i64_lossy(false)?.unwrap(), 1_i64);
                    kv.set_numeric_key("f64", -1.1_f64).execute()?;
                    assert!(kv.get_key("f64").into_i64().is_err());
                    assert_eq!(kv.get_key("f64").into_i64_lossy(false)?.unwrap(), -1_i64);

                    // For f64 -> u64, the limit is fractional numbers or negative
                    // numbers. Saturating isn't tested in this conversion path.
                    kv.set_numeric_key("f64", 1.1_f64).execute()?;
                    assert!(kv.get_key("f64").into_u64().is_err());
                    assert_eq!(kv.get_key("f64").into_u64_lossy(false)?.unwrap(), 1_u64);
                    kv.set_numeric_key("f64", -1.1_f64).execute()?;
                    assert!(kv.get_key("f64").into_u64().is_err());
                    assert_eq!(kv.get_key("f64").into_u64_lossy(false)?.unwrap(), 0_u64);

                    // For u64 -> i64, the limit is > i64::MAX
                    kv.set_numeric_key("u64", i64::MAX as u64 + 1).execute()?;
                    assert!(kv.get_key("u64").into_i64().is_err());
                    assert_eq!(kv.get_key("u64").into_i64_lossy(true)?.unwrap(), i64::MAX);
                    assert_eq!(kv.get_key("u64").into_i64_lossy(false)?.unwrap(), i64::MIN);
                }

                // Test that non-numeric keys won't be changed when attempting to incr/decr
                kv.set_key("non-numeric", &String::from("test")).execute()?;
                assert!(kv.increment_key_by("non-numeric", 1_i64).execute().is_err());
                assert!(kv.decrement_key_by("non-numeric", 1_i64).execute().is_err());
                assert_eq!(
                    kv.get_key("non-numeric").into::<String>()?.unwrap(),
                    String::from("test")
                );

                // Test that NaN cannot be stored. The error is named through
                // `$crate` so the expansion does not depend on how the calling
                // crate names this crate.
                kv.set_numeric_key("f64", 0_f64).execute()?;
                assert!(matches!(
                    kv.set_numeric_key("f64", f64::NAN).execute(),
                    Err($crate::Error::NotANumber)
                ));
                // Verify the value was unchanged.
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64()?.unwrap(), 0.);
                // Try to increment by nan
                assert!(matches!(
                    kv.increment_key_by("f64", f64::NAN).execute(),
                    Err($crate::Error::NotANumber)
                ));
                $crate::assert_f64_eq!(kv.get_key("f64").into_f64()?.unwrap(), 0.);

                harness.shutdown()?;

                Ok(())
            }

            /// Checks that updating a key replaces its expiration unless
            /// `keep_existing_expiration` is requested. The body restarts
            /// itself whenever a step overruns its timing budget.
            #[test]
            fn kv_expiration_tests() -> anyhow::Result<()> {
                use std::time::Duration;

                use $crate::keyvalue::{KeyStatus, KeyValue};

                let harness = $harness::new($crate::test_util::HarnessTest::KvExpiration)?;
                let db = harness.connect()?;

                loop {
                    let kv = db.with_key_namespace("expiration");

                    kv.delete_key("a")?;
                    kv.delete_key("b")?;

                    // Test that the expiration is updated for key a, but not for key b.
                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(500));
                    let r1 = kv
                        .set_key("a", &0_u32)
                        .expire_in(Duration::from_secs(2))
                        .execute()?;
                    let r2 = kv
                        .set_key("b", &0_u32)
                        .expire_in(Duration::from_secs(2))
                        .execute()?;
                    assert_eq!(r1, KeyStatus::Inserted);
                    assert_eq!(r2, KeyStatus::Inserted);
                    let r1 = kv
                        .set_key("a", &1_u32)
                        .expire_in(Duration::from_secs(4))
                        .execute()?;
                    let r2 = kv
                        .set_key("b", &1_u32)
                        .expire_in(Duration::from_secs(100))
                        .keep_existing_expiration()
                        .execute()?;

                    if timing.elapsed() > Duration::from_secs(1) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
                    }

                    assert_eq!(r1, KeyStatus::Updated, "a wasn't an update");
                    assert_eq!(r2, KeyStatus::Updated, "b wasn't an update");

                    let a = kv.get_key("a").into()?;
                    assert_eq!(a, Some(1u32), "a shouldn't have expired yet");

                    // Before checking the value, make sure we haven't elapsed too
                    // much time. If so, just restart the test.
                    if !timing.wait_until(Duration::from_secs_f32(3.)) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed()
                        );
                        continue;
                    }

                    assert_eq!(kv.get_key("b").query()?, None, "b never expired");

                    timing.wait_until(Duration::from_secs_f32(5.));
                    assert_eq!(kv.get_key("a").query()?, None, "a never expired");
                    break;
                }
                harness.shutdown()?;

                Ok(())
            }

            /// Checks that deleting a key also discards its pending
            /// expiration, so a value stored afterwards without an expiration
            /// survives.
            #[test]
            fn delete_expire_tests() -> anyhow::Result<()> {
                use std::time::Duration;

                use $crate::keyvalue::KeyValue;

                let harness = $harness::new($crate::test_util::HarnessTest::KvDeleteExpire)?;
                let db = harness.connect()?;

                loop {
                    let kv = db.with_key_namespace("delete_expire");

                    kv.delete_key("a")?;

                    let timing = $crate::test_util::TimingTest::new(Duration::from_millis(100));

                    // Create a key with an expiration. Delete the key. Set a new
                    // value at that key with no expiration. Ensure it doesn't
                    // expire.
                    kv.set_key("a", &0_u32)
                        .expire_in(Duration::from_secs(2))
                        .execute()?;
                    kv.delete_key("a")?;
                    kv.set_key("a", &1_u32).execute()?;
                    if timing.elapsed() > Duration::from_secs(1) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed(),
                        );
                        continue;
                    }
                    if !timing.wait_until(Duration::from_secs_f32(2.5)) {
                        println!(
                            "Restarting test {}. Took too long {:?}",
                            line!(),
                            timing.elapsed()
                        );
                        continue;
                    }

                    assert_eq!(kv.get_key("a").into()?, Some(1u32));

                    break;
                }
                harness.shutdown()?;

                Ok(())
            }

            // This test can fail when the machine it's running on is under high load or
            // constrained resources. We need a command that persists (and waits until
            // it has persisted) to make this test fully deterministic.
            #[test]
            fn kv_transaction_tests() -> anyhow::Result<()> {
                use std::time::Duration;

                use $crate::connection::Connection;
                use $crate::keyvalue::KeyValue;

                let harness = $harness::new($crate::test_util::HarnessTest::KvTransactions)?;
                let db = harness.connect()?;
                // Generate several transactions that we can validate. Persisting
                // happens in the background, so we delay between each step to give
                // it a moment.
                db.set_key("expires", &0_u32)
                    .expire_in(Duration::from_secs(1))
                    .execute()?;
                db.set_key("akey", &String::from("avalue")).execute()?;
                db.set_numeric_key("nkey", 0_u64).execute()?;
                std::thread::sleep(Duration::from_millis(500));
                db.increment_key_by("nkey", 1_u64).execute()?;
                std::thread::sleep(Duration::from_millis(500));
                db.delete_key("nkey")?;
                db.get_key("akey").and_delete().query()?;
                std::thread::sleep(Duration::from_millis(500));
                // Ensure this doesn't generate a transaction.
                db.delete_key("nkey")?;

                std::thread::sleep(Duration::from_secs(1));

                let transactions = Connection::list_executed_transactions(&db, None, None)?;
                // NOTE(review): `dbg!` echoes every changed key to stderr;
                // presumably retained to help diagnose the timing-related
                // failures described above -- confirm before removing.
                let deleted_keys = transactions
                    .iter()
                    .filter_map(|tx| tx.changes.keys())
                    .flatten()
                    .filter(|changed_key| dbg!(changed_key).deleted)
                    .count();
                assert_eq!(deleted_keys, 3, "deleted keys wasn't 3");
                let akey_changes = transactions
                    .iter()
                    .filter_map(|tx| tx.changes.keys())
                    .flatten()
                    .filter(|changed_key| changed_key.key == "akey")
                    .count();
                assert_eq!(akey_changes, 2, "akey changes wasn't 2");
                let nkey_changes = transactions
                    .iter()
                    .filter_map(|tx| tx.changes.keys())
                    .flatten()
                    .filter(|changed_key| changed_key.key == "nkey")
                    .count();
                assert_eq!(nkey_changes, 3, "nkey changes wasn't 3");

                harness.shutdown()?;

                Ok(())
            }
        }
    };
}

            
/// A stopwatch for timing-sensitive tests.
///
/// The stopwatch records the instant it was created; waits and elapsed-time
/// queries are all measured relative to that instant. A wait reports failure
/// when it finishes too far past its target, which lets a test restart
/// itself instead of asserting on stale timing.
#[derive(Debug, Clone)]
pub struct TimingTest {
    /// How far past a wait target the clock may be before the wait is
    /// reported as having overrun.
    tolerance: Duration,
    /// The instant this stopwatch was created.
    start: Instant,
}

            
impl TimingTest {
    #[must_use]
800
    pub fn new(tolerance: Duration) -> Self {
800
        Self {
800
            tolerance,
800
            start: Instant::now(),
800
        }
800
    }

            
    #[allow(clippy::must_use_candidate)]
440
    pub fn wait_until(&self, absolute_duration: Duration) -> bool {
440
        let target = self.start + absolute_duration;
440
        let mut now = Instant::now();
440
        if let Some(sleep_duration) = target.checked_duration_since(now) {
440
            std::thread::sleep(sleep_duration);
440
            now = Instant::now();
440
        }
440
        let amount_past = now.checked_duration_since(target);
440
        // Return false if we're beyond the tolerance given
440
        amount_past.unwrap_or_default() < self.tolerance
440
    }

            
600
    pub async fn wait_until_async(&self, absolute_duration: Duration) -> bool {
15
        let target = self.start + absolute_duration;
15
        let mut now = Instant::now();
15
        if now < target {
15
            tokio::time::sleep_until(target.into()).await;
15
            now = Instant::now();
        }
15
        let amount_past = now.checked_duration_since(target);
15

            
15
        // Return false if we're beyond the tolerance given
15
        amount_past.unwrap_or_default() < self.tolerance
15
    }

            
    #[must_use]
1120
    pub fn elapsed(&self) -> Duration {
1120
        Instant::now()
1120
            .checked_duration_since(self.start)
1120
            .unwrap_or_default()
1120
    }
}

            
5
pub async fn basic_server_connection_tests<C: AsyncStorageConnection>(
5
    server: C,
5
    newdb_name: &str,
5
) -> anyhow::Result<()> {
5
    let schemas = server.list_available_schemas().await?;

            
5
    let basic_schema = schemas
5
        .iter()
8
        .find(|s| s.name == BasicSchema::schema_name())
5
        .unwrap();
5
    assert!(basic_schema
5
        .collections()
11
        .any(|c| c.name == Basic::collection_name()));
5
    let basic_collection = basic_schema.collection(&Basic::collection_name()).unwrap();
5
    assert!(basic_collection
5
        .views()
14
        .any(|v| v.name == BasicByParentId.view_name()));
5
    let by_parent_id = basic_collection.view(&BasicByParentId.view_name()).unwrap();
5
    assert_eq!(by_parent_id.policy, ViewUpdatePolicy::Lazy);

            
5
    assert!(schemas
5
        .iter()
8
        .any(|s| s.name == SchemaName::new("khonsulabs", "bonsaidb-admin")));

            
5
    let databases = server.list_databases().await?;
16
    assert!(databases.iter().any(|db| db.name == "tests"));

            
5
    server
5
        .create_database::<BasicSchema>(newdb_name, false)
8
        .await?;
5
    server.delete_database(newdb_name).await?;

            
    assert!(matches!(
5
        server.delete_database(newdb_name).await,
        Err(Error::DatabaseNotFound(_))
    ));

            
    assert!(matches!(
5
        server.create_database::<BasicSchema>("tests", false).await,
        Err(Error::DatabaseNameAlreadyTaken(_))
    ));

            
5
    assert!(server
5
        .create_database::<BasicSchema>("tests", true)
8
        .await
5
        .is_ok());

            
    assert!(matches!(
5
        server
5
            .create_database::<BasicSchema>("|invalidname", false)
5
            .await,
        Err(Error::InvalidDatabaseName(_))
    ));

            
    assert!(matches!(
5
        server
5
            .create_database::<UnassociatedCollection>(newdb_name, false)
5
            .await,
        Err(Error::SchemaNotRegistered(_))
    ));

            
5
    Ok(())
5
}

            
3
pub fn blocking_basic_server_connection_tests<C: StorageConnection>(
3
    server: &C,
3
    newdb_name: &str,
3
) -> anyhow::Result<()> {
3
    let schemas = server.list_available_schemas()?;
3
    let basic_schema = schemas
3
        .iter()
4
        .find(|s| s.name == BasicSchema::schema_name())
3
        .unwrap();
3
    assert!(basic_schema
3
        .collections()
5
        .any(|c| c.name == Basic::collection_name()));
3
    let basic_collection = basic_schema.collection(&Basic::collection_name()).unwrap();
3
    assert!(basic_collection
3
        .views()
12
        .any(|v| v.name == BasicByParentId.view_name()));
3
    let by_parent_id = basic_collection.view(&BasicByParentId.view_name()).unwrap();
3
    assert_eq!(by_parent_id.policy, ViewUpdatePolicy::Lazy);

            
3
    assert!(schemas
3
        .iter()
5
        .any(|s| s.name == SchemaName::new("khonsulabs", "bonsaidb-admin")));

            
3
    let databases = server.list_databases()?;
7
    assert!(databases.iter().any(|db| db.name == "tests"));

            
3
    server.create_database::<BasicSchema>(newdb_name, false)?;
3
    server.delete_database(newdb_name)?;

            
3
    assert!(matches!(
3
        server.delete_database(newdb_name),
        Err(Error::DatabaseNotFound(_))
    ));

            
3
    assert!(matches!(
3
        server.create_database::<BasicSchema>("tests", false),
        Err(Error::DatabaseNameAlreadyTaken(_))
    ));

            
3
    assert!(server.create_database::<BasicSchema>("tests", true).is_ok());

            
3
    assert!(matches!(
3
        server.create_database::<BasicSchema>("|invalidname", false),
        Err(Error::InvalidDatabaseName(_))
    ));

            
3
    assert!(matches!(
3
        server.create_database::<UnassociatedCollection>(newdb_name, false),
        Err(Error::SchemaNotRegistered(_))
    ));

            
3
    Ok(())
3
}