1
#![allow(clippy::missing_panics_doc)]
2

            
3
use std::{
4
    fmt::{Debug, Display},
5
    io::ErrorKind,
6
    ops::Deref,
7
    path::{Path, PathBuf},
8
    sync::{
9
        atomic::{AtomicU32, Ordering},
10
        Arc,
11
    },
12
    time::{Duration, Instant},
13
};
14

            
15
use itertools::Itertools;
16
use serde::{Deserialize, Serialize};
17
use transmog_pot::Pot;
18

            
19
use crate::{
20
    admin::{PermissionGroup, Role, User},
21
    connection::{
22
        AccessPolicy, AsyncConnection, AsyncStorageConnection, Connection, StorageConnection,
23
    },
24
    document::{
25
        BorrowedDocument, CollectionDocument, CollectionHeader, DocumentId, Emit, Header, KeyId,
26
    },
27
    keyvalue::{AsyncKeyValue, KeyValue},
28
    limits::{LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT, LIST_TRANSACTIONS_MAX_RESULTS},
29
    schema::{
30
        view::{
31
            map::{Mappings, ViewMappedValue},
32
            ReduceResult, ViewSchema,
33
        },
34
        Collection, CollectionName, MappedValue, NamedCollection, Qualified, Schema, SchemaName,
35
        Schematic, SerializedCollection, View, ViewMapResult,
36
    },
37
    Error,
38
};
39

            
40
1293694
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Collection)]
41
// This collection purposely uses names with characters that need
42
// escaping, since it's used in backup/restore.
43
#[collection(name = "_basic", authority = "khonsulabs_", views = [BasicCount, BasicByParentId, BasicByTag, BasicByCategory], core = crate)]
44
#[must_use]
45
pub struct Basic {
46
    pub value: String,
47
    pub category: Option<String>,
48
    pub parent_id: Option<u64>,
49
    pub tags: Vec<String>,
50
}
51

            
52
impl Basic {
53
2700
    pub fn new(value: impl Into<String>) -> Self {
54
2700
        Self {
55
2700
            value: value.into(),
56
2700
            tags: Vec::default(),
57
2700
            category: None,
58
2700
            parent_id: None,
59
2700
        }
60
2700
    }
61

            
62
40
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
63
40
        self.category = Some(category.into());
64
40
        self
65
40
    }
66

            
67
32
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
68
32
        self.tags.push(tag.into());
69
32
        self
70
32
    }
71

            
72
1271
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
73
1271
        self.parent_id = Some(parent_id);
74
1271
        self
75
1271
    }
76
}
77

            
78
2074368
#[derive(Debug, Clone, View)]
79
#[view(collection = Basic, key = (), value = usize, name = "count", core = crate)]
80
pub struct BasicCount;
81

            
82
impl ViewSchema for BasicCount {
83
    type View = Self;
84

            
85
31
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
86
31
        document.header.emit_key_and_value((), 1)
87
31
    }
88

            
89
31
    fn reduce(
90
31
        &self,
91
31
        mappings: &[ViewMappedValue<Self::View>],
92
31
        _rereduce: bool,
93
31
    ) -> ReduceResult<Self::View> {
94
31
        Ok(mappings.iter().map(|map| map.value).sum())
95
31
    }
96
}
97

            
98
2108404
#[derive(Debug, Clone, View)]
99
#[view(collection = Basic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
100
pub struct BasicByParentId;
101

            
102
impl ViewSchema for BasicByParentId {
103
    type View = Self;
104

            
105
2542
    fn version(&self) -> u64 {
106
2542
        1
107
2542
    }
108

            
109
2759
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
110
2759
        let contents = Basic::document_contents(document)?;
111
2759
        document.header.emit_key_and_value(contents.parent_id, 1)
112
2759
    }
113

            
114
3255
    fn reduce(
115
3255
        &self,
116
3255
        mappings: &[ViewMappedValue<Self::View>],
117
3255
        _rereduce: bool,
118
3255
    ) -> ReduceResult<Self::View> {
119
3255
        Ok(mappings.iter().map(|map| map.value).sum())
120
3255
    }
121
}
122

            
123
2075854
#[derive(Debug, Clone, View)]
124
#[view(collection = Basic, key = String, value = usize, name = "by-category", core = crate)]
125
pub struct BasicByCategory;
126

            
127
impl ViewSchema for BasicByCategory {
128
    type View = Self;
129

            
130
1271
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
131
1271
        let contents = Basic::document_contents(document)?;
132
1271
        if let Some(category) = &contents.category {
133
744
            document
134
744
                .header
135
744
                .emit_key_and_value(category.to_lowercase(), 1)
136
        } else {
137
527
            Ok(Mappings::none())
138
        }
139
1271
    }
140

            
141
496
    fn reduce(
142
496
        &self,
143
496
        mappings: &[ViewMappedValue<Self::View>],
144
496
        _rereduce: bool,
145
496
    ) -> ReduceResult<Self::View> {
146
744
        Ok(mappings.iter().map(|map| map.value).sum())
147
496
    }
148
}
149

            
150
2089215
#[derive(Debug, Clone, View)]
151
#[view(collection = Basic, key = String, value = usize, name = "by-tag", core = crate)]
152
pub struct BasicByTag;
153

            
154
impl ViewSchema for BasicByTag {
155
    type View = Self;
156

            
157
1023
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
158
1023
        let contents = Basic::document_contents(document)?;
159
1023
        contents
160
1023
            .tags
161
1023
            .iter()
162
1488
            .map(|tag| document.header.emit_key_and_value(tag.clone(), 1))
163
1023
            .collect()
164
1023
    }
165

            
166
1736
    fn reduce(
167
1736
        &self,
168
1736
        mappings: &[ViewMappedValue<Self::View>],
169
1736
        _rereduce: bool,
170
1736
    ) -> ReduceResult<Self::View> {
171
2232
        Ok(mappings.iter().map(|map| map.value).sum())
172
1736
    }
173
}
174

            
175
341
#[derive(Debug, Clone, View)]
176
#[view(collection = Basic, key = (), value = (), name = "by-parent-id", core = crate)]
177
pub struct BasicByBrokenParentId;
178

            
179
impl ViewSchema for BasicByBrokenParentId {
180
    type View = Self;
181

            
182
31
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
183
31
        document.header.emit()
184
31
    }
185
}
186

            
187
2586764
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Collection)]
188
#[collection(name = "encrypted-basic", authority = "khonsulabs", views = [EncryptedBasicCount, EncryptedBasicByParentId, EncryptedBasicByCategory])]
189
#[collection(encryption_key = Some(KeyId::Master), encryption_optional, core = crate)]
190
#[must_use]
191
pub struct EncryptedBasic {
192
    pub value: String,
193
    pub category: Option<String>,
194
    pub parent_id: Option<u64>,
195
}
196

            
197
impl EncryptedBasic {
198
1
    pub fn new(value: impl Into<String>) -> Self {
199
1
        Self {
200
1
            value: value.into(),
201
1
            category: None,
202
1
            parent_id: None,
203
1
        }
204
1
    }
205

            
206
    pub fn with_category(mut self, category: impl Into<String>) -> Self {
207
        self.category = Some(category.into());
208
        self
209
    }
210

            
211
    pub const fn with_parent_id(mut self, parent_id: u64) -> Self {
212
        self.parent_id = Some(parent_id);
213
        self
214
    }
215
}
216

            
217
1293692
#[derive(Debug, Clone, View)]
218
#[view(collection = EncryptedBasic, key = (), value = usize, name = "count", core = crate)]
219
pub struct EncryptedBasicCount;
220

            
221
impl ViewSchema for EncryptedBasicCount {
222
    type View = Self;
223

            
224
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
225
        document.header.emit_key_and_value((), 1)
226
    }
227

            
228
    fn reduce(
229
        &self,
230
        mappings: &[ViewMappedValue<Self::View>],
231
        _rereduce: bool,
232
    ) -> ReduceResult<Self::View> {
233
        Ok(mappings.iter().map(|map| map.value).sum())
234
    }
235
}
236

            
237
1293692
#[derive(Debug, Clone, View)]
238
#[view(collection = EncryptedBasic, key = Option<u64>, value = usize, name = "by-parent-id", core = crate)]
239
pub struct EncryptedBasicByParentId;
240

            
241
impl ViewSchema for EncryptedBasicByParentId {
242
    type View = Self;
243

            
244
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
245
        let contents = EncryptedBasic::document_contents(document)?;
246
        document.header.emit_key_and_value(contents.parent_id, 1)
247
    }
248

            
249
    fn reduce(
250
        &self,
251
        mappings: &[ViewMappedValue<Self::View>],
252
        _rereduce: bool,
253
    ) -> ReduceResult<Self::View> {
254
        Ok(mappings.iter().map(|map| map.value).sum())
255
    }
256
}
257

            
258
1293692
#[derive(Debug, Clone, View)]
259
#[view(collection = EncryptedBasic, key = String, value = usize, name = "by-category", core = crate)]
260
pub struct EncryptedBasicByCategory;
261

            
262
impl ViewSchema for EncryptedBasicByCategory {
263
    type View = Self;
264

            
265
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
266
        let contents = EncryptedBasic::document_contents(document)?;
267
        if let Some(category) = &contents.category {
268
            document
269
                .header
270
                .emit_key_and_value(category.to_lowercase(), 1)
271
        } else {
272
            Ok(Mappings::none())
273
        }
274
    }
275

            
276
    fn reduce(
277
        &self,
278
        mappings: &[ViewMappedValue<Self::View>],
279
        _rereduce: bool,
280
    ) -> ReduceResult<Self::View> {
281
        Ok(mappings.iter().map(|map| map.value).sum())
282
    }
283
}
284

            
285
1293382
#[derive(Debug, Schema)]
286
#[schema(name = "basic", collections = [Basic, EncryptedBasic, Unique], core = crate)]
287
pub struct BasicSchema;
288

            
289
1293382
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Default, Collection)]
290
#[collection(name = "unique", authority = "khonsulabs", views = [UniqueValue], core = crate)]
291
pub struct Unique {
292
    pub value: String,
293
}
294

            
295
impl Unique {
296
40
    pub fn new(value: impl Display) -> Self {
297
40
        Self {
298
40
            value: value.to_string(),
299
40
        }
300
40
    }
301
}
302

            
303
1307518
#[derive(Debug, Clone, View)]
304
#[view(collection = Unique, key = String, value = (), name = "unique-value", core = crate)]
305
pub struct UniqueValue;
306

            
307
impl ViewSchema for UniqueValue {
308
    type View = Self;
309

            
310
1300574
    fn unique(&self) -> bool {
311
1300574
        true
312
1300574
    }
313

            
314
1984
    fn map(&self, document: &BorrowedDocument<'_>) -> ViewMapResult<Self::View> {
315
1984
        let entry = Unique::document_contents(document)?;
316
1984
        document.header.emit_key(entry.value)
317
1984
    }
318
}
319

            
320
impl NamedCollection for Unique {
321
    type ByNameView = UniqueValue;
322
}
323

            
324
/// A directory path for use in tests. Any existing directory at the path is
/// removed on construction, and the directory is removed again when the value
/// is dropped.
pub struct TestDirectory(pub PathBuf);

impl TestDirectory {
    /// Uses `path` as given, removing whatever already exists there.
    ///
    /// # Panics
    ///
    /// Panics if the path exists and cannot be removed.
    pub fn absolute<S: AsRef<Path>>(path: S) -> Self {
        let path = path.as_ref().to_owned();
        Self::remove_existing(&path);
        Self(path)
    }

    /// Uses `name` joined onto the system temporary directory, removing
    /// whatever already exists there.
    ///
    /// # Panics
    ///
    /// Panics if the path exists and cannot be removed.
    pub fn new<S: AsRef<Path>>(name: S) -> Self {
        let path = std::env::temp_dir().join(name);
        Self::remove_existing(&path);
        Self(path)
    }

    /// Removes the directory tree at `path` if it exists; shared by both
    /// constructors.
    fn remove_existing(path: &Path) {
        if path.exists() {
            std::fs::remove_dir_all(path).expect("error clearing temporary directory");
        }
    }
}

impl Drop for TestDirectory {
    fn drop(&mut self) {
        if let Err(err) = std::fs::remove_dir_all(&self.0) {
            // A directory that was never created (or was already removed) is
            // not worth reporting; anything else is logged but never panics.
            if err.kind() != ErrorKind::NotFound {
                eprintln!("Failed to clean up temporary folder: {:?}", err);
            }
        }
    }
}

impl AsRef<Path> for TestDirectory {
    fn as_ref(&self) -> &Path {
        &self.0
    }
}

impl Deref for TestDirectory {
    type Target = PathBuf;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
366

            
367
#[derive(Debug)]
368
pub struct BasicCollectionWithNoViews;
369

            
370
impl Collection for BasicCollectionWithNoViews {
371
    type PrimaryKey = u64;
372

            
373
248
    fn collection_name() -> CollectionName {
374
248
        Basic::collection_name()
375
248
    }
376

            
377
62
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
378
62
        Ok(())
379
62
    }
380
}
381

            
382
impl SerializedCollection for BasicCollectionWithNoViews {
383
    type Contents = Basic;
384
    type Format = Pot;
385

            
386
31
    fn format() -> Self::Format {
387
31
        Pot::default()
388
31
    }
389
}
390

            
391
#[derive(Debug)]
392
pub struct BasicCollectionWithOnlyBrokenParentId;
393

            
394
impl Collection for BasicCollectionWithOnlyBrokenParentId {
395
    type PrimaryKey = u64;
396

            
397
217
    fn collection_name() -> CollectionName {
398
217
        Basic::collection_name()
399
217
    }
400

            
401
62
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
402
62
        schema.define_view(BasicByBrokenParentId)
403
62
    }
404
}
405

            
406
496
#[derive(Serialize, Deserialize, Clone, Debug, Collection)]
407
#[collection(name = "unassociated", authority = "khonsulabs", core = crate)]
408
pub struct UnassociatedCollection;
409

            
410
7657
/// Identifies a test executed by a harness.
///
/// Discriminants start at `1` and increase by one per variant, which lets
/// [`HarnessTest::port`] derive a distinct port for each test from one base.
#[derive(Copy, Clone, Debug)]
pub enum HarnessTest {
    ServerConnectionTests = 1,
    StoreRetrieveUpdate,
    NotFound,
    Conflict,
    BadUpdate,
    NoUpdate,
    GetMultiple,
    List,
    ListTransactions,
    ViewQuery,
    UnassociatedCollection,
    Compact,
    ViewUpdate,
    ViewMultiEmit,
    ViewUnimplementedReduce,
    ViewAccessPolicies,
    Encryption,
    UniqueViews,
    NamedCollection,
    PubSubSimple,
    UserManagement,
    PubSubMultipleSubscribers,
    PubSubDropAndSend,
    PubSubUnsubscribe,
    PubSubDropCleanup,
    PubSubPublishAll,
    KvBasic,
    KvConcurrency,
    KvSet,
    KvIncrementDecrement,
    KvExpiration,
    KvDeleteExpire,
    KvTransactions,
}

impl HarnessTest {
    /// Returns `base` plus this variant's discriminant.
    ///
    /// The addition is unchecked, so `base` must leave room for the largest
    /// discriminant.
    #[must_use]
    pub const fn port(self, base: u16) -> u16 {
        base + self as u16
    }
}

impl Display for HarnessTest {
    /// Formats the variant exactly as its `Debug` representation (the variant
    /// name).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `self` is already a reference; no extra borrow is needed.
        Debug::fmt(self, f)
    }
}
459

            
460
/// Asserts that two `f64` expressions are equal within an absolute tolerance
/// of [`f64::EPSILON`].
///
/// Each argument is evaluated once. On failure the panic message shows both
/// values as `a <> b`.
#[macro_export]
macro_rules! assert_f64_eq {
    ($a:expr, $b:expr) => {{
        let a: f64 = $a;
        let b: f64 = $b;
        assert!((a - b).abs() <= f64::EPSILON, "{:?} <> {:?}", a, b);
    }};
}
469

            
470
/// Creates a test suite that tests methods available on [`AsyncConnection`]
///
/// The macro expands to a `#[cfg(test)]` module named `async_connection`
/// containing one `#[tokio::test]` per shared test in `test_util`.
///
/// `$harness` must name a type visible in the invoking module that provides:
///
/// - `async fn new(HarnessTest) -> Result<Self, _>`
/// - `fn server(&self)`, returning the storage connection
/// - `async fn connect(&self)`, returning the database connection
/// - `async fn shutdown(&self) -> anyhow::Result<()>`
/// - `fn server_name()`, an associated function returning the name used to
///   distinguish this harness
#[macro_export]
macro_rules! define_async_connection_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod r#async_connection {
            use super::$harness;
            #[tokio::test]
            async fn server_connection_tests() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ServerConnectionTests).await?;
                let db = harness.server();
                $crate::test_util::basic_server_connection_tests(
                    db.clone(),
                    &format!("server-connection-tests-{}", $harness::server_name()),
                )
                .await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn store_retrieve_update_delete() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate).await?;
                let db = harness.connect().await?;
                $crate::test_util::store_retrieve_update_delete_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn not_found() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NotFound).await?;
                let db = harness.connect().await?;

                $crate::test_util::not_found_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn conflict() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Conflict).await?;
                let db = harness.connect().await?;

                $crate::test_util::conflict_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn bad_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate).await?;
                let db = harness.connect().await?;

                $crate::test_util::bad_update_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn no_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate).await?;
                let db = harness.connect().await?;

                $crate::test_util::no_update_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn get_multiple() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple).await?;
                let db = harness.connect().await?;

                $crate::test_util::get_multiple_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn list() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::List).await?;
                let db = harness.connect().await?;

                $crate::test_util::list_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn list_transactions() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ListTransactions).await?;
                let db = harness.connect().await?;

                $crate::test_util::list_transactions_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_query() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery).await?;
                let db = harness.connect().await?;

                $crate::test_util::view_query_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn unassociated_collection() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::UnassociatedCollection).await?;
                let db = harness.connect().await?;

                $crate::test_util::unassociated_collection_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn unimplemented_reduce() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce).await?;
                let db = harness.connect().await?;

                $crate::test_util::unimplemented_reduce(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate).await?;
                let db = harness.connect().await?;

                $crate::test_util::view_update_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_multi_emit() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit).await?;
                let db = harness.connect().await?;

                $crate::test_util::view_multi_emit_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn view_access_policies() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies).await?;
                let db = harness.connect().await?;

                $crate::test_util::view_access_policy_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn unique_views() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews).await?;
                let db = harness.connect().await?;

                $crate::test_util::unique_view_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn named_collection() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::NamedCollection).await?;
                let db = harness.connect().await?;

                $crate::test_util::named_collection_tests(&db).await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn user_management() -> anyhow::Result<()> {
                use $crate::connection::AsyncStorageConnection;
                let harness = $harness::new($crate::test_util::HarnessTest::UserManagement).await?;
                // The connection itself is unused; the test runs against the
                // admin database obtained from the server.
                let _db = harness.connect().await?;
                let server = harness.server();
                let admin = server
                    .database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)
                    .await?;

                $crate::test_util::user_management_tests(
                    &admin,
                    server.clone(),
                    $harness::server_name(),
                )
                .await?;
                harness.shutdown().await
            }

            #[tokio::test]
            async fn compaction() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Compact).await?;
                let db = harness.connect().await?;

                $crate::test_util::compaction_tests(&db).await?;
                harness.shutdown().await
            }
        }
    };
}
669

            
670
/// Creates a test suite that tests methods available on [`Connection`]
///
/// The macro expands to a `#[cfg(test)]` module named `blocking_connection`
/// containing one `#[test]` per shared blocking test in `test_util`.
///
/// `$harness` must name a type visible in the invoking module that provides:
///
/// - `fn new(HarnessTest) -> Result<Self, _>`
/// - `fn server(&self)`, returning the storage connection
/// - `fn connect(&self)`, returning the database connection
/// - `fn shutdown(&self) -> anyhow::Result<()>`
/// - `fn server_name()`, an associated function returning the name used to
///   distinguish this harness
#[macro_export]
macro_rules! define_blocking_connection_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod blocking_connection {
            use super::$harness;
            #[test]
            fn server_connection_tests() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ServerConnectionTests)?;
                let db = harness.server();
                $crate::test_util::blocking_basic_server_connection_tests(
                    db,
                    &format!("server-connection-tests-{}", $harness::server_name()),
                )?;
                harness.shutdown()
            }

            #[test]
            fn store_retrieve_update_delete() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::StoreRetrieveUpdate)?;
                let db = harness.connect()?;
                $crate::test_util::blocking_store_retrieve_update_delete_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn not_found() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NotFound)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_not_found_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn conflict() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Conflict)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_conflict_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn bad_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::BadUpdate)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_bad_update_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn no_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NoUpdate)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_no_update_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn get_multiple() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::GetMultiple)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_get_multiple_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn list() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::List)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_list_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn list_transactions() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ListTransactions)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_list_transactions_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_query() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewQuery)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_query_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn unassociated_collection() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::UnassociatedCollection)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_unassociated_collection_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn unimplemented_reduce() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::ViewUnimplementedReduce)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_unimplemented_reduce(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_update() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewUpdate)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_update_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_multi_emit() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewMultiEmit)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_multi_emit_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn view_access_policies() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::ViewAccessPolicies)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_view_access_policy_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn unique_views() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::UniqueViews)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_unique_view_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn named_collection() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::NamedCollection)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_named_collection_tests(&db)?;
                harness.shutdown()
            }

            #[test]
            fn user_management() -> anyhow::Result<()> {
                use $crate::connection::StorageConnection;
                let harness = $harness::new($crate::test_util::HarnessTest::UserManagement)?;
                // The connection itself is unused; the test runs against the
                // admin database obtained from the server.
                let _db = harness.connect()?;
                let server = harness.server();
                let admin =
                    server.database::<$crate::admin::Admin>($crate::admin::ADMIN_DATABASE_NAME)?;

                $crate::test_util::blocking_user_management_tests(
                    &admin,
                    server,
                    $harness::server_name(),
                )?;
                harness.shutdown()
            }

            #[test]
            fn compaction() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::Compact)?;
                let db = harness.connect()?;

                $crate::test_util::blocking_compaction_tests(&db)?;
                harness.shutdown()
            }
        }
    };
}
861

            
862
506
pub async fn store_retrieve_update_delete_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
863
506
    let original_value = Basic::new("initial_value");
864
506
    let collection = db.collection::<Basic>();
865
2115
    let header = collection.push(&original_value).await?;
866

            
867
506
    let mut doc = collection
868
1467
        .get(header.id)
869
1467
        .await?
870
506
        .expect("couldn't retrieve stored item");
871
506
    let mut value = Basic::document_contents(&doc)?;
872
506
    assert_eq!(original_value, value);
873
506
    let old_revision = doc.header.revision;
874
506

            
875
506
    // Update the value
876
506
    value.value = String::from("updated_value");
877
506
    Basic::set_document_contents(&mut doc, value.clone())?;
878
1640
    db.update::<Basic, _>(&mut doc).await?;
879

            
880
    // update should cause the revision to be changed
881
506
    assert_ne!(doc.header.revision, old_revision);
882

            
883
    // Check the value in the database to ensure it has the new document
884
506
    let doc = collection
885
1438
        .get(header.id)
886
1438
        .await?
887
506
        .expect("couldn't retrieve stored item");
888
506
    assert_eq!(Basic::document_contents(&doc)?, value);
889

            
890
    // These operations should have created two transactions with one change each
891
1502
    let transactions = db.list_executed_transactions(None, None).await?;
892
506
    assert_eq!(transactions.len(), 2);
893
506
    assert!(transactions[0].id < transactions[1].id);
894
1518
    for transaction in &transactions {
895
1012
        let changes = transaction
896
1012
            .changes
897
1012
            .documents()
898
1012
            .expect("incorrect transaction type");
899
1012
        assert_eq!(changes.documents.len(), 1);
900
1012
        assert_eq!(changes.collections.len(), 1);
901
1012
        assert_eq!(changes.collections[0], Basic::collection_name());
902
1012
        assert_eq!(changes.documents[0].collection, 0);
903
1012
        assert_eq!(header.id, changes.documents[0].id.deserialize()?);
904
1012
        assert!(!changes.documents[0].deleted);
905
    }
906

            
907
1680
    db.collection::<Basic>().delete(&doc).await?;
908
1606
    assert!(collection.get(header.id).await?.is_none());
909
506
    let transactions = db
910
1610
        .list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)
911
1610
        .await?;
912
506
    assert_eq!(transactions.len(), 1);
913
506
    let transaction = transactions.first().unwrap();
914
506
    let changes = transaction
915
506
        .changes
916
506
        .documents()
917
506
        .expect("incorrect transaction type");
918
506
    assert_eq!(changes.documents.len(), 1);
919
506
    assert_eq!(changes.collections[0], Basic::collection_name());
920
506
    assert_eq!(header.id, changes.documents[0].id.deserialize()?);
921
506
    assert!(changes.documents[0].deleted);
922

            
923
    // Use the Collection interface
924
1822
    let mut doc = original_value.clone().push_into_async(db).await?;
925
506
    doc.contents.category = Some(String::from("updated"));
926
1736
    doc.update_async(db).await?;
927
1515
    let reloaded = Basic::get_async(doc.header.id, db).await?.unwrap();
928
506
    assert_eq!(doc.contents, reloaded.contents);
929

            
930
    // Test Connection::insert with a specified id
931
506
    let doc = BorrowedDocument::with_contents::<Basic>(42, &Basic::new("42"))?;
932
506
    let document_42 = db
933
1609
        .insert::<Basic, _, _>(Some(doc.header.id), doc.contents.into_vec())
934
1609
        .await?;
935
506
    assert_eq!(document_42.id, 42);
936
1591
    let document_43 = Basic::new("43").insert_into_async(43, db).await?;
937
506
    assert_eq!(document_43.header.id, 43);
938

            
939
    // Test that inserting a document with the same ID results in a conflict:
940
506
    let conflict_err = Basic::new("43")
941
1562
        .insert_into_async(doc.header.id, db)
942
1562
        .await
943
506
        .unwrap_err();
944
506
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));
945

            
946
    // Test that overwriting works
947
506
    let overwritten = Basic::new("43")
948
1606
        .overwrite_into_async(doc.header.id, db)
949
1606
        .await
950
506
        .unwrap();
951
506
    assert!(overwritten.header.revision.id > doc.header.revision.id);
952

            
953
506
    Ok(())
954
506
}
955

            
956
3
pub fn blocking_store_retrieve_update_delete_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
957
3
    let original_value = Basic::new("initial_value");
958
3
    let collection = db.collection::<Basic>();
959
3
    let header = collection.push(&original_value)?;
960

            
961
3
    let mut doc = collection
962
3
        .get(header.id)?
963
3
        .expect("couldn't retrieve stored item");
964
3
    let mut value = Basic::document_contents(&doc)?;
965
3
    assert_eq!(original_value, value);
966
3
    let old_revision = doc.header.revision;
967
3

            
968
3
    // Update the value
969
3
    value.value = String::from("updated_value");
970
3
    Basic::set_document_contents(&mut doc, value.clone())?;
971
3
    db.update::<Basic, _>(&mut doc)?;
972

            
973
    // update should cause the revision to be changed
974
3
    assert_ne!(doc.header.revision, old_revision);
975

            
976
    // Check the value in the database to ensure it has the new document
977
3
    let doc = collection
978
3
        .get(header.id)?
979
3
        .expect("couldn't retrieve stored item");
980
3
    assert_eq!(Basic::document_contents(&doc)?, value);
981

            
982
    // These operations should have created two transactions with one change each
983
3
    let transactions = db.list_executed_transactions(None, None)?;
984
3
    assert_eq!(transactions.len(), 2);
985
3
    assert!(transactions[0].id < transactions[1].id);
986
9
    for transaction in &transactions {
987
6
        let changes = transaction
988
6
            .changes
989
6
            .documents()
990
6
            .expect("incorrect transaction type");
991
6
        assert_eq!(changes.documents.len(), 1);
992
6
        assert_eq!(changes.collections.len(), 1);
993
6
        assert_eq!(changes.collections[0], Basic::collection_name());
994
6
        assert_eq!(changes.documents[0].collection, 0);
995
6
        assert_eq!(header.id, changes.documents[0].id.deserialize()?);
996
6
        assert!(!changes.documents[0].deleted);
997
    }
998

            
999
3
    db.collection::<Basic>().delete(&doc)?;
3
    assert!(collection.get(header.id)?.is_none());
3
    let transactions =
3
        db.list_executed_transactions(Some(transactions.last().as_ref().unwrap().id + 1), None)?;
3
    assert_eq!(transactions.len(), 1);
3
    let transaction = transactions.first().unwrap();
3
    let changes = transaction
3
        .changes
3
        .documents()
3
        .expect("incorrect transaction type");
3
    assert_eq!(changes.documents.len(), 1);
3
    assert_eq!(changes.collections[0], Basic::collection_name());
3
    assert_eq!(header.id, changes.documents[0].id.deserialize()?);
3
    assert!(changes.documents[0].deleted);

            
    // Use the Collection interface
3
    let mut doc = original_value.push_into(db)?;
3
    doc.contents.category = Some(String::from("updated"));
3
    doc.update(db)?;
3
    let reloaded = Basic::get(doc.header.id, db)?.unwrap();
3
    assert_eq!(doc.contents, reloaded.contents);

            
    // Test Connection::insert with a specified id
3
    let doc = BorrowedDocument::with_contents::<Basic>(42, &Basic::new("42"))?;
3
    let document_42 = db.insert::<Basic, _, _>(Some(doc.header.id), doc.contents.into_vec())?;
3
    assert_eq!(document_42.id, 42);
3
    let document_43 = Basic::new("43").insert_into(43, db)?;
3
    assert_eq!(document_43.header.id, 43);

            
    // Test that inserting a document with the same ID results in a conflict:
3
    let conflict_err = Basic::new("43").insert_into(doc.header.id, db).unwrap_err();
3
    assert!(matches!(conflict_err.error, Error::DocumentConflict(..)));

            
    // Test that overwriting works
3
    let overwritten = Basic::new("43").overwrite_into(doc.header.id, db).unwrap();
3
    assert!(overwritten.header.revision.id > doc.header.revision.id);

            
3
    Ok(())
3
}

            
5
/// Checks that `db` has no `Basic` document with id 1 and reports no last
/// transaction id.
///
/// # Errors
///
/// Propagates any error returned by the connection; failed expectations panic.
pub async fn not_found_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let missing_document = db.collection::<Basic>().get(1).await?;
    assert!(missing_document.is_none());

    let last_transaction = db.last_transaction_id().await?;
    assert!(last_transaction.is_none());

    Ok(())
}

            
3
/// Blocking check that `db` has no `Basic` document with id 1 and reports no
/// last transaction id.
///
/// # Errors
///
/// Propagates any error returned by the connection; failed expectations panic.
pub fn blocking_not_found_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let missing_document = db.collection::<Basic>().get(1)?;
    assert!(missing_document.is_none());

    let last_transaction = db.last_transaction_id()?;
    assert!(last_transaction.is_none());

    Ok(())
}

            
5
pub async fn conflict_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let original_value = Basic::new("initial_value");
5
    let collection = db.collection::<Basic>();
5
    let header = collection.push(&original_value).await?;

            
5
    let mut doc = collection
5
        .get(header.id)
5
        .await?
5
        .expect("couldn't retrieve stored item");
5
    let mut value = Basic::document_contents(&doc)?;
5
    value.value = String::from("updated_value");
5
    Basic::set_document_contents(&mut doc, value.clone())?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
    // To generate a conflict, let's try to do the same update again by
    // reverting the header
5
    doc.header = Header::try_from(header).unwrap();
5
    match db
5
        .update::<Basic, _>(&mut doc)
5
        .await
5
        .expect_err("conflict should have generated an error")
    {
5
        Error::DocumentConflict(collection, header) => {
5
            assert_eq!(collection, Basic::collection_name());
5
            assert_eq!(header.id, doc.header.id);
        }
        other => return Err(anyhow::Error::from(other)),
    }

            
    // Let's force an update through overwrite. After this succeeds, the header
    // is updated to the new revision.
5
    db.collection::<Basic>().overwrite(&mut doc).await.unwrap();

            
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
5
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
5
    doc.modify_async(db, |doc| {
5
        doc.contents.value = String::from("modify worked");
5
    })
5
    .await?;
5
    assert_eq!(doc.contents.value, "modify worked");
5
    let doc = Basic::get_async(doc.header.id, db).await?.unwrap();
5
    assert_eq!(doc.contents.value, "modify worked");

            
5
    Ok(())
5
}

            
3
pub fn blocking_conflict_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let original_value = Basic::new("initial_value");
3
    let collection = db.collection::<Basic>();
3
    let header = collection.push(&original_value)?;

            
3
    let mut doc = collection
3
        .get(header.id)?
3
        .expect("couldn't retrieve stored item");
3
    let mut value = Basic::document_contents(&doc)?;
3
    value.value = String::from("updated_value");
3
    Basic::set_document_contents(&mut doc, value)?;
3
    db.update::<Basic, _>(&mut doc)?;

            
    // To generate a conflict, let's try to do the same update again by
    // reverting the header
3
    doc.header = Header::try_from(header).unwrap();
3
    match db
3
        .update::<Basic, _>(&mut doc)
3
        .expect_err("conflict should have generated an error")
    {
3
        Error::DocumentConflict(collection, header) => {
3
            assert_eq!(collection, Basic::collection_name());
3
            assert_eq!(header.id, doc.header.id);
        }
        other => return Err(anyhow::Error::from(other)),
    }

            
    // Let's force an update through overwrite. After this succeeds, the header
    // is updated to the new revision.
3
    db.collection::<Basic>().overwrite(&mut doc).unwrap();

            
    // Now, let's use the CollectionDocument API to modify the document through a refetch.
3
    let mut doc = CollectionDocument::<Basic>::try_from(&doc)?;
3
    doc.modify(db, |doc| {
3
        doc.contents.value = String::from("modify worked");
3
    })?;
3
    assert_eq!(doc.contents.value, "modify worked");
3
    let doc = Basic::get(doc.header.id, db)?.unwrap();
3
    assert_eq!(doc.contents.value, "modify worked");

            
3
    Ok(())
3
}

            
5
pub async fn bad_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let mut doc = BorrowedDocument::with_contents::<Basic>(1, &Basic::default())?;
5
    match db.update::<Basic, _>(&mut doc).await {
5
        Err(Error::DocumentNotFound(collection, id)) => {
5
            assert_eq!(collection, Basic::collection_name());
5
            assert_eq!(id.as_ref(), &DocumentId::from_u64(1));
5
            Ok(())
        }
        other => panic!("expected DocumentNotFound from update but got: {:?}", other),
    }
5
}

            
3
pub fn blocking_bad_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let mut doc = BorrowedDocument::with_contents::<Basic>(1, &Basic::default())?;
3
    match db.update::<Basic, _>(&mut doc) {
3
        Err(Error::DocumentNotFound(collection, id)) => {
3
            assert_eq!(collection, Basic::collection_name());
3
            assert_eq!(id.as_ref(), &DocumentId::from_u64(1));
3
            Ok(())
        }
        other => panic!("expected DocumentNotFound from update but got: {:?}", other),
    }
3
}

            
5
/// Checks that updating a document without changing its contents leaves its
/// header equal to the header returned by the original push.
///
/// # Errors
///
/// Propagates connection and header-conversion errors; failed expectations
/// panic.
pub async fn no_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let initial = Basic::new("initial_value");
    let pushed_header = basics.push(&initial).await?;

    let fetched = basics.get(pushed_header.id).await?;
    let mut unchanged = fetched.expect("couldn't retrieve stored item");
    // Send the document back exactly as it was read.
    db.update::<Basic, _>(&mut unchanged).await?;

    assert_eq!(CollectionHeader::try_from(unchanged.header)?, pushed_header);

    Ok(())
}

            
3
/// Blocking check that updating a document without changing its contents
/// leaves its header equal to the header returned by the original push.
///
/// # Errors
///
/// Propagates connection and header-conversion errors; failed expectations
/// panic.
pub fn blocking_no_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let initial = Basic::new("initial_value");
    let pushed_header = basics.push(&initial)?;

    let fetched = basics.get(pushed_header.id)?;
    let mut unchanged = fetched.expect("couldn't retrieve stored item");
    // Send the document back exactly as it was read.
    db.update::<Basic, _>(&mut unchanged)?;

    assert_eq!(CollectionHeader::try_from(unchanged.header)?, pushed_header);

    Ok(())
}

            
5
/// Checks `Basic::get_multiple_async` with two stored documents.
///
/// Both documents must be returned whether their ids are requested in
/// insertion order or reversed, and each returned document must carry the
/// value it was pushed with.
///
/// # Errors
///
/// Propagates any error returned by the connection; failed expectations panic.
pub async fn get_multiple_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first_header = basics.push(&first_value).await?;

    let second_value = Basic::new("second_value");
    let second_header = basics.push(&second_value).await?;

    let in_order = Basic::get_multiple_async([first_header.id, second_header.id], db).await?;
    assert_eq!(in_order.len(), 2);

    let reversed = Basic::get_multiple_async([second_header.id, first_header.id], db).await?;
    assert_eq!(reversed.len(), 2);

    // The order of get_multiple isn't guaranteed, so each document is looked
    // up by id rather than by position.
    let first_doc = in_order
        .iter()
        .find(|candidate| candidate.header.id == first_header.id)
        .expect("Couldn't find doc1");
    assert_eq!(first_doc.contents.value, first_value.value);
    let second_doc = in_order
        .iter()
        .find(|candidate| candidate.header.id == second_header.id)
        .expect("Couldn't find doc2");
    assert_eq!(second_doc.contents.value, second_value.value);

    Ok(())
}

            
3
/// Blocking check of `Basic::get_multiple` with two stored documents.
///
/// Both documents must be returned whether their ids are requested in
/// insertion order or reversed, and each returned document must carry the
/// value it was pushed with.
///
/// # Errors
///
/// Propagates any error returned by the connection; failed expectations panic.
pub fn blocking_get_multiple_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first_header = basics.push(&first_value)?;

    let second_value = Basic::new("second_value");
    let second_header = basics.push(&second_value)?;

    let in_order = Basic::get_multiple([first_header.id, second_header.id], db)?;
    assert_eq!(in_order.len(), 2);

    let reversed = Basic::get_multiple([second_header.id, first_header.id], db)?;
    assert_eq!(reversed.len(), 2);

    // The order of get_multiple isn't guaranteed, so each document is looked
    // up by id rather than by position.
    let first_doc = in_order
        .iter()
        .find(|candidate| candidate.header.id == first_header.id)
        .expect("Couldn't find doc1");
    assert_eq!(first_doc.contents.value, first_value.value);
    let second_doc = in_order
        .iter()
        .find(|candidate| candidate.header.id == second_header.id)
        .expect("Couldn't find doc2");
    assert_eq!(second_doc.contents.value, second_value.value);

    Ok(())
}

            
5
/// Checks the async document listing APIs with two stored `Basic` documents.
///
/// Covers `all_async`, `list_async` over inclusive and exclusive id ranges,
/// the `count` and `headers` variants, and a descending query limited to one
/// result.
///
/// The `all_async` checks expect exactly two documents, so `db` must contain
/// no other `Basic` documents.
///
/// # Errors
///
/// Propagates any error returned by the connection; failed expectations panic.
pub async fn list_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first = basics.push(&first_value).await?;

    let second_value = Basic::new("second_value");
    let second = basics.push(&second_value).await?;

    // Everything in the collection.
    let everything = Basic::all_async(db).await?;
    assert_eq!(everything.len(), 2);
    let everything_count = Basic::all_async(db).count().await?;
    assert_eq!(everything_count, 2);

    // An inclusive id range covering both documents.
    let inclusive = Basic::list_async(first.id..=second.id, db).await?;
    assert_eq!(inclusive.len(), 2);
    let inclusive_count = Basic::list_async(first.id..=second.id, db)
        .count()
        .await?;
    assert_eq!(inclusive_count, 2);

    assert_eq!(inclusive[0].contents.value, first_value.value);
    assert_eq!(inclusive[1].contents.value, second_value.value);

    let inclusive_headers = Basic::list_async(first.id..=second.id, db)
        .headers()
        .await?;

    assert_eq!(inclusive_headers.len(), 2);

    // An exclusive upper bound leaves the second document out.
    let exclusive = Basic::list_async(first.id..second.id, db).await?;
    assert_eq!(exclusive.len(), 1);

    // Descending with a limit of one must yield the second document.
    let newest_only = Basic::list_async(first.id..=second.id, db)
        .limit(1)
        .descending()
        .await?;
    assert_eq!(newest_only.len(), 1);
    assert_eq!(newest_only[0].contents.value, second_value.value);

    Ok(())
}

            
3
/// Checks the blocking document listing APIs with two stored `Basic`
/// documents.
///
/// Covers `all`, `list` over inclusive and exclusive id ranges, the `count`
/// and `headers` variants, and a descending query limited to one result.
///
/// The `all` checks expect exactly two documents, so `db` must contain no
/// other `Basic` documents.
///
/// # Errors
///
/// Propagates any error returned by the connection; failed expectations panic.
pub fn blocking_list_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
    let basics = db.collection::<Basic>();
    let first_value = Basic::new("initial_value");
    let first = basics.push(&first_value)?;

    let second_value = Basic::new("second_value");
    let second = basics.push(&second_value)?;

    // Everything in the collection.
    let everything = Basic::all(db).query()?;
    assert_eq!(everything.len(), 2);
    let everything_count = Basic::all(db).count()?;
    assert_eq!(everything_count, 2);

    // An inclusive id range covering both documents.
    let inclusive = Basic::list(first.id..=second.id, db).query()?;
    assert_eq!(inclusive.len(), 2);
    let inclusive_count = Basic::list(first.id..=second.id, db).count()?;
    assert_eq!(inclusive_count, 2);

    assert_eq!(inclusive[0].contents.value, first_value.value);
    assert_eq!(inclusive[1].contents.value, second_value.value);

    let inclusive_headers = Basic::list(first.id..=second.id, db).headers()?;

    assert_eq!(inclusive_headers.len(), 2);

    // An exclusive upper bound leaves the second document out.
    let exclusive = Basic::list(first.id..second.id, db).query()?;
    assert_eq!(exclusive.len(), 1);

    // Descending with a limit of one must yield the second document.
    let newest_only = Basic::list(first.id..=second.id, db)
        .limit(1)
        .descending()
        .query()?;
    assert_eq!(newest_only.len(), 1);
    assert_eq!(newest_only[0].contents.value, second_value.value);

    Ok(())
}

            
5
pub async fn list_transactions_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5

            
5
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
5
    // transactions to test the edge cases of `list_transactions`
5
    futures::future::join_all(
5
        (0..=(LIST_TRANSACTIONS_MAX_RESULTS))
5005
            .map(|_| async { collection.push(&Basic::default()).await.unwrap() }),
4606
    )
4606
    .await;

            
    // Test defaults
5
    let transactions = db.list_executed_transactions(None, None).await?;
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT
5
    );

            
    // Test max results limit
5
    let transactions = db
5
        .list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))
5
        .await?;
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_MAX_RESULTS
5
    );

            
    // Test requesting 0 items
5
    let transactions = db.list_executed_transactions(None, Some(0)).await?;
5
    assert!(transactions.is_empty());

            
    // Test doing a loop fetching until we get no more results
5
    let mut transactions = Vec::new();
5
    let mut starting_id = None;
    loop {
60
        let chunk = db
60
            .list_executed_transactions(starting_id, Some(100))
60
            .await?;
60
        if chunk.is_empty() {
5
            break;
55
        }
55

            
55
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
55
        starting_id = Some(max_id + 1);
55
        transactions.extend(chunk);
    }

            
5
    assert_eq!(
5
        u32::try_from(transactions.len()).unwrap(),
5
        LIST_TRANSACTIONS_MAX_RESULTS + 1
5
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_list_transactions_tests<C: Connection + Clone + 'static>(
3
    db: &C,
3
) -> anyhow::Result<()> {
3
    // create LIST_TRANSACTIONS_MAX_RESULTS + 1 items, giving us just enough
3
    // transactions to test the edge cases of `list_transactions`
3
    let mut threads = Vec::with_capacity(num_cpus::get());
3
    let transaction_counter = Arc::new(AtomicU32::new(0));
6
    for _ in 0..threads.capacity() {
6
        let db = db.clone();
6
        let transaction_counter = transaction_counter.clone();
6
        threads.push(std::thread::spawn(move || {
6
            let collection = db.collection::<Basic>();
3009
            while transaction_counter.fetch_add(1, Ordering::SeqCst)
3009
                <= LIST_TRANSACTIONS_MAX_RESULTS
3003
            {
3003
                collection.push(&Basic::default()).unwrap();
3003
            }
6
        }));
6
    }

            
9
    for thread in threads {
6
        thread.join().unwrap();
6
    }

            
    // Test defaults
3
    let transactions = db.list_executed_transactions(None, None)?;
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_DEFAULT_RESULT_COUNT
3
    );

            
    // Test max results limit
3
    let transactions =
3
        db.list_executed_transactions(None, Some(LIST_TRANSACTIONS_MAX_RESULTS + 1))?;
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_MAX_RESULTS
3
    );

            
    // Test requesting 0 items
3
    let transactions = db.list_executed_transactions(None, Some(0))?;
3
    assert!(transactions.is_empty());

            
    // Test doing a loop fetching until we get no more results
3
    let mut transactions = Vec::new();
3
    let mut starting_id = None;
    loop {
36
        let chunk = db.list_executed_transactions(starting_id, Some(100))?;
36
        if chunk.is_empty() {
3
            break;
33
        }
33

            
33
        let max_id = chunk.last().map(|tx| tx.id).unwrap();
33
        starting_id = Some(max_id + 1);
33
        transactions.extend(chunk);
    }

            
3
    assert_eq!(
3
        u32::try_from(transactions.len()).unwrap(),
3
        LIST_TRANSACTIONS_MAX_RESULTS + 1
3
    );

            
3
    Ok(())
3
}

            
5
pub async fn view_query_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;
5
    let b = collection.push(&Basic::new("B")).await?;
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;
5
    collection
5
        .push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))
5
        .await?;
5
    collection
5
        .push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
10
        .query_with_collection_docs()
10
        .await?;
5
    assert_eq!(a_children.len(), 1);
5
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);

            
5
    let b_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(b.id))
5
        .query()
5
        .await?;
5
    assert_eq!(b_children.len(), 2);

            
5
    let a_and_b_children = db
5
        .view::<BasicByParentId>()
5
        .with_keys([Some(a.id), Some(b.id)])
5
        .query()
5
        .await?;
5
    assert_eq!(a_and_b_children.len(), 3);

            
    // Test out of order keys
5
    let a_and_b_children = db
5
        .view::<BasicByParentId>()
5
        .with_keys([Some(b.id), Some(a.id)])
5
        .query()
5
        .await?;
5
    assert_eq!(a_and_b_children.len(), 3);

            
5
    let has_parent = db
5
        .view::<BasicByParentId>()
5
        .with_key_range(Some(0)..=Some(u64::MAX))
5
        .query()
5
        .await?;
5
    assert_eq!(has_parent.len(), 3);
    // Verify the result is sorted ascending
5
    assert!(has_parent
5
        .windows(2)
10
        .all(|window| window[0].key <= window[1].key));

            
    // Test limiting and descending order
5
    let last_with_parent = db
5
        .view::<BasicByParentId>()
5
        .with_key_range(Some(0)..=Some(u64::MAX))
5
        .descending()
5
        .limit(1)
5
        .query()
5
        .await?;
10
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
5
    assert_eq!(last_with_parent[0].key, has_parent[2].key);

            
5
    let items_with_categories = db.view::<BasicByCategory>().query().await?;
5
    assert_eq!(items_with_categories.len(), 3);

            
    // Test deleting
5
    let deleted_count = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(b.id))
5
        .delete_docs()
5
        .await?;
5
    assert_eq!(b_children.len() as u64, deleted_count);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(b.id))
5
            .query()
5
            .await?
5
            .len(),
        0
    );

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_query_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;
3
    let b = collection.push(&Basic::new("B"))?;
3
    let a_child = collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;
3
    collection.push(&Basic::new("B.1").with_parent_id(b.id).with_category("Beta"))?;
3
    collection.push(&Basic::new("B.2").with_parent_id(b.id).with_category("beta"))?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 1);

            
3
    let a_children = db
3
        .view::<BasicByParentId>()
3
        .with_key(Some(a.id))
3
        .query_with_collection_docs()?;
3
    assert_eq!(a_children.len(), 1);
3
    assert_eq!(a_children.get(0).unwrap().document.header, a_child);

            
3
    let b_children = db.view::<BasicByParentId>().with_key(Some(b.id)).query()?;
3
    assert_eq!(b_children.len(), 2);

            
3
    let a_and_b_children = db
3
        .view::<BasicByParentId>()
3
        .with_keys([Some(a.id), Some(b.id)])
3
        .query()?;
3
    assert_eq!(a_and_b_children.len(), 3);

            
    // Test out of order keys
3
    let a_and_b_children = db
3
        .view::<BasicByParentId>()
3
        .with_keys([Some(b.id), Some(a.id)])
3
        .query()?;
3
    assert_eq!(a_and_b_children.len(), 3);

            
3
    let has_parent = db
3
        .view::<BasicByParentId>()
3
        .with_key_range(Some(0)..=Some(u64::MAX))
3
        .query()?;
3
    assert_eq!(has_parent.len(), 3);
    // Verify the result is sorted ascending
3
    assert!(has_parent
3
        .windows(2)
6
        .all(|window| window[0].key <= window[1].key));

            
    // Test limiting and descending order
3
    let last_with_parent = db
3
        .view::<BasicByParentId>()
3
        .with_key_range(Some(0)..=Some(u64::MAX))
3
        .descending()
3
        .limit(1)
3
        .query()?;
6
    assert_eq!(last_with_parent.iter().map(|m| m.key).unique().count(), 1);
3
    assert_eq!(last_with_parent[0].key, has_parent[2].key);

            
3
    let items_with_categories = db.view::<BasicByCategory>().query()?;
3
    assert_eq!(items_with_categories.len(), 3);

            
    // Test deleting
3
    let deleted_count = db
3
        .view::<BasicByParentId>()
3
        .with_key(Some(b.id))
3
        .delete_docs()?;
3
    assert_eq!(b_children.len() as u64, deleted_count);
3
    assert_eq!(
3
        db.view::<BasicByParentId>()
3
            .with_key(Some(b.id))
3
            .query()?
3
            .len(),
        0
    );

            
3
    Ok(())
3
}

            
5
pub async fn unassociated_collection_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let result = db
5
        .collection::<UnassociatedCollection>()
5
        .push(&UnassociatedCollection)
5
        .await;
5
    match result {
5
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
5
    Ok(())
5
}

            
3
pub fn blocking_unassociated_collection_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let result = db
3
        .collection::<UnassociatedCollection>()
3
        .push(&UnassociatedCollection);
3
    match result {
3
        Err(Error::CollectionNotFound) => {}
        other => unreachable!("unexpected result: {:?}", other),
    }

            
3
    Ok(())
3
}

            
5
pub async fn unimplemented_reduce<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    assert!(matches!(
5
        db.view::<UniqueValue>().reduce().await,
        Err(Error::ReduceUnimplemented)
    ));
5
    Ok(())
5
}

            
3
pub fn blocking_unimplemented_reduce<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    assert!(matches!(
3
        db.view::<UniqueValue>().reduce(),
        Err(Error::ReduceUnimplemented)
    ));
3
    Ok(())
3
}

            
5
pub async fn view_update_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let collection = db.collection::<Basic>();
5
    let a = collection.push(&Basic::new("A")).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );

            
    // Test inserting a new record and the view being made available
5
    let a_child = collection
5
        .push(
5
            &Basic::new("A.1")
5
                .with_parent_id(a.id)
5
                .with_category("Alpha"),
5
        )
5
        .await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 1);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
5
    let mut doc = db.collection::<Basic>().get(a_child.id).await?.unwrap();
5
    let mut basic = Basic::document_contents(&doc)?;
5
    basic.parent_id = None;
5
    Basic::set_document_contents(&mut doc, basic)?;
5
    db.update::<Basic, _>(&mut doc).await?;

            
5
    let a_children = db
5
        .view::<BasicByParentId>()
5
        .with_key(Some(a.id))
5
        .query()
5
        .await?;
5
    assert_eq!(a_children.len(), 0);
    assert_eq!(
5
        db.view::<BasicByParentId>()
5
            .with_key(Some(a.id))
5
            .reduce()
5
            .await?,
        0
    );
5
    assert_eq!(db.view::<BasicByParentId>().reduce().await?, 2);

            
    // Test deleting a record and ensuring it goes away
5
    db.collection::<Basic>().delete(&doc).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 1);

            
    // Verify reduce_grouped matches our expectations.
    assert_eq!(
5
        db.view::<BasicByParentId>().reduce_grouped().await?,
5
        vec![MappedValue::new(None, 1,),]
    );

            
    // Remove the final document, which has a parent id of None. We'll add a new
    // document with None to verify that the mapper handles the edge case of a
    // delete/insert in the same mapping operation.
5
    db.view::<BasicByParentId>().delete_docs().await?;
5
    collection.push(&Basic::new("B")).await?;

            
5
    let all_entries = db.view::<BasicByParentId>().query().await?;
5
    assert_eq!(all_entries.len(), 1);

            
5
    Ok(())
5
}

            
3
pub fn blocking_view_update_tests<C: Connection>(db: &C) -> anyhow::Result<()> {
3
    let collection = db.collection::<Basic>();
3
    let a = collection.push(&Basic::new("A"))?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 0);
    // The reduce function of `BasicByParentId` acts as a "count" of records.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().with_key(Some(a.id)).reduce()?,
        0
    );

            
    // Test inserting a new record and the view being made available
3
    let a_child = collection.push(
3
        &Basic::new("A.1")
3
            .with_parent_id(a.id)
3
            .with_category("Alpha"),
3
    )?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 1);
3
    assert_eq!(
3
        db.view::<BasicByParentId>().with_key(Some(a.id)).reduce()?,
        1
    );

            
    // Verify reduce_grouped matches our expectations.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().reduce_grouped()?,
3
        vec![MappedValue::new(None, 1,), MappedValue::new(Some(a.id), 1,),]
    );

            
    // Test updating the record and the view being updated appropriately
3
    let mut doc = db.collection::<Basic>().get(a_child.id)?.unwrap();
3
    let mut basic = Basic::document_contents(&doc)?;
3
    basic.parent_id = None;
3
    Basic::set_document_contents(&mut doc, basic)?;
3
    db.update::<Basic, _>(&mut doc)?;

            
3
    let a_children = db.view::<BasicByParentId>().with_key(Some(a.id)).query()?;
3
    assert_eq!(a_children.len(), 0);
3
    assert_eq!(
3
        db.view::<BasicByParentId>().with_key(Some(a.id)).reduce()?,
        0
    );
3
    assert_eq!(db.view::<BasicByParentId>().reduce()?, 2);

            
    // Test deleting a record and ensuring it goes away
3
    db.collection::<Basic>().delete(&doc)?;

            
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 1);

            
    // Verify reduce_grouped matches our expectations.
3
    assert_eq!(
3
        db.view::<BasicByParentId>().reduce_grouped()?,
3
        vec![MappedValue::new(None, 1,),]
    );

            
    // Remove the final document, which has a parent id of None. We'll add a new
    // document with None to verify that the mapper handles the edge case of a
    // delete/insert in the same mapping operation.
3
    db.view::<BasicByParentId>().delete_docs()?;
3
    collection.push(&Basic::new("B"))?;

            
3
    let all_entries = db.view::<BasicByParentId>().query()?;
3
    assert_eq!(all_entries.len(), 1);

            
3
    Ok(())
3
}

            
5
pub async fn view_multi_emit_tests<C: AsyncConnection>(db: &C) -> anyhow::Result<()> {
5
    let mut a = Basic::new("A")
5
        .with_tag("red")
5
        .with_tag("green")
5
        .push_into_async(db)
5
        .await?;
5
    let mut b = Basic::new("B")
5
        .with_tag("blue")
5
        .with_tag("green")
5
        .push_into_async(db)
5
        .await?;

            
5
    assert_eq!(db.view::<BasicByTag>().query().await?.len(), 4);

            
    assert_eq!(