1
use std::time::Duration;
2

            
3
#[cfg(feature = "encryption")]
4
use bonsaidb_core::test_util::EncryptedBasic;
5
use bonsaidb_core::{
6
    connection::{AccessPolicy, Connection, StorageConnection},
7
    permissions::{Permissions, Statement},
8
    test_util::{
9
        Basic, BasicByBrokenParentId, BasicByParentId, BasicCollectionWithNoViews,
10
        BasicCollectionWithOnlyBrokenParentId, BasicSchema, HarnessTest, TestDirectory,
11
    },
12
};
13
use config::StorageConfiguration;
14

            
15
use super::*;
16
use crate::{config::Builder, Database};
17

            
18
macro_rules! define_local_suite {
19
    ($name:ident) => {
20
        mod $name {
21
            use super::*;
22
            struct TestHarness {
23
                _directory: TestDirectory,
24
                db: Database,
25
            }
26

            
27
            impl TestHarness {
28
60
                async fn new(test: HarnessTest) -> anyhow::Result<Self> {
29
                    let directory = TestDirectory::new(format!("{}-{}", stringify!($name), test));
30
                    let mut config =
31
                        StorageConfiguration::new(&directory).with_schema::<BasicSchema>()?;
32
                    if stringify!($name) == "memory" {
33
                        config = config.memory_only()
34
                    }
35
                    let storage = Storage::open(config).await?;
36
                    storage
37
                        .create_database::<BasicSchema>("tests", false)
38
                        .await?;
39
                    let db = storage.database::<BasicSchema>("tests").await?;
40

            
41
                    Ok(Self {
42
                        _directory: directory,
43
                        db,
44
                    })
45
                }
46

            
47
4
                const fn server_name() -> &'static str {
48
4
                    stringify!($name)
49
4
                }
50

            
51
4
                fn server(&self) -> &'_ Storage {
52
4
                    self.db.storage()
53
4
                }
54

            
55
                #[allow(dead_code)]
56
                async fn connect_with_permissions(
57
                    &self,
58
                    permissions: Vec<Statement>,
59
                    _label: &str,
60
                ) -> anyhow::Result<Database> {
61
                    Ok(self
62
                        .db
63
                        .with_effective_permissions(Permissions::from(permissions)))
64
                }
65

            
66
58
                async fn connect(&self) -> anyhow::Result<Database> {
67
58
                    Ok(self.db.clone())
68
58
                }
69

            
70
52
                pub async fn shutdown(&self) -> anyhow::Result<()> {
71
52
                    Ok(())
72
52
                }
73
            }
74

            
75
            bonsaidb_core::define_connection_test_suite!(TestHarness);
76

            
77
            bonsaidb_core::define_pubsub_test_suite!(TestHarness);
78

            
79
            bonsaidb_core::define_kv_test_suite!(TestHarness);
80
        }
81
    };
82
}
83

            
84
200
define_local_suite!(persisted);
85
200
define_local_suite!(memory);
86

            
87
1
/// Opens the same test directory three times with different schemas and
/// checks how views catch up with a document that was stored before the view
/// existed:
///
/// 1. store a document using a collection that defines no views;
/// 2. reopen with a view and confirm that a `NoUpdate` query sees nothing
///    while a regular query sees the document;
/// 3. reopen with `check_view_integrity_on_open(true)` and poll with
///    `NoUpdate` until the view contains the document.
///
/// Errors from each phase are propagated through the returned
/// `anyhow::Result`, matching the other tests in this file.
#[test]
fn integrity_checks() -> anyhow::Result<()> {
    let path = TestDirectory::new("integrity-checks");
    // To ensure full cleanup between each block, each runs in its own runtime.

    // Add a doc with no views installed
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            {
                let db =
                    Database::open::<BasicCollectionWithNoViews>(StorageConfiguration::new(&path))
                        .await?;
                let collection = db.collection::<BasicCollectionWithNoViews>();
                collection.push(&Basic::default().with_parent_id(1)).await?;
            }
            Result::<(), anyhow::Error>::Ok(())
        })?;
    }
    // Connect with a new view and see the automatic update with a query
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            let db = Database::open::<BasicCollectionWithOnlyBrokenParentId>(
                StorageConfiguration::new(&path),
            )
            .await?;
            // Give the integrity scanner time to run if it were to run (it
            // shouldn't in this configuration).
            tokio::time::sleep(Duration::from_millis(100)).await;

            // NoUpdate should return data without the validation checker having run.
            assert_eq!(
                db.view::<BasicByBrokenParentId>()
                    .with_access_policy(AccessPolicy::NoUpdate)
                    .query()
                    .await?
                    .len(),
                0
            );

            // Regular query should show the correct data
            assert_eq!(db.view::<BasicByBrokenParentId>().query().await?.len(), 1);
            Result::<(), anyhow::Error>::Ok(())
        })?;
    }
    // Connect with a fixed view, and wait for the integrity scanner to work
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            let db = Database::open::<Basic>(
                StorageConfiguration::new(&path).check_view_integrity_on_open(true),
            )
            .await?;
            // Poll once per second, up to 100 times, using NoUpdate so that
            // the query itself never triggers the view update.
            for _ in 0_u8..100 {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                if db
                    .view::<BasicByParentId>()
                    .with_access_policy(AccessPolicy::NoUpdate)
                    .with_key(Some(1))
                    .query()
                    .await?
                    .len()
                    == 1
                {
                    return Result::<(), anyhow::Error>::Ok(());
                }
            }

            panic!("Integrity checker didn't run in the allocated time")
        })?;
    }

    Ok(())
}
170

            
171
1
/// Stores an `EncryptedBasic` document, removes the `master-keys` file from
/// the test directory, reopens the database, and expects reading the document
/// to fail with a database error that mentions the vault.
///
/// The final check distinguishes three outcomes so a failure reports what
/// actually happened: the expected database error, some other error, or an
/// unexpectedly successful read.
#[test]
#[cfg(feature = "encryption")]
fn encryption() -> anyhow::Result<()> {
    use bonsaidb_core::document::Document;

    let path = TestDirectory::new("encryption");
    let document_header = {
        let rt = tokio::runtime::Runtime::new()?;
        rt.block_on(async {
            let db = Database::open::<BasicSchema>(StorageConfiguration::new(&path)).await?;

            let document_header = db
                .collection::<EncryptedBasic>()
                .push(&EncryptedBasic::new("hello"))
                .await?;

            // Retrieve the document, showing that it was stored successfully.
            let doc = db
                .collection::<EncryptedBasic>()
                .get(document_header.id)
                .await?
                .expect("doc not found");
            assert_eq!(&doc.contents::<EncryptedBasic>()?.value, "hello");

            Result::<_, anyhow::Error>::Ok(document_header)
        })?
    };

    // By resetting the encryption key, we should be able to force an error in
    // decryption, which proves that the document was encrypted. To ensure the
    // server starts up and generates a new key, we must delete the sealing key.
    std::fs::remove_file(path.join("master-keys"))?;

    let rt = tokio::runtime::Runtime::new()?;
    rt.block_on(async move {
        let db = Database::open::<BasicSchema>(StorageConfiguration::new(&path)).await?;

        // Try retrieving the document, but expect an error decrypting.
        match db
            .collection::<EncryptedBasic>()
            .get(document_header.id)
            .await
        {
            Err(bonsaidb_core::Error::Database(err)) => {
                assert!(
                    err.contains("vault"),
                    "database error did not mention the vault: {:?}",
                    err
                );
            }
            // Any other error variant is not the failure this test is looking
            // for; report it instead of claiming the read succeeded.
            Err(other) => panic!(
                "expected a vault-related database error, got: {:?}",
                other
            ),
            Ok(_) => panic!("successfully retrieved encrypted document without keys"),
        }

        Result::<_, anyhow::Error>::Ok(())
    })?;

    Ok(())
}
224

            
225
1
/// Sets a key with a three second expiration, closes the database, reopens
/// it, and checks that the key is still present and then gone once four
/// seconds have elapsed on the timer. If reopening and reading took more than
/// one second, the whole attempt is repeated from scratch.
#[test]
fn expiration_after_close() -> anyhow::Result<()> {
    use bonsaidb_core::{keyvalue::KeyValue, test_util::TimingTest};

    loop {
        let path = TestDirectory::new("expiration-after-close");
        let timing = TimingTest::new(Duration::from_millis(100));

        // To ensure full cleanup between the two phases, each one runs in its
        // own runtime.

        // Phase one: set a key with an expiration, then close the database.
        {
            let rt = tokio::runtime::Runtime::new()?;
            rt.block_on(async {
                let db = Database::open::<()>(StorageConfiguration::new(&path)).await?;

                // TODO This is a workaround for the key-value expiration task
                // taking ownership of an instance of Database. If this async
                // task runs too quickly, sometimes things don't get cleaned up
                // if that task hasn't completed. This pause ensures the startup
                // tasks complete before we continue with the test. This should
                // be replaced with a proper shutdown call for the local
                // storage/database.
                tokio::time::sleep(Duration::from_millis(100)).await;

                let expires_in = Duration::from_secs(3);
                db.set_key("a", &0_u32).expire_in(expires_in).await?;
                Result::<(), anyhow::Error>::Ok(())
            })?;
        }

        // Phase two: reopen, validate the key still exists, and then that it
        // expires at the correct time. Evaluates to `true` when the attempt
        // was too slow to be meaningful and has to be repeated.
        let too_slow = {
            let rt = tokio::runtime::Runtime::new()?;
            rt.block_on(async {
                let db = Database::open::<()>(StorageConfiguration::new(&path)).await?;

                let key = db.get_key("a").into().await?;

                // Too much time has passed already; ask the caller to retry.
                if timing.elapsed() > Duration::from_secs(1) {
                    return Ok(true);
                }

                assert_eq!(key, Some(0_u32));

                timing.wait_until(Duration::from_secs(4)).await;

                assert!(db.get_key("a").await?.is_none());

                Result::<bool, anyhow::Error>::Ok(false)
            })?
        };

        if !too_slow {
            break;
        }
        println!("Retrying  expiration_after_close because it was too slow");
    }

    Ok(())
}