1
use std::path::Path;
2
use std::time::Duration;
3

            
4
use bonsaidb::client::url::Url;
5
use bonsaidb::client::AsyncClient;
6
use bonsaidb::core::async_trait::async_trait;
7
use bonsaidb::core::connection::{
8
    AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
9
};
10
use bonsaidb::core::document::{CollectionDocument, CollectionHeader, Emit};
11
use bonsaidb::core::schema::view::map::Mappings;
12
use bonsaidb::core::schema::{
13
    Collection, CollectionMapReduce, CollectionName, DefaultSerialization, InsertError,
14
    NamedCollection, Qualified, ReduceResult, Schema, Schematic, SerializedCollection, View,
15
    ViewMapResult, ViewMappedValue, ViewSchema,
16
};
17
use bonsaidb::core::transaction::Transaction;
18
use bonsaidb::core::{define_basic_unique_mapped_view, Error};
19
use bonsaidb::local::config::Builder;
20
#[cfg(feature = "compression")]
21
use bonsaidb::local::config::Compression;
22
use bonsaidb::server::{DefaultPermissions, Server, ServerConfiguration};
23
use bonsaidb::AnyDatabase;
24
use serde::{Deserialize, Serialize};
25

            
26
use crate::execute::{Backend, BackendOperator, Measurements, Metric, Operator};
27
use crate::model::{Cart, Category, Customer, Order, Product, ProductReview};
28
use crate::plan::{
29
    AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
30
    ReviewProduct,
31
};
32

            
33
/// The BonsaiDb configurations exercised by the commerce benchmark.
///
/// The variant decides how `BonsaiBackend` sets up its server and how each
/// operator reaches the `commerce` database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Operators use the server's database directly, in-process.
    Local,
    /// Like `Local`; additionally enables LZ4 compression when the crate is
    /// built with the `compression` feature.
    LocalLz4,
    /// Operators connect through a client to `bonsaidb://localhost:7022`.
    Quic,
    /// Operators connect through a client to `ws://localhost:7023`.
    WebSockets,
}
39

            
40
impl Bonsai {
41
48
    pub fn label(&self) -> &'static str {
42
48
        match self {
43
12
            Self::Local => "bonsaidb-local",
44
12
            Self::LocalLz4 => "bonsaidb-local+lz4",
45
12
            Self::Quic => "bonsaidb-quic",
46
12
            Self::WebSockets => "bonsaidb-ws",
47
        }
48
48
    }
49
}
50

            
51
pub struct BonsaiBackend {
52
    server: Server,
53
    kind: Bonsai,
54
}
55

            
56
pub struct BonsaiOperator {
57
    label: &'static str,
58
    database: AnyDatabase,
59
}
60

            
61
29706
#[derive(Debug, Schema)]
62
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
63
pub enum Commerce {}
64

            
65
#[async_trait]
66
impl Backend for BonsaiBackend {
67
    type Config = Bonsai;
68
    type Operator = BonsaiOperator;
69

            
70
48
    fn label(&self) -> &'static str {
71
48
        self.kind.label()
72
48
    }
73

            
74
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
75
16
    async fn new(config: Self::Config) -> Self {
76
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
77
16
        if path.exists() {
78
15
            std::fs::remove_dir_all(path).unwrap();
79
15
        }
80
16
        let mut server_config = ServerConfiguration::new(path)
81
16
            .default_permissions(DefaultPermissions::AllowAll)
82
16
            .with_schema::<Commerce>()
83
16
            .unwrap();
84

            
85
        #[cfg(feature = "compression")]
86
        {
87
16
            if matches!(config, Bonsai::LocalLz4) {
88
4
                server_config = server_config.default_compression(Compression::Lz4);
89
12
            }
90
        }
91

            
92
48
        let server = Server::open(server_config).await.unwrap();
93
160
        server.install_self_signed_certificate(false).await.unwrap();
94
16
        server
95
16
            .create_database::<Commerce>("commerce", false)
96
32
            .await
97
16
            .unwrap();
98
16

            
99
16
        match config {
100
4
            Bonsai::Quic => {
101
4
                let server = server.clone();
102
4
                tokio::spawn(async move {
103
31
                    server.listen_on(7022).await.unwrap();
104
4
                });
105
4
            }
106
4
            Bonsai::WebSockets => {
107
4
                let server = server.clone();
108
4
                tokio::spawn(async move {
109
4
                    server
110
4
                        .listen_for_websockets_on("0.0.0.0:7023", false)
111
12
                        .await
112
                        .unwrap();
113
4
                });
114
4
            }
115
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
116
        }
117
        // Allow the server time to start listening
118
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
119

            
120
16
        BonsaiBackend {
121
16
            server,
122
16
            kind: config,
123
16
        }
124
32
    }
125

            
126
48
    async fn new_operator_async(&self) -> Self::Operator {
127
48
        let database = match self.kind {
128
            Bonsai::Local | Bonsai::LocalLz4 => {
129
24
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
130
            }
131

            
132
            Bonsai::Quic => {
133
12
                let client = AsyncClient::build(Url::parse("bonsaidb://localhost:7022").unwrap())
134
12
                    .with_certificate(
135
12
                        self.server
136
12
                            .certificate_chain()
137
18
                            .await
138
12
                            .unwrap()
139
12
                            .into_end_entity_certificate(),
140
12
                    )
141
12
                    .build()
142
12
                    .unwrap();
143
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
144
            }
145
            Bonsai::WebSockets => {
146
12
                let client = AsyncClient::build(Url::parse("ws://localhost:7023").unwrap())
147
12
                    .build()
148
12
                    .unwrap();
149
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
150
            }
151
        };
152
48
        BonsaiOperator {
153
48
            database,
154
48
            label: self.label(),
155
48
        }
156
96
    }
157
}
158

            
159
// Every operation result produced by this operator carries `u32` ids,
// matching the `u32` primary keys of the collections declared below.
impl BackendOperator for BonsaiOperator {
    type Id = u32;
}
162

            
163
#[async_trait]
164
impl Operator<Load, u32> for BonsaiOperator {
165
16
    async fn operate(
166
16
        &mut self,
167
16
        operation: &Load,
168
16
        _results: &[OperationResult<u32>],
169
16
        measurements: &Measurements,
170
16
    ) -> OperationResult<u32> {
171
16
        let measurement = measurements.begin(self.label, Metric::Load);
172
16
        let mut tx = Transaction::default();
173
204
        for (id, category) in &operation.initial_data.categories {
174
204
            category.insert_in_transaction(id, &mut tx).unwrap();
175
204
        }
176
1352
        for (id, product) in &operation.initial_data.products {
177
1352
            product.insert_in_transaction(id, &mut tx).unwrap();
178
1352
        }
179
3188
        for (id, customer) in &operation.initial_data.customers {
180
3188
            customer.insert_in_transaction(id, &mut tx).unwrap();
181
3188
        }
182
3380
        for (id, order) in &operation.initial_data.orders {
183
3380
            order.insert_in_transaction(id, &mut tx).unwrap();
184
3380
        }
185
1336
        for review in &operation.initial_data.reviews {
186
1336
            review.push_in_transaction(&mut tx).unwrap();
187
1336
        }
188
16
        self.database.apply_transaction(tx).await.unwrap();
189
16
        measurement.finish();
190
16
        OperationResult::Ok
191
32
    }
192
}
193

            
194
#[async_trait]
195
impl Operator<FindProduct, u32> for BonsaiOperator {
196
9372
    async fn operate(
197
9372
        &mut self,
198
9372
        operation: &FindProduct,
199
9372
        _results: &[OperationResult<u32>],
200
9372
        measurements: &Measurements,
201
9372
    ) -> OperationResult<u32> {
202
9372
        let measurement = measurements.begin(self.label, Metric::FindProduct);
203
9372
        let doc = Product::load_async(&operation.name, &self.database)
204
17131
            .await
205
9372
            .unwrap()
206
9372
            .unwrap();
207
9372
        let rating = self
208
9372
            .database
209
9372
            .view::<ProductReviewsByProduct>()
210
9372
            .with_key(&doc.header.id)
211
9372
            .with_access_policy(AccessPolicy::NoUpdate)
212
9372
            .reduce()
213
8579
            .await
214
9372
            .unwrap();
215
9372
        measurement.finish();
216
9372
        OperationResult::Product {
217
9372
            id: doc.header.id,
218
9372
            product: doc.contents,
219
9372
            rating: rating.average(),
220
9372
        }
221
18744
    }
222
}
223

            
224
#[async_trait]
225
impl Operator<LookupProduct, u32> for BonsaiOperator {
226
8684
    async fn operate(
227
8684
        &mut self,
228
8684
        operation: &LookupProduct,
229
8684
        _results: &[OperationResult<u32>],
230
8684
        measurements: &Measurements,
231
8684
    ) -> OperationResult<u32> {
232
8684
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
233
8684
        let doc = Product::get_async(&operation.id, &self.database)
234
7761
            .await
235
8684
            .unwrap()
236
8684
            .unwrap();
237
8684
        let rating = self
238
8684
            .database
239
8684
            .view::<ProductReviewsByProduct>()
240
8684
            .with_key(&doc.header.id)
241
8684
            .with_access_policy(AccessPolicy::NoUpdate)
242
8684
            .reduce()
243
7933
            .await
244
8684
            .unwrap();
245
8684
        measurement.finish();
246
8684
        OperationResult::Product {
247
8684
            id: doc.header.id,
248
8684
            product: doc.contents,
249
8684
            rating: rating.average(),
250
8684
        }
251
17368
    }
252
}
253

            
254
#[async_trait]
255
impl Operator<CreateCart, u32> for BonsaiOperator {
256
1848
    async fn operate(
257
1848
        &mut self,
258
1848
        _operation: &CreateCart,
259
1848
        _results: &[OperationResult<u32>],
260
1848
        measurements: &Measurements,
261
1848
    ) -> OperationResult<u32> {
262
1848
        let measurement = measurements.begin(self.label, Metric::CreateCart);
263
1848
        let cart = Cart::default()
264
1848
            .push_into_async(&self.database)
265
1847
            .await
266
1848
            .unwrap();
267
1848
        measurement.finish();
268
1848
        OperationResult::Cart { id: cart.header.id }
269
3696
    }
270
}
271

            
272
#[async_trait]
273
impl Operator<AddProductToCart, u32> for BonsaiOperator {
274
4736
    async fn operate(
275
4736
        &mut self,
276
4736
        operation: &AddProductToCart,
277
4736
        results: &[OperationResult<u32>],
278
4736
        measurements: &Measurements,
279
4736
    ) -> OperationResult<u32> {
280
4736
        let cart = match &results[operation.cart.0] {
281
4736
            OperationResult::Cart { id } => *id,
282
            _ => unreachable!("Invalid operation result"),
283
        };
284
4736
        let product = match &results[operation.product.0] {
285
4736
            OperationResult::Product { id, .. } => *id,
286
            _ => unreachable!("Invalid operation result"),
287
        };
288

            
289
4736
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
290
4736
        let mut cart = Cart::get_async(&cart, &self.database)
291
4211
            .await
292
4736
            .unwrap()
293
4736
            .unwrap();
294
4736
        cart.contents.product_ids.push(product);
295
4736
        cart.update_async(&self.database).await.unwrap();
296
4736
        measurement.finish();
297
4736

            
298
4736
        OperationResult::CartProduct { id: product }
299
9472
    }
300
}
301

            
302
#[async_trait]
303
impl Operator<Checkout, u32> for BonsaiOperator {
304
528
    async fn operate(
305
528
        &mut self,
306
528
        operation: &Checkout,
307
528
        results: &[OperationResult<u32>],
308
528
        measurements: &Measurements,
309
528
    ) -> OperationResult<u32> {
310
528
        let cart = match &results[operation.cart.0] {
311
528
            OperationResult::Cart { id } => *id,
312
            _ => unreachable!("Invalid operation result"),
313
        };
314

            
315
528
        let measurement = measurements.begin(self.label, Metric::Checkout);
316
528
        let cart = Cart::get_async(&cart, &self.database)
317
457
            .await
318
528
            .unwrap()
319
528
            .unwrap();
320
528
        cart.delete_async(&self.database).await.unwrap();
321
528
        Order {
322
528
            customer_id: operation.customer_id,
323
528
            product_ids: cart.contents.product_ids,
324
528
        }
325
528
        .push_into_async(&self.database)
326
527
        .await
327
528
        .unwrap();
328
528
        measurement.finish();
329
528

            
330
528
        OperationResult::Ok
331
1056
    }
332
}
333

            
334
#[async_trait]
335
impl Operator<ReviewProduct, u32> for BonsaiOperator {
336
424
    async fn operate(
337
424
        &mut self,
338
424
        operation: &ReviewProduct,
339
424
        results: &[OperationResult<u32>],
340
424
        measurements: &Measurements,
341
424
    ) -> OperationResult<u32> {
342
424
        let product_id = match &results[operation.product_id.0] {
343
            OperationResult::Product { id, .. } => *id,
344
424
            OperationResult::CartProduct { id, .. } => *id,
345
            other => unreachable!("Invalid operation result {:?}", other),
346
        };
347

            
348
424
        let measurement = measurements.begin(self.label, Metric::RateProduct);
349
424
        let review = ProductReview {
350
424
            customer_id: operation.customer_id,
351
424
            product_id,
352
424
            review: operation.review.clone(),
353
424
            rating: operation.rating,
354
424
        };
355
424
        // https://github.com/khonsulabs/bonsaidb/issues/189
356
424
        match review.push_into_async(&self.database).await {
357
424
            Ok(_) => {}
358
            Err(InsertError {
359
                error:
360
                    bonsaidb::core::Error::UniqueKeyViolation {
361
                        existing_document, ..
362
                    },
363
                contents,
364
            }) => {
365
                CollectionDocument::<ProductReview> {
366
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
367
                    contents,
368
                }
369
                .update_async(&self.database)
370
                .await
371
                .unwrap();
372
            }
373
            other => {
374
                other.unwrap();
375
            }
376
        }
377
        // Force the view to update.
378
424
        self.database
379
424
            .view::<ProductReviewsByProduct>()
380
424
            .with_key(&0)
381
424
            .reduce()
382
424
            .await
383
424
            .unwrap();
384
424
        measurement.finish();
385
424

            
386
424
        OperationResult::Ok
387
848
    }
388
}
389

            
390
impl Collection for Product {
391
    type PrimaryKey = u32;
392

            
393
260705
    fn collection_name() -> CollectionName {
394
260705
        CollectionName::new("benchmarks", "products")
395
260705
    }
396

            
397
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
398
29706
        schema.define_view(ProductsByName)?;
399
29706
        schema.define_view(ProductsByCategoryId)?;
400
29706
        Ok(())
401
29706
    }
402
}
403

            
404
// Marker impl with no items.
// NOTE(review): presumably this is what provides the `SerializedCollection`
// helpers the operators call on `Product` — confirm against the bonsaidb docs.
impl DefaultSerialization for Product {}
405

            
406
// Declares `ProductsByName`: version 1 of the unique "by-name" view over
// `Product`, keyed by `String` with `()` values. Each product emits its own
// name as the key.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |product: CollectionDocument<Product>| { product.header.emit_key(product.contents.name) },
);
415

            
416
31074
#[derive(Debug, Clone, View, ViewSchema)]
417
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
418
pub struct ProductsByCategoryId;
419

            
420
impl CollectionMapReduce for ProductsByCategoryId {
421
    fn map<'doc>(
422
        &self,
423
        document: CollectionDocument<<Self::View as View>::Collection>,
424
    ) -> ViewMapResult<'doc, Self> {
425
        let mut mappings = Mappings::default();
426
        for &id in &document.contents.category_ids {
427
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
428
        }
429
        Ok(mappings)
430
    }
431
}
432

            
433
impl NamedCollection for Product {
434
    type ByNameView = ProductsByName;
435
}
436

            
437
impl Collection for ProductReview {
438
    type PrimaryKey = u32;
439

            
440
205689
    fn collection_name() -> CollectionName {
441
205689
        CollectionName::new("benchmarks", "reviews")
442
205689
    }
443

            
444
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
445
29706
        schema.define_view(ProductReviewsByProduct)?;
446
29706
        Ok(())
447
29706
    }
448
}
449

            
450
// Marker impl with no items.
// NOTE(review): presumably this is what provides the `SerializedCollection`
// helpers the operators call on `ProductReview` — confirm against the bonsaidb docs.
impl DefaultSerialization for ProductReview {}
451

            
452
106674
#[derive(Debug, Clone, View, ViewSchema)]
453
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
454
pub struct ProductReviewsByProduct;
455

            
456
impl CollectionMapReduce for ProductReviewsByProduct {
457
1760
    fn map<'doc>(
458
1760
        &self,
459
1760
        document: CollectionDocument<<Self as View>::Collection>,
460
1760
    ) -> ViewMapResult<'doc, Self> {
461
1760
        document.header.emit_key_and_value(
462
1760
            document.contents.product_id,
463
1760
            ProductRatings {
464
1760
                total_score: document.contents.rating as u32,
465
1760
                ratings: 1,
466
1760
            },
467
1760
        )
468
1760
    }
469

            
470
5337
    fn reduce(
471
5337
        &self,
472
5337
        mappings: &[ViewMappedValue<Self::View>],
473
5337
        _rereduce: bool,
474
5337
    ) -> ReduceResult<Self::View> {
475
5337
        Ok(mappings
476
5337
            .iter()
477
5337
            .map(|mapping| mapping.value.clone())
478
5337
            .reduce(|a, b| ProductRatings {
479
2184
                total_score: a.total_score + b.total_score,
480
2184
                ratings: a.ratings + b.ratings,
481
5337
            })
482
5337
            .unwrap_or_default())
483
5337
    }
484
}
485

            
486
108520
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
487
pub struct ProductRatings {
488
    pub total_score: u32,
489
    pub ratings: u32,
490
}
491

            
492
impl ProductRatings {
493
18056
    pub fn average(&self) -> Option<f32> {
494
18056
        if self.ratings > 0 {
495
13823
            Some(self.total_score as f32 / self.ratings as f32)
496
        } else {
497
4233
            None
498
        }
499
18056
    }
500
}
501

            
502
impl Collection for Category {
503
    type PrimaryKey = u32;
504

            
505
29910
    fn collection_name() -> CollectionName {
506
29910
        CollectionName::new("benchmarks", "categories")
507
29910
    }
508

            
509
29706
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
510
29706
        Ok(())
511
29706
    }
512
}
513

            
514
// Marker impl with no items.
// NOTE(review): presumably this is what provides the `SerializedCollection`
// helpers the operators call on `Category` — confirm against the bonsaidb docs.
impl DefaultSerialization for Category {}
515

            
516
impl Collection for Customer {
517
    type PrimaryKey = u32;
518

            
519
32894
    fn collection_name() -> CollectionName {
520
32894
        CollectionName::new("benchmarks", "customers")
521
32894
    }
522

            
523
29706
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
29706
        Ok(())
525
29706
    }
526
}
527

            
528
// Marker impl with no items.
// NOTE(review): presumably this is what provides the `SerializedCollection`
// helpers the operators call on `Customer` — confirm against the bonsaidb docs.
impl DefaultSerialization for Customer {}
529

            
530
impl Collection for Order {
531
    type PrimaryKey = u32;
532

            
533
33614
    fn collection_name() -> CollectionName {
534
33614
        CollectionName::new("benchmarks", "orders")
535
33614
    }
536

            
537
29706
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
538
29706
        Ok(())
539
29706
    }
540
}
541

            
542
// Marker impl with no items.
// NOTE(review): presumably this is what provides the `SerializedCollection`
// helpers the operators call on `Order` — confirm against the bonsaidb docs.
impl DefaultSerialization for Order {}
543

            
544
impl Collection for Cart {
545
    type PrimaryKey = u32;
546

            
547
42082
    fn collection_name() -> CollectionName {
548
42082
        CollectionName::new("benchmarks", "carts")
549
42082
    }
550

            
551
29706
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
552
29706
        Ok(())
553
29706
    }
554
}
555

            
556
// Marker impl with no items.
// NOTE(review): presumably this is what provides the `SerializedCollection`
// helpers the operators call on `Cart` — confirm against the bonsaidb docs.
impl DefaultSerialization for Cart {}