1
mod compatibility;
2

            
3
use std::time::Duration;
4

            
5
#[cfg(feature = "encryption")]
6
use bonsaidb_core::test_util::EncryptedBasic;
7
use bonsaidb_core::{
8
    connection::{AccessPolicy, Connection, StorageConnection},
9
    document::DocumentId,
10
    permissions::{Permissions, Statement},
11
    test_util::{
12
        Basic, BasicByBrokenParentId, BasicByParentId, BasicCollectionWithNoViews,
13
        BasicCollectionWithOnlyBrokenParentId, BasicSchema, HarnessTest, TestDirectory,
14
    },
15
};
16
use config::StorageConfiguration;
17

            
18
use super::*;
19
use crate::{config::Builder, Database};
20

            
21
macro_rules! define_local_suite {
22
    ($name:ident) => {
23
        mod $name {
24
            use super::*;
25
            struct TestHarness {
26
                _directory: TestDirectory,
27
                db: Database,
28
            }
29

            
30
            impl TestHarness {
31
60
                async fn new(test: HarnessTest) -> anyhow::Result<Self> {
32
                    let directory = TestDirectory::new(format!("{}-{}", stringify!($name), test));
33
                    let mut config =
34
                        StorageConfiguration::new(&directory).with_schema::<BasicSchema>()?;
35
                    if stringify!($name) == "memory" {
36
                        config = config.memory_only()
37
                    }
38

            
39
                    #[cfg(feature = "compression")]
40
                    {
41
                        config = config.default_compression(crate::config::Compression::Lz4);
42
                    }
43

            
44
                    let storage = Storage::open(config).await?;
45
                    storage
46
                        .create_database::<BasicSchema>("tests", false)
47
                        .await?;
48
                    let db = storage.database::<BasicSchema>("tests").await?;
49

            
50
                    Ok(Self {
51
                        _directory: directory,
52
                        db,
53
                    })
54
                }
55

            
56
4
                const fn server_name() -> &'static str {
57
4
                    stringify!($name)
58
4
                }
59

            
60
4
                fn server(&self) -> &'_ Storage {
61
4
                    self.db.storage()
62
4
                }
63

            
64
                #[allow(dead_code)]
65
                async fn connect_with_permissions(
66
                    &self,
67
                    permissions: Vec<Statement>,
68
                    _label: &str,
69
                ) -> anyhow::Result<Database> {
70
                    Ok(self
71
                        .db
72
                        .with_effective_permissions(Permissions::from(permissions)))
73
                }
74

            
75
58
                async fn connect(&self) -> anyhow::Result<Database> {
76
58
                    Ok(self.db.clone())
77
58
                }
78

            
79
52
                pub async fn shutdown(&self) -> anyhow::Result<()> {
80
52
                    Ok(())
81
52
                }
82
            }
83

            
84
            bonsaidb_core::define_connection_test_suite!(TestHarness);
85

            
86
            bonsaidb_core::define_pubsub_test_suite!(TestHarness);
87

            
88
            bonsaidb_core::define_kv_test_suite!(TestHarness);
89
        }
90
    };
91
}
92

            
93
200
define_local_suite!(persisted);
94
200
define_local_suite!(memory);
95

            
96
1
#[test]
97
#[cfg_attr(not(feature = "compression"), allow(unused_mut))]
98
1
fn integrity_checks() -> anyhow::Result<()> {
99
1
    let path = TestDirectory::new("integrity-checks");
100
1
    let mut config = StorageConfiguration::new(&path);
101
1
    #[cfg(feature = "compression")]
102
1
    {
103
1
        config = config.default_compression(crate::config::Compression::Lz4);
104
1
    }
105
    // To ensure full cleanup between each block, each runs in its own runtime;
106

            
107
    // Add a doc with no views installed
108
1
    {
109
1
        let rt = tokio::runtime::Builder::new_current_thread()
110
1
            .enable_all()
111
1
            .build()?;
112
1
        rt.block_on(async {
113
            {
114
21
                let db = Database::open::<BasicCollectionWithNoViews>(config.clone()).await?;
115
1
                let collection = db.collection::<BasicCollectionWithNoViews>();
116
1
                collection
117
1
                    .push(&Basic::default().with_parent_id(DocumentId::from_u64(1)))
118
1
                    .await?;
119
            }
120
1
            Result::<(), anyhow::Error>::Ok(())
121
1
        })
122
1
        .unwrap();
123
    }
124
    // Connect with a new view and see the automatic update with a query
125
1
    {
126
1
        let rt = tokio::runtime::Builder::new_current_thread()
127
1
            .enable_all()
128
1
            .build()?;
129
1
        rt.block_on(async {
130
1
            let db =
131
15
                Database::open::<BasicCollectionWithOnlyBrokenParentId>(config.clone()).await?;
132
            // Give the integrity scanner time to run if it were to run (it shouldn't in this configuration).
133
1
            tokio::time::sleep(Duration::from_millis(100)).await;
134

            
135
            // NoUpdate should return data without the validation checker having run.
136
1
            assert_eq!(
137
1
                db.view::<BasicByBrokenParentId>()
138
1
                    .with_access_policy(AccessPolicy::NoUpdate)
139
1
                    .query()
140
                    .await?
141
1
                    .len(),
142
                0
143
            );
144

            
145
            // Regular query should show the correct data
146
2
            assert_eq!(db.view::<BasicByBrokenParentId>().query().await?.len(), 1);
147
1
            Result::<(), anyhow::Error>::Ok(())
148
1
        })
149
1
        .unwrap();
150
    }
151
    // Connect with a fixed view, and wait for the integrity scanner to work
152
1
    {
153
1
        let rt = tokio::runtime::Builder::new_current_thread()
154
1
            .enable_all()
155
1
            .build()?;
156
1
        rt.block_on(async {
157
16
            let db = Database::open::<Basic>(config.check_view_integrity_on_open(true)).await?;
158
1
            for _ in 0_u8..100 {
159
1
                tokio::time::sleep(Duration::from_millis(1000)).await;
160
1
                if db
161
1
                    .view::<BasicByParentId>()
162
1
                    .with_access_policy(AccessPolicy::NoUpdate)
163
1
                    .with_key(Some(1))
164
1
                    .query()
165
                    .await?
166
1
                    .len()
167
                    == 1
168
                {
169
1
                    return Result::<(), anyhow::Error>::Ok(());
170
                }
171
            }
172

            
173
            panic!("Integrity checker didn't run in the allocated time")
174
1
        })
175
1
        .unwrap();
176
1
    }
177
1

            
178
1
    Ok(())
179
1
}
180

            
181
1
#[test]
182
#[cfg(feature = "encryption")]
183
1
fn encryption() -> anyhow::Result<()> {
184
1
    use bonsaidb_core::schema::SerializedCollection;
185
1
    let path = TestDirectory::new("encryption");
186
1
    let document_header = {
187
1
        let rt = tokio::runtime::Runtime::new()?;
188
1
        rt.block_on(async {
189
19
            let db = Database::open::<BasicSchema>(StorageConfiguration::new(&path)).await?;
190

            
191
1
            let document_header = db
192
1
                .collection::<EncryptedBasic>()
193
1
                .push(&EncryptedBasic::new("hello"))
194
1
                .await?;
195

            
196
            // Retrieve the document, showing that it was stored successfully.
197
1
            let doc = db
198
1
                .collection::<EncryptedBasic>()
199
1
                .get(document_header.id)
200
1
                .await?
201
1
                .expect("doc not found");
202
1
            assert_eq!(&EncryptedBasic::document_contents(&doc)?.value, "hello");
203

            
204
1
            Result::<_, anyhow::Error>::Ok(document_header)
205
1
        })?
206
    };
207

            
208
    // By resetting the encryption key, we should be able to force an error in
209
    // decryption, which proves that the document was encrypted. To ensure the
210
    // server starts up and generates a new key, we must delete the sealing key.
211

            
212
1
    std::fs::remove_file(path.join("master-keys"))?;
213

            
214
1
    let rt = tokio::runtime::Runtime::new()?;
215
1
    rt.block_on(async move {
216
18
        let db = Database::open::<BasicSchema>(StorageConfiguration::new(&path)).await?;
217

            
218
        // Try retrieving the document, but expect an error decrypting.
219
1
        if let Err(bonsaidb_core::Error::Database(err)) = db
220
1
            .collection::<EncryptedBasic>()
221
1
            .get(document_header.id)
222
1
            .await
223
        {
224
1
            assert!(err.contains("vault"));
225
        } else {
226
            panic!("successfully retrieved encrypted document without keys");
227
        }
228

            
229
1
        Result::<_, anyhow::Error>::Ok(())
230
1
    })?;
231

            
232
1
    Ok(())
233
1
}
234

            
235
1
#[test]
236
1
fn expiration_after_close() -> anyhow::Result<()> {
237
    use bonsaidb_core::{keyvalue::KeyValue, test_util::TimingTest};
238
1
    loop {
239
1
        let path = TestDirectory::new("expiration-after-close");
240
1
        // To ensure full cleanup between each block, each runs in its own runtime;
241
1
        let timing = TimingTest::new(Duration::from_millis(100));
242
        // Set a key with an expiration, then close it. Then try to validate it
243
        // exists after opening, and then expires at the correct time.
244
        {
245
1
            let rt = tokio::runtime::Runtime::new()?;
246
1
            rt.block_on(async {
247
19
                let db = Database::open::<()>(StorageConfiguration::new(&path)).await?;
248

            
249
                // TODO This is a workaroun for the key-value expiration task
250
                // taking ownership of an instance of Database. If this async
251
                // task runs too quickly, sometimes things don't get cleaned up
252
                // if that task hasn't completed. This pause ensures the startup
253
                // tasks complete before we continue with the test. This should
254
                // be replaced with a proper shutdown call for the local
255
                // storage/database.
256
1
                tokio::time::sleep(Duration::from_millis(100)).await;
257

            
258
1
                db.set_key("a", &0_u32)
259
1
                    .expire_in(Duration::from_secs(3))
260
                    .await?;
261
1
                Result::<(), anyhow::Error>::Ok(())
262
1
            })?;
263
        }
264

            
265
        {
266
1
            let rt = tokio::runtime::Runtime::new()?;
267
1
            let retry = rt.block_on(async {
268
15
                let db = Database::open::<()>(StorageConfiguration::new(&path)).await?;
269

            
270
1
                let key = db.get_key("a").await?;
271
                // Due to not having a reliable way to shut down the database,
272
                // we can't make many guarantees about what happened after
273
                // setting the key in the above block. If we get None back,
274
                // we'll consider the test needing to retry. Once we have a
275
                // shutdown operation that guarantees that the key-value store
276
                // persists, the key.is_none() check shoud be removed, instead
277
                // asserting `key.is_some()`.
278
1
                if timing.elapsed() > Duration::from_secs(1) || key.is_none() {
279
                    return Ok(true);
280
1
                }
281
1

            
282
1
                timing.wait_until(Duration::from_secs(4)).await;
283

            
284
1
                assert!(db.get_key("a").await?.is_none());
285

            
286
1
                Result::<bool, anyhow::Error>::Ok(false)
287
1
            })?;
288

            
289
1
            if retry {
290
                println!("Retrying  expiration_after_close because it was too slow");
291
                continue;
292
1
            }
293
1
        }
294
1

            
295
1
        break;
296
1
    }
297
1
    Ok(())
298
1
}