1
use std::time::Duration;
2

            
3
#[cfg(feature = "encryption")]
4
use bonsaidb_core::test_util::EncryptedBasic;
5
use bonsaidb_core::{
6
    connection::{AccessPolicy, Connection, StorageConnection},
7
    document::DocumentId,
8
    permissions::{Permissions, Statement},
9
    test_util::{
10
        Basic, BasicByBrokenParentId, BasicByParentId, BasicCollectionWithNoViews,
11
        BasicCollectionWithOnlyBrokenParentId, BasicSchema, HarnessTest, TestDirectory,
12
    },
13
};
14
use config::StorageConfiguration;
15

            
16
use super::*;
17
use crate::{config::Builder, Database};
18

            
19
macro_rules! define_local_suite {
20
    ($name:ident) => {
21
        mod $name {
22
            use super::*;
23
            struct TestHarness {
24
                _directory: TestDirectory,
25
                db: Database,
26
            }
27

            
28
            impl TestHarness {
29
60
                async fn new(test: HarnessTest) -> anyhow::Result<Self> {
30
                    let directory = TestDirectory::new(format!("{}-{}", stringify!($name), test));
31
                    let mut config =
32
                        StorageConfiguration::new(&directory).with_schema::<BasicSchema>()?;
33
                    if stringify!($name) == "memory" {
34
                        config = config.memory_only()
35
                    }
36

            
37
                    #[cfg(feature = "compression")]
38
                    {
39
                        config = config.default_compression(crate::config::Compression::Lz4);
40
                    }
41

            
42
                    let storage = Storage::open(config).await?;
43
                    storage
44
                        .create_database::<BasicSchema>("tests", false)
45
                        .await?;
46
                    let db = storage.database::<BasicSchema>("tests").await?;
47

            
48
                    Ok(Self {
49
                        _directory: directory,
50
                        db,
51
                    })
52
                }
53

            
54
4
                const fn server_name() -> &'static str {
55
4
                    stringify!($name)
56
4
                }
57

            
58
4
                fn server(&self) -> &'_ Storage {
59
4
                    self.db.storage()
60
4
                }
61

            
62
                #[allow(dead_code)]
63
                async fn connect_with_permissions(
64
                    &self,
65
                    permissions: Vec<Statement>,
66
                    _label: &str,
67
                ) -> anyhow::Result<Database> {
68
                    Ok(self
69
                        .db
70
                        .with_effective_permissions(Permissions::from(permissions)))
71
                }
72

            
73
58
                async fn connect(&self) -> anyhow::Result<Database> {
74
58
                    Ok(self.db.clone())
75
58
                }
76

            
77
52
                pub async fn shutdown(&self) -> anyhow::Result<()> {
78
52
                    Ok(())
79
52
                }
80
            }
81

            
82
            bonsaidb_core::define_connection_test_suite!(TestHarness);
83

            
84
            bonsaidb_core::define_pubsub_test_suite!(TestHarness);
85

            
86
            bonsaidb_core::define_kv_test_suite!(TestHarness);
87
        }
88
    };
89
}
90

            
91
200
define_local_suite!(persisted);
92
200
define_local_suite!(memory);
93

            
94
1
/// Opens the same database directory three times with different schemas and
/// checks how the views behave on each open:
///
/// 1. With `BasicCollectionWithNoViews`, a single document is pushed.
/// 2. With `BasicCollectionWithOnlyBrokenParentId`, a `NoUpdate` query returns
///    no entries, while a regular query returns one.
/// 3. With `Basic` and `check_view_integrity_on_open(true)`, a `NoUpdate`
///    query for key `Some(1)` is polled until it returns one entry.
///
/// # Errors
///
/// Any failure building a runtime, opening the database, or querying is
/// propagated to the test harness with `?`.
///
/// # Panics
///
/// Panics if an assertion fails, or if the polled view never returns the
/// expected entry within the 100 polling attempts.
#[test]
fn integrity_checks() -> anyhow::Result<()> {
    let path = TestDirectory::new("integrity-checks");
    // To ensure full cleanup between each block, each runs in its own runtime.

    // Add a doc with no views installed
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            {
                let db =
                    Database::open::<BasicCollectionWithNoViews>(StorageConfiguration::new(&path))
                        .await?;
                let collection = db.collection::<BasicCollectionWithNoViews>();
                collection
                    .push(&Basic::default().with_parent_id(DocumentId::from_u64(1)))
                    .await?;
            }
            Result::<(), anyhow::Error>::Ok(())
        })?;
    }
    // Connect with a new view and see the automatic update with a query
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            let db = Database::open::<BasicCollectionWithOnlyBrokenParentId>(
                StorageConfiguration::new(&path),
            )
            .await?;
            // Give the integrity scanner time to run if it were to run (it shouldn't in this configuration).
            tokio::time::sleep(Duration::from_millis(100)).await;

            // NoUpdate should return data without the validation checker having run.
            assert_eq!(
                db.view::<BasicByBrokenParentId>()
                    .with_access_policy(AccessPolicy::NoUpdate)
                    .query()
                    .await?
                    .len(),
                0
            );

            // Regular query should show the correct data
            assert_eq!(db.view::<BasicByBrokenParentId>().query().await?.len(), 1);
            Result::<(), anyhow::Error>::Ok(())
        })?;
    }
    // Connect with a fixed view, and wait for the integrity scanner to work
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            let db = Database::open::<Basic>(
                StorageConfiguration::new(&path).check_view_integrity_on_open(true),
            )
            .await?;
            // Poll with NoUpdate so that only the integrity scanner, not this
            // query, can be what brings the view up to date.
            for _ in 0_u8..100 {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                if db
                    .view::<BasicByParentId>()
                    .with_access_policy(AccessPolicy::NoUpdate)
                    .with_key(Some(1))
                    .query()
                    .await?
                    .len()
                    == 1
                {
                    return Result::<(), anyhow::Error>::Ok(());
                }
            }

            panic!("Integrity checker didn't run in the allocated time")
        })?;
    }

    Ok(())
}
179

            
180
1
/// Stores an `EncryptedBasic` document, deletes the `master-keys` file from
/// the database directory, reopens the database, and expects reading the
/// document back to fail with a database error mentioning "vault".
#[test]
#[cfg(feature = "encryption")]
fn encryption() -> anyhow::Result<()> {
    use bonsaidb_core::schema::SerializedCollection;

    let path = TestDirectory::new("encryption");

    // First session: write the document and confirm it reads back intact.
    let document_header = {
        let runtime = tokio::runtime::Runtime::new()?;
        let stored = runtime.block_on(async {
            let db = Database::open::<BasicSchema>(StorageConfiguration::new(&path)).await?;

            let header = db
                .collection::<EncryptedBasic>()
                .push(&EncryptedBasic::new("hello"))
                .await?;

            // Retrieve the document, showing that it was stored successfully.
            let fetched = db
                .collection::<EncryptedBasic>()
                .get(header.id)
                .await?
                .expect("doc not found");
            let contents = EncryptedBasic::document_contents(&fetched)?;
            assert_eq!(&contents.value, "hello");

            Result::<_, anyhow::Error>::Ok(header)
        });
        stored?
    };

    // By resetting the encryption key, we should be able to force an error in
    // decryption, which proves that the document was encrypted. To ensure the
    // server starts up and generates a new key, we must delete the sealing key.
    let sealing_key_path = path.join("master-keys");
    std::fs::remove_file(sealing_key_path)?;

    // Second session: the same document must no longer be readable.
    let runtime = tokio::runtime::Runtime::new()?;
    runtime.block_on(async move {
        let db = Database::open::<BasicSchema>(StorageConfiguration::new(&path)).await?;

        // Try retrieving the document, but expect an error decrypting.
        let lookup = db
            .collection::<EncryptedBasic>()
            .get(document_header.id)
            .await;
        match lookup {
            Err(bonsaidb_core::Error::Database(message)) => {
                assert!(message.contains("vault"));
            }
            // Both a successful read and any other error kind are failures.
            _ => panic!("successfully retrieved encrypted document without keys"),
        }

        Result::<_, anyhow::Error>::Ok(())
    })?;

    Ok(())
}
233

            
234
1
/// Sets a key with a three-second expiration, reopens the database in a fresh
/// runtime, and checks that the key is gone once `timing` reaches the
/// four-second mark. The whole scenario restarts from scratch when the reopen
/// happens too late or the key is already missing.
#[test]
fn expiration_after_close() -> anyhow::Result<()> {
    use bonsaidb_core::{keyvalue::KeyValue, test_util::TimingTest};

    loop {
        let path = TestDirectory::new("expiration-after-close");
        // To ensure full cleanup between each block, each runs in its own runtime.
        let timing = TimingTest::new(Duration::from_millis(100));

        // Set a key with an expiration, then close it. Then try to validate it
        // exists after opening, and then expires at the correct time.
        {
            let runtime = tokio::runtime::Runtime::new()?;
            runtime.block_on(async {
                let db = Database::open::<()>(StorageConfiguration::new(&path)).await?;

                // TODO This is a workaround for the key-value expiration task
                // taking ownership of an instance of Database. If this async
                // task runs too quickly, sometimes things don't get cleaned up
                // if that task hasn't completed. This pause ensures the startup
                // tasks complete before we continue with the test. This should
                // be replaced with a proper shutdown call for the local
                // storage/database.
                tokio::time::sleep(Duration::from_millis(100)).await;

                let expires_in = Duration::from_secs(3);
                db.set_key("a", &0_u32).expire_in(expires_in).await?;
                Result::<(), anyhow::Error>::Ok(())
            })?;
        }

        {
            let runtime = tokio::runtime::Runtime::new()?;
            let needs_retry = runtime.block_on(async {
                let db = Database::open::<()>(StorageConfiguration::new(&path)).await?;

                let stored = db.get_key("a").await?;
                // Due to not having a reliable way to shut down the database,
                // we can't make many guarantees about what happened after
                // setting the key in the above block. If we get None back,
                // we'll consider the test needing to retry. Once we have a
                // shutdown operation that guarantees that the key-value store
                // persists, the stored.is_none() check should be removed,
                // instead asserting `stored.is_some()`.
                let too_slow = timing.elapsed() > Duration::from_secs(1);
                if too_slow || stored.is_none() {
                    return Ok(true);
                }

                // Past the three-second expiration, the key must be gone.
                timing.wait_until(Duration::from_secs(4)).await;

                assert!(db.get_key("a").await?.is_none());

                Result::<bool, anyhow::Error>::Ok(false)
            })?;

            if needs_retry {
                println!("Retrying  expiration_after_close because it was too slow");
            } else {
                return Ok(());
            }
        }
    }
}