1
use std::{path::Path, time::Duration};
2

            
3
use bonsaidb::{
4
    client::{url::Url, Client},
5
    core::{
6
        async_trait::async_trait,
7
        connection::{AccessPolicy, Connection, StorageConnection},
8
        define_basic_unique_mapped_view,
9
        document::CollectionDocument,
10
        schema::{
11
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
12
            DefaultSerialization, DefaultViewSerialization, InsertError, Name, NamedCollection,
13
            ReduceResult, Schema, SchemaName, Schematic, SerializedCollection, View, ViewMapResult,
14
            ViewMappedValue,
15
        },
16
        transaction::{self, Transaction},
17
        Error,
18
    },
19
    local::config::Builder,
20
    server::{DefaultPermissions, Server, ServerConfiguration},
21
    AnyDatabase,
22
};
23
use serde::{Deserialize, Serialize};
24

            
25
use crate::{
26
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
27
    model::{Cart, Category, Customer, Order, Product, ProductReview},
28
    plan::{
29
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
30
        ReviewProduct,
31
    },
32
};
33

            
34
/// Selects how the benchmark reaches the BonsaiDb server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Use the server's database handle directly, in-process.
    Local,
    /// Connect through a networked client using a `bonsaidb://` URL.
    Quic,
    /// Connect through a networked client using a `ws://` URL.
    WebSockets,
}
39

            
40
impl Bonsai {
41
39
    pub fn label(&self) -> &'static str {
42
39
        match self {
43
13
            Self::Local => "bonsaidb-local",
44
13
            Self::Quic => "bonsaidb-quic",
45
13
            Self::WebSockets => "bonsaidb-ws",
46
        }
47
39
    }
48
}
49

            
50
pub struct BonsaiBackend {
51
    server: Server,
52
    kind: Bonsai,
53
}
54

            
55
pub struct BonsaiOperator {
56
    label: &'static str,
57
    database: AnyDatabase,
58
}
59

            
60
/// Type-level tag for the benchmark schema.
///
/// The enum has no variants, so it can never be instantiated; it is only
/// used as a type parameter (see its `Schema` implementation).
#[derive(Debug)]
pub enum Commerce {}
62

            
63
#[async_trait]
64
impl Backend for BonsaiBackend {
65
    type Operator = BonsaiOperator;
66
    type Config = Bonsai;
67

            
68
39
    fn label(&self) -> &'static str {
69
39
        self.kind.label()
70
39
    }
71

            
72
12
    async fn new(config: Self::Config) -> Self {
73
12
        let path = Path::new("commerce-benchmarks.bonsaidb");
74
12
        if path.exists() {
75
11
            std::fs::remove_dir_all(path).unwrap();
76
11
        }
77
12
        let server = Server::open(
78
12
            ServerConfiguration::new(path)
79
12
                .default_permissions(DefaultPermissions::AllowAll)
80
12
                .with_schema::<Commerce>()
81
12
                .unwrap(),
82
225
        )
83
225
        .await
84
12
        .unwrap();
85
94
        server.install_self_signed_certificate(false).await.unwrap();
86
12
        server
87
22
            .create_database::<Commerce>("commerce", false)
88
22
            .await
89
12
            .unwrap();
90
12

            
91
12
        match config {
92
4
            Bonsai::Quic => {
93
4
                let server = server.clone();
94
4
                tokio::spawn(async move {
95
26
                    server.listen_on(7022).await.unwrap();
96
4
                });
97
4
            }
98
4
            Bonsai::WebSockets => {
99
4
                let server = server.clone();
100
4
                tokio::spawn(async move {
101
4
                    server
102
10
                        .listen_for_websockets_on("0.0.0.0:7023", false)
103
10
                        .await
104
                        .unwrap();
105
4
                });
106
4
            }
107
4
            Bonsai::Local => {}
108
        }
109
        // Allow the server time to start listening
110
12
        tokio::time::sleep(Duration::from_millis(1000)).await;
111

            
112
12
        BonsaiBackend {
113
12
            server,
114
12
            kind: config,
115
12
        }
116
24
    }
117

            
118
39
    async fn new_operator_async(&self) -> Self::Operator {
119
39
        let database = match self.kind {
120
            Bonsai::Local => {
121
13
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
122
            }
123

            
124
            Bonsai::Quic => {
125
13
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
126
13
                    .with_certificate(
127
13
                        self.server
128
13
                            .certificate_chain()
129
                            .await
130
13
                            .unwrap()
131
13
                            .into_end_entity_certificate(),
132
13
                    )
133
13
                    .finish()
134
                    .await
135
13
                    .unwrap();
136
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
137
            }
138
            Bonsai::WebSockets => {
139
13
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
140
13
                    .finish()
141
                    .await
142
13
                    .unwrap();
143
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
144
            }
145
        };
146
39
        BonsaiOperator {
147
39
            database,
148
39
            label: self.label(),
149
39
        }
150
78
    }
151
}
152

            
153
// Empty impl: `BonsaiOperator` adds no items of its own to `BackendOperator`.
impl BackendOperator for BonsaiOperator {}
154

            
155
#[async_trait]
156
impl Operator<Load> for BonsaiOperator {
157
12
    async fn operate(
158
12
        &mut self,
159
12
        operation: &Load,
160
12
        _results: &[OperationResult],
161
12
        measurements: &Measurements,
162
12
    ) -> OperationResult {
163
12
        let measurement = measurements.begin(self.label, Metric::Load);
164
12
        let mut tx = Transaction::default();
165
120
        for (id, category) in &operation.initial_data.categories {
166
120
            tx.push(
167
120
                transaction::Operation::insert_serialized::<Category>(
168
120
                    Some(u64::from(*id)),
169
120
                    category,
170
120
                )
171
120
                .unwrap(),
172
120
            );
173
120
        }
174
1467
        for (id, product) in &operation.initial_data.products {
175
1467
            tx.push(
176
1467
                transaction::Operation::insert_serialized::<Product>(Some(u64::from(*id)), product)
177
1467
                    .unwrap(),
178
1467
            );
179
1467
        }
180
2112
        for (id, customer) in &operation.initial_data.customers {
181
2112
            tx.push(
182
2112
                transaction::Operation::insert_serialized::<Customer>(
183
2112
                    Some(u64::from(*id)),
184
2112
                    customer,
185
2112
                )
186
2112
                .unwrap(),
187
2112
            );
188
2112
        }
189
3735
        for (id, order) in &operation.initial_data.orders {
190
3735
            tx.push(
191
3735
                transaction::Operation::insert_serialized::<Order>(Some(u64::from(*id)), order)
192
3735
                    .unwrap(),
193
3735
            );
194
3735
        }
195
1650
        for review in &operation.initial_data.reviews {
196
1650
            tx.push(
197
1650
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
198
1650
            );
199
1650
        }
200
13
        self.database.apply_transaction(tx).await.unwrap();
201
12
        measurement.finish();
202
12
        OperationResult::Ok
203
24
    }
204
}
205

            
206
#[async_trait]
207
impl Operator<FindProduct> for BonsaiOperator {
208
5826
    async fn operate(
209
5826
        &mut self,
210
5826
        operation: &FindProduct,
211
5826
        _results: &[OperationResult],
212
5826
        measurements: &Measurements,
213
5826
    ) -> OperationResult {
214
5826
        let measurement = measurements.begin(self.label, Metric::FindProduct);
215
5826
        let doc = Product::load(&operation.name, &self.database)
216
5781
            .await
217
5826
            .unwrap()
218
5826
            .unwrap();
219
5826
        let rating = self
220
5826
            .database
221
5826
            .view::<ProductReviewsByProduct>()
222
5826
            .with_key(doc.id as u32)
223
5826
            .with_access_policy(AccessPolicy::NoUpdate)
224
5826
            .reduce()
225
3884
            .await
226
5826
            .unwrap();
227
5826
        measurement.finish();
228
5826
        OperationResult::Product {
229
5826
            id: doc.id as u32,
230
5826
            product: doc.contents,
231
5826
            rating: rating.average(),
232
5826
        }
233
11652
    }
234
}
235

            
236
#[async_trait]
237
impl Operator<LookupProduct> for BonsaiOperator {
238
5604
    async fn operate(
239
5604
        &mut self,
240
5604
        operation: &LookupProduct,
241
5604
        _results: &[OperationResult],
242
5604
        measurements: &Measurements,
243
5604
    ) -> OperationResult {
244
5604
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
245
5604
        let doc = Product::get(operation.id as u64, &self.database)
246
4608
            .await
247
5604
            .unwrap()
248
5604
            .unwrap();
249
5604
        let rating = self
250
5604
            .database
251
5604
            .view::<ProductReviewsByProduct>()
252
5604
            .with_key(doc.id as u32)
253
5604
            .with_access_policy(AccessPolicy::NoUpdate)
254
5604
            .reduce()
255
3736
            .await
256
5604
            .unwrap();
257
5604
        measurement.finish();
258
5604
        OperationResult::Product {
259
5604
            id: doc.id as u32,
260
5604
            product: doc.contents,
261
5604
            rating: rating.average(),
262
5604
        }
263
11208
    }
264
}
265

            
266
#[async_trait]
267
impl Operator<CreateCart> for BonsaiOperator {
268
1167
    async fn operate(
269
1167
        &mut self,
270
1167
        _operation: &CreateCart,
271
1167
        _results: &[OperationResult],
272
1167
        measurements: &Measurements,
273
1167
    ) -> OperationResult {
274
1167
        let measurement = measurements.begin(self.label, Metric::CreateCart);
275
1167
        let cart = Cart::default().push_into(&self.database).await.unwrap();
276
1167
        measurement.finish();
277
1167
        OperationResult::Cart { id: cart.id as u32 }
278
2334
    }
279
}
280

            
281
#[async_trait]
282
impl Operator<AddProductToCart> for BonsaiOperator {
283
2853
    async fn operate(
284
2853
        &mut self,
285
2853
        operation: &AddProductToCart,
286
2853
        results: &[OperationResult],
287
2853
        measurements: &Measurements,
288
2853
    ) -> OperationResult {
289
2853
        let cart = match &results[operation.cart.0] {
290
2853
            OperationResult::Cart { id } => *id as u64,
291
            _ => unreachable!("Invalid operation result"),
292
        };
293
2853
        let product = match &results[operation.product.0] {
294
2853
            OperationResult::Product { id, .. } => *id,
295
            _ => unreachable!("Invalid operation result"),
296
        };
297

            
298
2853
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
299
2853
        let mut cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
300
2853
        cart.contents.product_ids.push(product);
301
2853
        cart.update(&self.database).await.unwrap();
302
2853
        measurement.finish();
303
2853

            
304
2853
        OperationResult::CartProduct { id: product }
305
5706
    }
306
}
307

            
308
#[async_trait]
309
impl Operator<Checkout> for BonsaiOperator {
310
297
    async fn operate(
311
297
        &mut self,
312
297
        operation: &Checkout,
313
297
        results: &[OperationResult],
314
297
        measurements: &Measurements,
315
297
    ) -> OperationResult {
316
297
        let cart = match &results[operation.cart.0] {
317
297
            OperationResult::Cart { id } => *id as u64,
318
            _ => unreachable!("Invalid operation result"),
319
        };
320

            
321
297
        let measurement = measurements.begin(self.label, Metric::Checkout);
322
297
        let cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
323
297
        cart.delete(&self.database).await.unwrap();
324
297
        Order {
325
297
            customer_id: operation.customer_id,
326
297
            product_ids: cart.contents.product_ids,
327
297
        }
328
297
        .push_into(&self.database)
329
246
        .await
330
297
        .unwrap();
331
297
        measurement.finish();
332
297

            
333
297
        OperationResult::Ok
334
594
    }
335
}
336

            
337
#[async_trait]
338
impl Operator<ReviewProduct> for BonsaiOperator {
339
195
    async fn operate(
340
195
        &mut self,
341
195
        operation: &ReviewProduct,
342
195
        results: &[OperationResult],
343
195
        measurements: &Measurements,
344
195
    ) -> OperationResult {
345
195
        let product_id = match &results[operation.product_id.0] {
346
            OperationResult::Product { id, .. } => *id,
347
195
            OperationResult::CartProduct { id, .. } => *id,
348
            other => unreachable!("Invalid operation result {:?}", other),
349
        };
350

            
351
195
        let measurement = measurements.begin(self.label, Metric::RateProduct);
352
195
        let review = ProductReview {
353
195
            customer_id: operation.customer_id,
354
195
            product_id,
355
195
            review: operation.review.clone(),
356
195
            rating: operation.rating,
357
195
        };
358
195
        match review.push_into(&self.database).await {
359
195
            Ok(_) => {}
360
            Err(InsertError {
361
                error:
362
                    bonsaidb::core::Error::UniqueKeyViolation {
363
                        existing_document, ..
364
                    },
365
                contents,
366
            }) => {
367
                CollectionDocument::<ProductReview> {
368
                    header: existing_document,
369
                    contents,
370
                }
371
                .update(&self.database)
372
                .await
373
                .unwrap();
374
            }
375
            other => {
376
                other.unwrap();
377
            }
378
        }
379
        // Force the view to update.
380
195
        self.database
381
195
            .view::<ProductReviewsByProduct>()
382
195
            .with_key(0)
383
204
            .reduce()
384
204
            .await
385
195
            .unwrap();
386
195
        measurement.finish();
387
195

            
388
195
        OperationResult::Ok
389
390
    }
390
}
391

            
392
impl Schema for Commerce {
393
20772
    fn schema_name() -> SchemaName {
394
20772
        SchemaName::new("benchmarks", "commerce")
395
20772
    }
396

            
397
    fn define_collections(schema: &mut Schematic) -> Result<(), Error> {
398
20735
        schema.define_collection::<Product>()?;
399
20735
        schema.define_collection::<Category>()?;
400
20735
        schema.define_collection::<Customer>()?;
401
20735
        schema.define_collection::<Order>()?;
402
20735
        schema.define_collection::<Cart>()?;
403
20735
        schema.define_collection::<ProductReview>()?;
404
20735
        Ok(())
405
20735
    }
406
}
407

            
408
impl Collection for Product {
409
168501
    fn collection_name() -> CollectionName {
410
168501
        CollectionName::new("benchmarks", "products")
411
168501
    }
412

            
413
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
414
20735
        schema.define_view(ProductsByName)?;
415
20735
        schema.define_view(ProductsByCategoryId)?;
416
20735
        Ok(())
417
20735
    }
418
}
419

            
420
// Empty impl: `Product` takes `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Product {}
421

            
422
// Declares `ProductsByName`, a unique view over `Product` named "by-name".
// It emits each product's name as a `String` key with a `()` value.
// NOTE(review): the `1` is presumably the view version — confirm against the
// macro's documentation.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |document: CollectionDocument<Product>| {
        document.header.emit_key(document.contents.name.clone())
    },
);
433

            
434
20735
/// View over `Product` keyed by category id.
#[derive(Debug, Clone)]
pub struct ProductsByCategoryId;
436

            
437
impl View for ProductsByCategoryId {
438
    type Collection = Product;
439
    type Key = u32;
440
    type Value = u32;
441

            
442
22214
    fn name(&self) -> Name {
443
22214
        Name::new("by-category")
444
22214
    }
445
}
446

            
447
impl CollectionViewSchema for ProductsByCategoryId {
448
    type View = Self;
449

            
450
    fn map(
451
        &self,
452
        document: CollectionDocument<<Self::View as View>::Collection>,
453
    ) -> ViewMapResult<Self::View> {
454
        let mut mappings = Mappings::default();
455
        for &id in &document.contents.category_ids {
456
            mappings = mappings.and(document.emit_key_and_value(id, 1));
457
        }
458
        Ok(mappings)
459
    }
460
}
461

            
462
// Empty impl: the view takes `DefaultViewSerialization` as-is, with no overrides.
impl DefaultViewSerialization for ProductsByCategoryId {}
463

            
464
impl NamedCollection for Product {
465
    type ByNameView = ProductsByName;
466
}
467

            
468
impl Collection for ProductReview {
469
102160
    fn collection_name() -> CollectionName {
470
102160
        CollectionName::new("benchmarks", "reviews")
471
102160
    }
472

            
473
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
474
20735
        schema.define_view(ProductReviewsByProduct)?;
475
20735
        Ok(())
476
20735
    }
477
}
478

            
479
// Empty impl: `ProductReview` takes `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for ProductReview {}
480

            
481
20735
/// View over `ProductReview` keyed by product id; reduces to `ProductRatings`.
#[derive(Debug, Clone)]
pub struct ProductReviewsByProduct;
483

            
484
impl View for ProductReviewsByProduct {
485
    type Collection = ProductReview;
486
    type Key = u32;
487
    type Value = ProductRatings;
488

            
489
46622
    fn name(&self) -> Name {
490
46622
        Name::new("by-product")
491
46622
    }
492
}
493

            
494
impl CollectionViewSchema for ProductReviewsByProduct {
495
    type View = Self;
496

            
497
1845
    fn map(
498
1845
        &self,
499
1845
        document: CollectionDocument<<Self as View>::Collection>,
500
1845
    ) -> ViewMapResult<Self::View> {
501
1845
        Ok(document.emit_key_and_value(
502
1845
            document.contents.product_id,
503
1845
            ProductRatings {
504
1845
                total_score: document.contents.rating as u32,
505
1845
                ratings: 1,
506
1845
            },
507
1845
        ))
508
1845
    }
509

            
510
7057
    fn reduce(
511
7057
        &self,
512
7057
        mappings: &[ViewMappedValue<Self::View>],
513
7057
        _rereduce: bool,
514
7057
    ) -> ReduceResult<Self::View> {
515
7057
        Ok(mappings
516
7057
            .iter()
517
7057
            .map(|mapping| mapping.value.clone())
518
7057
            .reduce(|a, b| ProductRatings {
519
1635
                total_score: a.total_score + b.total_score,
520
1635
                ratings: a.ratings + b.ratings,
521
7057
            })
522
7057
            .unwrap_or_default())
523
7057
    }
524
}
525

            
526
// Empty impl: the view takes `DefaultViewSerialization` as-is, with no overrides.
impl DefaultViewSerialization for ProductReviewsByProduct {}
527

            
528
75525
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
529
pub struct ProductRatings {
530
    pub total_score: u32,
531
    pub ratings: u32,
532
}
533

            
534
impl ProductRatings {
535
11430
    pub fn average(&self) -> Option<f32> {
536
11430
        if self.ratings > 0 {
537
6218
            Some(self.total_score as f32 / self.ratings as f32)
538
        } else {
539
5212
            None
540
        }
541
11430
    }
542
}
543

            
544
impl Collection for Category {
545
20855
    fn collection_name() -> CollectionName {
546
20855
        CollectionName::new("benchmarks", "categories")
547
20855
    }
548

            
549
20735
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
550
20735
        Ok(())
551
20735
    }
552
}
553

            
554
// Empty impl: `Category` takes `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Category {}
555

            
556
impl Collection for Customer {
557
22847
    fn collection_name() -> CollectionName {
558
22847
        CollectionName::new("benchmarks", "customers")
559
22847
    }
560

            
561
20735
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
562
20735
        Ok(())
563
20735
    }
564
}
565

            
566
// Empty impl: `Customer` takes `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Customer {}
567

            
568
impl Collection for Order {
569
24767
    fn collection_name() -> CollectionName {
570
24767
        CollectionName::new("benchmarks", "orders")
571
24767
    }
572

            
573
20735
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
574
20735
        Ok(())
575
20735
    }
576
}
577

            
578
// Empty impl: `Order` takes `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Order {}
579

            
580
impl Collection for Cart {
581
28202
    fn collection_name() -> CollectionName {
582
28202
        CollectionName::new("benchmarks", "carts")
583
28202
    }
584

            
585
20735
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
586
20735
        Ok(())
587
20735
    }
588
}
589

            
590
// Empty impl: `Cart` takes `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Cart {}