1
use std::{
2
    collections::HashSet,
3
    path::{Path, PathBuf},
4
    time::Duration,
5
};
6

            
7
use bonsaidb_core::{
8
    connection::{Connection, StorageConnection},
9
    document::{CollectionDocument, Emit},
10
    keyvalue::KeyValue,
11
    schema::{
12
        Collection, CollectionViewSchema, ReduceResult, Schema, SerializedCollection, View,
13
        ViewMappedValue,
14
    },
15
    test_util::TestDirectory,
16
};
17
use fs_extra::dir;
18
use serde::{Deserialize, Serialize};
19

            
20
use crate::{
21
    config::{Builder, StorageConfiguration},
22
    Database, Storage,
23
};
24

            
25
12
#[derive(Schema, Debug)]
26
#[schema(name = "schema-a", collections = [Basic], core = bonsaidb_core)]
27
struct SchemaA;
28

            
29
20
#[derive(Schema, Debug)]
30
#[schema(name = "schema-b", collections = [Unique, Basic], core = bonsaidb_core)]
31
struct SchemaB;
32

            
33
72
#[derive(Collection, Debug, Serialize, Deserialize)]
34
#[collection(name = "unique", core = bonsaidb_core, views = [UniqueView])]
35
struct Unique {
36
    name: String,
37
}
38

            
39
74
#[derive(View, Debug, Clone)]
40
#[view(collection = Unique, key = String, core = bonsaidb_core)]
41
struct UniqueView;
42

            
43
impl CollectionViewSchema for UniqueView {
44
    type View = UniqueView;
45

            
46
62
    fn unique(&self) -> bool {
47
62
        true
48
62
    }
49

            
50
18
    fn map(
51
18
        &self,
52
18
        document: CollectionDocument<<Self::View as View>::Collection>,
53
18
    ) -> bonsaidb_core::schema::ViewMapResult<Self::View> {
54
18
        document.header.emit_key(document.contents.name)
55
18
    }
56
}
57

            
58
720
#[derive(Collection, Clone, Debug, Serialize, Deserialize)]
59
#[collection(name = "basic", views = [Scores], core = bonsaidb_core)]
60
struct Basic {
61
    key: String,
62
    value: u32,
63
}
64

            
65
180
#[derive(Clone, View, Debug)]
66
#[view(collection = Basic, key = String, value = u32, core = bonsaidb_core)]
67
struct Scores;
68

            
69
impl CollectionViewSchema for Scores {
70
    type View = Scores;
71

            
72
60
    fn map(
73
60
        &self,
74
60
        document: CollectionDocument<<Self::View as View>::Collection>,
75
60
    ) -> bonsaidb_core::schema::ViewMapResult<Self::View> {
76
60
        document
77
60
            .header
78
60
            .emit_key_and_value(document.contents.key, document.contents.value)
79
60
    }
80

            
81
36
    fn reduce(
82
36
        &self,
83
36
        mappings: &[ViewMappedValue<Self::View>],
84
36
        _rereduce: bool,
85
36
    ) -> ReduceResult<Self::View> {
86
60
        Ok(mappings.iter().map(|map| map.value).sum())
87
36
    }
88
}
89

            
90
1
async fn create_databases(path: impl AsRef<Path>) {
91
1
    let storage = Storage::open(
92
1
        StorageConfiguration::new(path)
93
1
            .with_schema::<SchemaA>()
94
1
            .unwrap()
95
1
            .with_schema::<SchemaB>()
96
1
            .unwrap(),
97
17
    )
98
17
    .await
99
1
    .unwrap();
100
1

            
101
1
    storage
102
2
        .create_database::<SchemaA>("a-1", false)
103
2
        .await
104
1
        .unwrap();
105
6
    write_basic(storage.database::<SchemaA>("a-1").await.unwrap()).await;
106

            
107
1
    storage
108
2
        .create_database::<SchemaA>("a-2", false)
109
2
        .await
110
1
        .unwrap();
111
6
    write_basic(storage.database::<SchemaA>("a-2").await.unwrap()).await;
112

            
113
1
    storage
114
2
        .create_database::<SchemaB>("b-1", false)
115
2
        .await
116
1
        .unwrap();
117
4
    write_unique(storage.database::<SchemaB>("b-1").await.unwrap()).await;
118
6
    write_basic(storage.database::<SchemaB>("b-1").await.unwrap()).await;
119

            
120
1
    storage
121
2
        .create_database::<SchemaB>("b-2", false)
122
2
        .await
123
1
        .unwrap();
124
4
    write_unique(storage.database::<SchemaB>("b-2").await.unwrap()).await;
125
6
    write_basic(storage.database::<SchemaB>("b-2").await.unwrap()).await;
126
1
    drop(storage);
127
1
}
128

            
129
4
async fn write_basic(db: Database) {
130
4
    db.set_numeric_key("integer", 1_u64).await.unwrap();
131
4
    db.set_key("string", &"test").await.unwrap();
132
4
    // Give the kv-store time to persist
133
4
    tokio::time::sleep(Duration::from_millis(10)).await;
134

            
135
4
    Basic {
136
4
        key: String::from("a"),
137
4
        value: 1,
138
4
    }
139
4
    .push_into(&db)
140
4
    .await
141
4
    .unwrap();
142
4
    Basic {
143
4
        key: String::from("a"),
144
4
        value: 2,
145
4
    }
146
4
    .push_into(&db)
147
4
    .await
148
4
    .unwrap();
149
4
    Basic {
150
4
        key: String::from("b"),
151
4
        value: 3,
152
4
    }
153
4
    .push_into(&db)
154
4
    .await
155
4
    .unwrap();
156
4
    Basic {
157
4
        key: String::from("b"),
158
4
        value: 4,
159
4
    }
160
4
    .push_into(&db)
161
4
    .await
162
4
    .unwrap();
163
4
    Basic {
164
4
        key: String::from("c"),
165
4
        value: 5,
166
4
    }
167
4
    .push_into(&db)
168
4
    .await
169
4
    .unwrap();
170
4
}
171

            
172
2
async fn write_unique(db: Database) {
173
2
    Unique {
174
2
        name: String::from("jon"),
175
2
    }
176
6
    .push_into(&db)
177
6
    .await
178
2
    .unwrap();
179
2
    Unique {
180
2
        name: String::from("jane"),
181
2
    }
182
2
    .push_into(&db)
183
2
    .await
184
2
    .unwrap();
185
2
}
186

            
187
3
async fn test_databases(path: impl AsRef<Path>) {
188
3
    let storage = Storage::open(
189
3
        StorageConfiguration::new(path)
190
3
            .with_schema::<SchemaA>()
191
3
            .unwrap()
192
3
            .with_schema::<SchemaB>()
193
3
            .unwrap(),
194
38
    )
195
38
    .await
196
3
    .unwrap();
197
3

            
198
18
    test_basic(storage.database::<SchemaA>("a-1").await.unwrap()).await;
199
18
    test_basic(storage.database::<SchemaA>("a-2").await.unwrap()).await;
200
14
    test_unique(storage.database::<SchemaB>("b-1").await.unwrap()).await;
201
18
    test_basic(storage.database::<SchemaB>("b-1").await.unwrap()).await;
202
14
    test_unique(storage.database::<SchemaB>("b-2").await.unwrap()).await;
203
18
    test_basic(storage.database::<SchemaB>("b-2").await.unwrap()).await;
204
3
}
205

            
206
12
async fn test_basic(db: Database) {
207
12
    assert_eq!(db.get_key("integer").into_u64().await.unwrap(), Some(1));
208
    assert_eq!(
209
12
        db.get_key("string")
210
12
            .into::<String>()
211
            .await
212
12
            .unwrap()
213
12
            .as_deref(),
214
        Some("test")
215
    );
216

            
217
12
    let all_docs = Basic::all(&db).await.unwrap();
218
12
    assert_eq!(all_docs.len(), 5);
219

            
220
12
    let a_scores = db
221
12
        .view::<Scores>()
222
12
        .with_key(String::from("a"))
223
36
        .query_with_collection_docs()
224
36
        .await
225
12
        .unwrap();
226
12
    assert_eq!(a_scores.mappings.len(), 2);
227
12
    assert_eq!(a_scores.documents.len(), 2);
228

            
229
36
    for mapping in db.view::<Scores>().reduce_grouped().await.unwrap() {
230
36
        let expected_value = match mapping.key.as_str() {
231
36
            "a" => 3,
232
24
            "b" => 7,
233
12
            "c" => 5,
234
            _ => unreachable!(),
235
        };
236
36
        assert_eq!(mapping.value, expected_value);
237
    }
238

            
239
12
    let transactions = db.list_executed_transactions(None, None).await.unwrap();
240
12
    let kv_transactions = transactions
241
12
        .iter()
242
96
        .filter_map(|t| t.changes.keys())
243
12
        .collect::<Vec<_>>();
244
12
    assert_eq!(kv_transactions.len(), 2);
245
12
    let keys = kv_transactions
246
12
        .iter()
247
24
        .flat_map(|changed_keys| {
248
24
            changed_keys
249
24
                .iter()
250
24
                .map(|changed_key| changed_key.key.as_str())
251
24
        })
252
12
        .collect::<HashSet<_>>();
253
12
    assert_eq!(keys.len(), 2);
254
12
    assert!(keys.contains("string"));
255
12
    assert!(keys.contains("integer"));
256

            
257
96
    let basic_transactions = transactions.iter().filter_map(|t| {
258
96
        t.changes.documents().and_then(|changes| {
259
72
            changes
260
72
                .collections
261
72
                .contains(&Basic::collection_name())
262
72
                .then(|| &changes.documents)
263
96
        })
264
96
    });
265
12
    assert_eq!(basic_transactions.count(), 5);
266
12
}
267

            
268
6
async fn test_unique(db: Database) {
269
6
    // Attempt to violate a unique key violation before accessing the view. This
270
6
    // tests the upgrade fpath for view verisons, if things have changed.
271
6
    Unique {
272
6
        name: String::from("jon"),
273
6
    }
274
16
    .push_into(&db)
275
16
    .await
276
6
    .unwrap_err();
277
6
    let mappings = db
278
6
        .view::<UniqueView>()
279
6
        .with_key(String::from("jane"))
280
12
        .query_with_collection_docs()
281
12
        .await
282
6
        .unwrap();
283
6
    assert_eq!(mappings.len(), 1);
284
6
    let jane = mappings.into_iter().next().unwrap().document;
285
6
    assert_eq!(jane.contents.name, "jane");
286
6
}
287

            
288
1
#[tokio::test]
289
1
async fn self_compatibility() {
290
1
    let dir = TestDirectory::new("self-compatibiltiy.bonsaidb");
291
61
    create_databases(&dir).await;
292
47
    test_databases(&dir).await;
293
1
    if std::env::var("UPDATE_COMPATIBILITY")
294
1
        .map(|v| !v.is_empty())
295
1
        .unwrap_or_default()
296
    {
297
        let version = format!(
298
            "{}.{}",
299
            std::env::var("CARGO_PKG_VERSION_MAJOR").unwrap(),
300
            std::env::var("CARGO_PKG_VERSION_MINOR").unwrap()
301
        );
302
        let path = PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap());
303
        let version_path = path
304
            .join("src")
305
            .join("tests")
306
            .join("compatibility")
307
            .join(version);
308
        if version_path.exists() {
309
            std::fs::remove_dir_all(&version_path).unwrap();
310
        }
311
        dir::copy(
312
            &dir,
313
            version_path,
314
            &dir::CopyOptions {
315
                content_only: true,
316
                copy_inside: true,
317
                ..dir::CopyOptions::default()
318
            },
319
        )
320
        .unwrap();
321
1
    }
322
1
}
323

            
324
2
async fn test_compatibility(dir: &str) {
325
2
    let project_dir = PathBuf::from(
326
2
        std::env::var("CARGO_MANIFEST_DIR")
327
2
            .unwrap_or_else(|_| String::from("./crates/bonsaidb-local")),
328
2
    );
329
2

            
330
2
    let test_dir = TestDirectory::new(format!("v{}-compatibility.nebari", dir));
331
2
    dir::copy(
332
2
        project_dir
333
2
            .join("src")
334
2
            .join("tests")
335
2
            .join("compatibility")
336
2
            .join(dir),
337
2
        &test_dir,
338
2
        &dir::CopyOptions {
339
2
            content_only: true,
340
2
            copy_inside: true,
341
2
            ..dir::CopyOptions::default()
342
2
        },
343
2
    )
344
2
    .unwrap();
345
2

            
346
103
    test_databases(&test_dir).await;
347
2
}
348

            
349
1
#[tokio::test]
350
1
async fn compatible_with_0_1_x() {
351
51
    test_compatibility("0.1").await;
352
1
}
353

            
354
1
#[tokio::test]
355
1
async fn compatible_with_0_2_x() {
356
52
    test_compatibility("0.2").await;
357
1
}