1
use std::{path::Path, time::Duration};
2

            
3
#[cfg(feature = "compression")]
4
use bonsaidb::local::config::Compression;
5
use bonsaidb::{
6
    client::{url::Url, Client},
7
    core::{
8
        async_trait::async_trait,
9
        connection::{
10
            AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
11
        },
12
        define_basic_unique_mapped_view,
13
        document::{CollectionDocument, CollectionHeader, Emit},
14
        schema::{
15
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
16
            DefaultSerialization, InsertError, NamedCollection, Qualified, ReduceResult, Schema,
17
            Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
18
        },
19
        transaction::{self, Transaction},
20
        Error,
21
    },
22
    local::config::Builder,
23
    server::{DefaultPermissions, Server, ServerConfiguration},
24
    AnyDatabase,
25
};
26
use serde::{Deserialize, Serialize};
27

            
28
use crate::{
29
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
30
    model::{Cart, Category, Customer, Order, Product, ProductReview},
31
    plan::{
32
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
33
        ReviewProduct,
34
    },
35
};
36

            
37
/// The BonsaiDb flavors that the benchmark can exercise.
///
/// Each variant selects how the benchmark reaches the database and maps to a
/// distinct reporting label (see [`Bonsai::label`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Use the server's database directly, without a network client.
    Local,
    /// Like [`Bonsai::Local`], reported under the `+lz4` label.
    LocalLz4,
    /// Connect through a client using a `bonsaidb://` URL.
    Quic,
    /// Connect through a client using a `ws://` URL.
    WebSockets,
}

impl Bonsai {
    /// Returns the static label under which measurements for this flavor are
    /// reported.
    pub fn label(&self) -> &'static str {
        match self {
            Self::Local => "bonsaidb-local",
            Self::LocalLz4 => "bonsaidb-local+lz4",
            Self::Quic => "bonsaidb-quic",
            Self::WebSockets => "bonsaidb-ws",
        }
    }
}
54

            
55
pub struct BonsaiBackend {
56
    server: Server,
57
    kind: Bonsai,
58
}
59

            
60
pub struct BonsaiOperator {
61
    label: &'static str,
62
    database: AnyDatabase,
63
}
64

            
65
28070
#[derive(Debug, Schema)]
66
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
67
pub enum Commerce {}
68

            
69
#[async_trait]
70
impl Backend for BonsaiBackend {
71
    type Operator = BonsaiOperator;
72
    type Config = Bonsai;
73

            
74
48
    fn label(&self) -> &'static str {
75
48
        self.kind.label()
76
48
    }
77

            
78
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
79
16
    async fn new(config: Self::Config) -> Self {
80
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
81
16
        if path.exists() {
82
15
            std::fs::remove_dir_all(path).unwrap();
83
15
        }
84
16
        let mut server_config = ServerConfiguration::new(path)
85
16
            .default_permissions(DefaultPermissions::AllowAll)
86
16
            .with_schema::<Commerce>()
87
16
            .unwrap();
88

            
89
        #[cfg(feature = "compression")]
90
        {
91
16
            if matches!(config, Bonsai::LocalLz4) {
92
4
                server_config = server_config.default_compression(Compression::Lz4);
93
12
            }
94
        }
95

            
96
48
        let server = Server::open(server_config).await.unwrap();
97
160
        server.install_self_signed_certificate(false).await.unwrap();
98
16
        server
99
32
            .create_database::<Commerce>("commerce", false)
100
32
            .await
101
16
            .unwrap();
102
16

            
103
16
        match config {
104
4
            Bonsai::Quic => {
105
4
                let server = server.clone();
106
4
                tokio::spawn(async move {
107
30
                    server.listen_on(7022).await.unwrap();
108
4
                });
109
4
            }
110
4
            Bonsai::WebSockets => {
111
4
                let server = server.clone();
112
4
                tokio::spawn(async move {
113
4
                    server
114
9
                        .listen_for_websockets_on("0.0.0.0:7023", false)
115
9
                        .await
116
                        .unwrap();
117
4
                });
118
4
            }
119
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
120
        }
121
        // Allow the server time to start listening
122
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
123

            
124
16
        BonsaiBackend {
125
16
            server,
126
16
            kind: config,
127
16
        }
128
32
    }
129

            
130
48
    async fn new_operator_async(&self) -> Self::Operator {
131
48
        let database = match self.kind {
132
            Bonsai::Local | Bonsai::LocalLz4 => {
133
24
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
134
            }
135

            
136
            Bonsai::Quic => {
137
12
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
138
12
                    .with_certificate(
139
12
                        self.server
140
18
                            .certificate_chain()
141
18
                            .await
142
12
                            .unwrap()
143
12
                            .into_end_entity_certificate(),
144
12
                    )
145
12
                    .finish()
146
12
                    .unwrap();
147
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
148
            }
149
            Bonsai::WebSockets => {
150
12
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
151
12
                    .finish()
152
12
                    .unwrap();
153
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
154
            }
155
        };
156
48
        BonsaiOperator {
157
48
            database,
158
48
            label: self.label(),
159
48
        }
160
96
    }
161
}
162

            
163
// Marks `BonsaiOperator` as a `BackendOperator`; the empty body means no trait
// items are overridden here.
impl BackendOperator for BonsaiOperator {}
164

            
165
#[async_trait]
166
impl Operator<Load> for BonsaiOperator {
167
16
    async fn operate(
168
16
        &mut self,
169
16
        operation: &Load,
170
16
        _results: &[OperationResult],
171
16
        measurements: &Measurements,
172
16
    ) -> OperationResult {
173
16
        let measurement = measurements.begin(self.label, Metric::Load);
174
16
        let mut tx = Transaction::default();
175
184
        for (id, category) in &operation.initial_data.categories {
176
184
            tx.push(
177
184
                transaction::Operation::insert_serialized::<Category>(Some(*id), category).unwrap(),
178
184
            );
179
184
        }
180
1816
        for (id, product) in &operation.initial_data.products {
181
1816
            tx.push(
182
1816
                transaction::Operation::insert_serialized::<Product>(Some(*id), product).unwrap(),
183
1816
            );
184
1816
        }
185
1280
        for (id, customer) in &operation.initial_data.customers {
186
1280
            tx.push(
187
1280
                transaction::Operation::insert_serialized::<Customer>(Some(*id), customer).unwrap(),
188
1280
            );
189
1280
        }
190
1880
        for (id, order) in &operation.initial_data.orders {
191
1880
            tx.push(transaction::Operation::insert_serialized::<Order>(Some(*id), order).unwrap());
192
1880
        }
193
888
        for review in &operation.initial_data.reviews {
194
888
            tx.push(
195
888
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
196
888
            );
197
888
        }
198
16
        self.database.apply_transaction(tx).await.unwrap();
199
16
        measurement.finish();
200
16
        OperationResult::Ok
201
32
    }
202
}
203

            
204
#[async_trait]
205
impl Operator<FindProduct> for BonsaiOperator {
206
8756
    async fn operate(
207
8756
        &mut self,
208
8756
        operation: &FindProduct,
209
8756
        _results: &[OperationResult],
210
8756
        measurements: &Measurements,
211
8756
    ) -> OperationResult {
212
8756
        let measurement = measurements.begin(self.label, Metric::FindProduct);
213
16032
        let doc = Product::load_async(&operation.name, &self.database)
214
16032
            .await
215
8756
            .unwrap()
216
8756
            .unwrap();
217
8756
        let rating = self
218
8756
            .database
219
8756
            .view::<ProductReviewsByProduct>()
220
8756
            .with_key(doc.header.id)
221
8756
            .with_access_policy(AccessPolicy::NoUpdate)
222
8756
            .reduce()
223
7936
            .await
224
8756
            .unwrap();
225
8756
        measurement.finish();
226
8756
        OperationResult::Product {
227
8756
            id: doc.header.id,
228
8756
            product: doc.contents,
229
8756
            rating: rating.average(),
230
8756
        }
231
17512
    }
232
}
233

            
234
#[async_trait]
235
impl Operator<LookupProduct> for BonsaiOperator {
236
8452
    async fn operate(
237
8452
        &mut self,
238
8452
        operation: &LookupProduct,
239
8452
        _results: &[OperationResult],
240
8452
        measurements: &Measurements,
241
8452
    ) -> OperationResult {
242
8452
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
243
8452
        let doc = Product::get_async(operation.id, &self.database)
244
7606
            .await
245
8452
            .unwrap()
246
8452
            .unwrap();
247
8452
        let rating = self
248
8452
            .database
249
8452
            .view::<ProductReviewsByProduct>()
250
8452
            .with_key(doc.header.id)
251
8452
            .with_access_policy(AccessPolicy::NoUpdate)
252
8452
            .reduce()
253
7685
            .await
254
8452
            .unwrap();
255
8452
        measurement.finish();
256
8452
        OperationResult::Product {
257
8452
            id: doc.header.id,
258
8452
            product: doc.contents,
259
8452
            rating: rating.average(),
260
8452
        }
261
16904
    }
262
}
263

            
264
#[async_trait]
265
impl Operator<CreateCart> for BonsaiOperator {
266
1828
    async fn operate(
267
1828
        &mut self,
268
1828
        _operation: &CreateCart,
269
1828
        _results: &[OperationResult],
270
1828
        measurements: &Measurements,
271
1828
    ) -> OperationResult {
272
1828
        let measurement = measurements.begin(self.label, Metric::CreateCart);
273
1828
        let cart = Cart::default()
274
1828
            .push_into_async(&self.database)
275
1654
            .await
276
1828
            .unwrap();
277
1828
        measurement.finish();
278
1828
        OperationResult::Cart { id: cart.header.id }
279
3656
    }
280
}
281

            
282
#[async_trait]
283
impl Operator<AddProductToCart> for BonsaiOperator {
284
4444
    async fn operate(
285
4444
        &mut self,
286
4444
        operation: &AddProductToCart,
287
4444
        results: &[OperationResult],
288
4444
        measurements: &Measurements,
289
4444
    ) -> OperationResult {
290
4444
        let cart = match &results[operation.cart.0] {
291
4444
            OperationResult::Cart { id } => *id,
292
            _ => unreachable!("Invalid operation result"),
293
        };
294
4444
        let product = match &results[operation.product.0] {
295
4444
            OperationResult::Product { id, .. } => *id,
296
            _ => unreachable!("Invalid operation result"),
297
        };
298

            
299
4444
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
300
4444
        let mut cart = Cart::get_async(cart, &self.database)
301
4008
            .await
302
4444
            .unwrap()
303
4444
            .unwrap();
304
4444
        cart.contents.product_ids.push(product);
305
4444
        cart.update_async(&self.database).await.unwrap();
306
4444
        measurement.finish();
307
4444

            
308
4444
        OperationResult::CartProduct { id: product }
309
8888
    }
310
}
311

            
312
#[async_trait]
313
impl Operator<Checkout> for BonsaiOperator {
314
460
    async fn operate(
315
460
        &mut self,
316
460
        operation: &Checkout,
317
460
        results: &[OperationResult],
318
460
        measurements: &Measurements,
319
460
    ) -> OperationResult {
320
460
        let cart = match &results[operation.cart.0] {
321
460
            OperationResult::Cart { id } => *id,
322
            _ => unreachable!("Invalid operation result"),
323
        };
324

            
325
460
        let measurement = measurements.begin(self.label, Metric::Checkout);
326
460
        let cart = Cart::get_async(cart, &self.database)
327
399
            .await
328
460
            .unwrap()
329
460
            .unwrap();
330
460
        cart.delete_async(&self.database).await.unwrap();
331
460
        Order {
332
460
            customer_id: operation.customer_id,
333
460
            product_ids: cart.contents.product_ids,
334
460
        }
335
460
        .push_into_async(&self.database)
336
411
        .await
337
460
        .unwrap();
338
460
        measurement.finish();
339
460

            
340
460
        OperationResult::Ok
341
920
    }
342
}
343

            
344
#[async_trait]
345
impl Operator<ReviewProduct> for BonsaiOperator {
346
348
    async fn operate(
347
348
        &mut self,
348
348
        operation: &ReviewProduct,
349
348
        results: &[OperationResult],
350
348
        measurements: &Measurements,
351
348
    ) -> OperationResult {
352
348
        let product_id = match &results[operation.product_id.0] {
353
            OperationResult::Product { id, .. } => *id,
354
348
            OperationResult::CartProduct { id, .. } => *id,
355
            other => unreachable!("Invalid operation result {:?}", other),
356
        };
357

            
358
348
        let measurement = measurements.begin(self.label, Metric::RateProduct);
359
348
        let review = ProductReview {
360
348
            customer_id: operation.customer_id,
361
348
            product_id,
362
348
            review: operation.review.clone(),
363
348
            rating: operation.rating,
364
348
        };
365
348
        // https://github.com/khonsulabs/bonsaidb/issues/189
366
348
        match review.push_into_async(&self.database).await {
367
348
            Ok(_) => {}
368
            Err(InsertError {
369
                error:
370
                    bonsaidb::core::Error::UniqueKeyViolation {
371
                        existing_document, ..
372
                    },
373
                contents,
374
            }) => {
375
                CollectionDocument::<ProductReview> {
376
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
377
                    contents,
378
                }
379
                .update_async(&self.database)
380
                .await
381
                .unwrap();
382
            }
383
            other => {
384
                other.unwrap();
385
            }
386
        }
387
        // Force the view to update.
388
348
        self.database
389
348
            .view::<ProductReviewsByProduct>()
390
348
            .with_key(0)
391
348
            .reduce()
392
348
            .await
393
348
            .unwrap();
394
348
        measurement.finish();
395
348

            
396
348
        OperationResult::Ok
397
696
    }
398
}
399

            
400
impl Collection for Product {
401
    type PrimaryKey = u32;
402

            
403
247668
    fn collection_name() -> CollectionName {
404
247668
        CollectionName::new("benchmarks", "products")
405
247668
    }
406

            
407
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
408
28070
        schema.define_view(ProductsByName)?;
409
28070
        schema.define_view(ProductsByCategoryId)?;
410
28070
        Ok(())
411
28070
    }
412
}
413

            
414
// Empty impl: `Product` overrides nothing from `DefaultSerialization`.
impl DefaultSerialization for Product {}
415

            
416
// Declares the `ProductsByName` view over `Product` via the helper macro.
// The closure emits each document's `name` as the view key.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |document: CollectionDocument<Product>| { document.header.emit_key(document.contents.name) },
);
425

            
426
29902
#[derive(Debug, Clone, View)]
427
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
428
pub struct ProductsByCategoryId;
429

            
430
impl CollectionViewSchema for ProductsByCategoryId {
431
    type View = Self;
432

            
433
    fn map(
434
        &self,
435
        document: CollectionDocument<<Self::View as View>::Collection>,
436
    ) -> ViewMapResult<Self::View> {
437
        let mut mappings = Mappings::default();
438
        for &id in &document.contents.category_ids {
439
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
440
        }
441
        Ok(mappings)
442
    }
443
}
444

            
445
impl NamedCollection for Product {
446
    type ByNameView = ProductsByName;
447
}
448

            
449
impl Collection for ProductReview {
450
    type PrimaryKey = u32;
451

            
452
159387
    fn collection_name() -> CollectionName {
453
159387
        CollectionName::new("benchmarks", "reviews")
454
159387
    }
455

            
456
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
457
28070
        schema.define_view(ProductReviewsByProduct)?;
458
28070
        Ok(())
459
28070
    }
460
}
461

            
462
// Empty impl: `ProductReview` overrides nothing from `DefaultSerialization`.
impl DefaultSerialization for ProductReview {}
463

            
464
83382
#[derive(Debug, Clone, View)]
465
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
466
pub struct ProductReviewsByProduct;
467

            
468
impl CollectionViewSchema for ProductReviewsByProduct {
469
    type View = Self;
470

            
471
1236
    fn map(
472
1236
        &self,
473
1236
        document: CollectionDocument<<Self as View>::Collection>,
474
1236
    ) -> ViewMapResult<Self::View> {
475
1236
        document.header.emit_key_and_value(
476
1236
            document.contents.product_id,
477
1236
            ProductRatings {
478
1236
                total_score: document.contents.rating as u32,
479
1236
                ratings: 1,
480
1236
            },
481
1236
        )
482
1236
    }
483

            
484
11489
    fn reduce(
485
11489
        &self,
486
11489
        mappings: &[ViewMappedValue<Self::View>],
487
11489
        _rereduce: bool,
488
11489
    ) -> ReduceResult<Self::View> {
489
11489
        Ok(mappings
490
11489
            .iter()
491
11489
            .map(|mapping| mapping.value.clone())
492
11489
            .reduce(|a, b| ProductRatings {
493
400
                total_score: a.total_score + b.total_score,
494
400
                ratings: a.ratings + b.ratings,
495
11489
            })
496
11489
            .unwrap_or_default())
497
11489
    }
498
}
499

            
500
95175
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
501
pub struct ProductRatings {
502
    pub total_score: u32,
503
    pub ratings: u32,
504
}
505

            
506
impl ProductRatings {
507
17208
    pub fn average(&self) -> Option<f32> {
508
17208
        if self.ratings > 0 {
509
6798
            Some(self.total_score as f32 / self.ratings as f32)
510
        } else {
511
10410
            None
512
        }
513
17208
    }
514
}
515

            
516
impl Collection for Category {
517
    type PrimaryKey = u32;
518

            
519
28254
    fn collection_name() -> CollectionName {
520
28254
        CollectionName::new("benchmarks", "categories")
521
28254
    }
522

            
523
28070
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
28070
        Ok(())
525
28070
    }
526
}
527

            
528
// Empty impl: `Category` overrides nothing from `DefaultSerialization`.
impl DefaultSerialization for Category {}
529

            
530
impl Collection for Customer {
531
    type PrimaryKey = u32;
532

            
533
29350
    fn collection_name() -> CollectionName {
534
29350
        CollectionName::new("benchmarks", "customers")
535
29350
    }
536

            
537
28070
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
538
28070
        Ok(())
539
28070
    }
540
}
541

            
542
// Empty impl: `Customer` overrides nothing from `DefaultSerialization`.
impl DefaultSerialization for Customer {}
543

            
544
impl Collection for Order {
545
    type PrimaryKey = u32;
546

            
547
30410
    fn collection_name() -> CollectionName {
548
30410
        CollectionName::new("benchmarks", "orders")
549
30410
    }
550

            
551
28070
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
552
28070
        Ok(())
553
28070
    }
554
}
555

            
556
// Empty impl: `Order` overrides nothing from `DefaultSerialization`.
impl DefaultSerialization for Order {}
557

            
558
impl Collection for Cart {
559
    type PrimaryKey = u32;
560

            
561
39706
    fn collection_name() -> CollectionName {
562
39706
        CollectionName::new("benchmarks", "carts")
563
39706
    }
564

            
565
28070
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
566
28070
        Ok(())
567
28070
    }
568
}
569

            
570
// Empty impl: `Cart` overrides nothing from `DefaultSerialization`.
impl DefaultSerialization for Cart {}