1
use std::{path::Path, time::Duration};
2

            
3
use bonsaidb::{
4
    client::{url::Url, Client},
5
    core::{
6
        async_trait::async_trait,
7
        connection::{AccessPolicy, Connection, StorageConnection},
8
        define_basic_unique_mapped_view,
9
        document::CollectionDocument,
10
        schema::{
11
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
12
            DefaultSerialization, InsertError, NamedCollection, ReduceResult, Schema, Schematic,
13
            SerializedCollection, View, ViewMapResult, ViewMappedValue,
14
        },
15
        transaction::{self, Transaction},
16
        Error,
17
    },
18
    local::config::Builder,
19
    server::{DefaultPermissions, Server, ServerConfiguration},
20
    AnyDatabase,
21
};
22
use serde::{Deserialize, Serialize};
23

            
24
use crate::{
25
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
26
    model::{Cart, Category, Customer, Order, Product, ProductReview},
27
    plan::{
28
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
29
        ReviewProduct,
30
    },
31
};
32

            
33
/// Selects how benchmark operators reach the BonsaiDb server.
///
/// The variant decides which listener (if any) is spawned when the backend is
/// created and which kind of database handle each operator receives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Use the server's database handle directly, without a network client.
    Local,
    /// Connect with a client through a `bonsaidb://` URL on port 7022.
    Quic,
    /// Connect with a client through a `ws://` URL on port 7023.
    WebSockets,
}
38

            
39
impl Bonsai {
40
39
    pub fn label(&self) -> &'static str {
41
39
        match self {
42
13
            Self::Local => "bonsaidb-local",
43
13
            Self::Quic => "bonsaidb-quic",
44
13
            Self::WebSockets => "bonsaidb-ws",
45
        }
46
39
    }
47
}
48

            
49
pub struct BonsaiBackend {
50
    server: Server,
51
    kind: Bonsai,
52
}
53

            
54
pub struct BonsaiOperator {
55
    label: &'static str,
56
    database: AnyDatabase,
57
}
58

            
59
21873
#[derive(Debug, Schema)]
60
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
61
pub enum Commerce {}
62

            
63
#[async_trait]
64
impl Backend for BonsaiBackend {
65
    type Operator = BonsaiOperator;
66
    type Config = Bonsai;
67

            
68
39
    fn label(&self) -> &'static str {
69
39
        self.kind.label()
70
39
    }
71

            
72
12
    async fn new(config: Self::Config) -> Self {
73
12
        let path = Path::new("commerce-benchmarks.bonsaidb");
74
12
        if path.exists() {
75
11
            std::fs::remove_dir_all(path).unwrap();
76
11
        }
77
12
        let server = Server::open(
78
12
            ServerConfiguration::new(path)
79
12
                .default_permissions(DefaultPermissions::AllowAll)
80
12
                .with_schema::<Commerce>()
81
12
                .unwrap(),
82
206
        )
83
206
        .await
84
12
        .unwrap();
85
92
        server.install_self_signed_certificate(false).await.unwrap();
86
12
        server
87
23
            .create_database::<Commerce>("commerce", false)
88
23
            .await
89
12
            .unwrap();
90
12

            
91
12
        match config {
92
4
            Bonsai::Quic => {
93
4
                let server = server.clone();
94
4
                tokio::spawn(async move {
95
27
                    server.listen_on(7022).await.unwrap();
96
4
                });
97
4
            }
98
4
            Bonsai::WebSockets => {
99
4
                let server = server.clone();
100
4
                tokio::spawn(async move {
101
4
                    server
102
8
                        .listen_for_websockets_on("0.0.0.0:7023", false)
103
8
                        .await
104
                        .unwrap();
105
4
                });
106
4
            }
107
4
            Bonsai::Local => {}
108
        }
109
        // Allow the server time to start listening
110
12
        tokio::time::sleep(Duration::from_millis(1000)).await;
111

            
112
12
        BonsaiBackend {
113
12
            server,
114
12
            kind: config,
115
12
        }
116
24
    }
117

            
118
39
    async fn new_operator_async(&self) -> Self::Operator {
119
39
        let database = match self.kind {
120
            Bonsai::Local => {
121
13
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
122
            }
123

            
124
            Bonsai::Quic => {
125
13
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
126
13
                    .with_certificate(
127
13
                        self.server
128
13
                            .certificate_chain()
129
                            .await
130
13
                            .unwrap()
131
13
                            .into_end_entity_certificate(),
132
13
                    )
133
13
                    .finish()
134
                    .await
135
13
                    .unwrap();
136
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
137
            }
138
            Bonsai::WebSockets => {
139
13
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
140
13
                    .finish()
141
                    .await
142
13
                    .unwrap();
143
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
144
            }
145
        };
146
39
        BonsaiOperator {
147
39
            database,
148
39
            label: self.label(),
149
39
        }
150
78
    }
151
}
152

            
153
// No items are defined here; the per-operation behavior lives in the
// `Operator<_>` impls for `BonsaiOperator`.
impl BackendOperator for BonsaiOperator {}
154

            
155
#[async_trait]
156
impl Operator<Load> for BonsaiOperator {
157
12
    async fn operate(
158
12
        &mut self,
159
12
        operation: &Load,
160
12
        _results: &[OperationResult],
161
12
        measurements: &Measurements,
162
12
    ) -> OperationResult {
163
12
        let measurement = measurements.begin(self.label, Metric::Load);
164
12
        let mut tx = Transaction::default();
165
156
        for (id, category) in &operation.initial_data.categories {
166
156
            tx.push(
167
156
                transaction::Operation::insert_serialized::<Category>(
168
156
                    Some(u64::from(*id)),
169
156
                    category,
170
156
                )
171
156
                .unwrap(),
172
156
            );
173
156
        }
174
1386
        for (id, product) in &operation.initial_data.products {
175
1386
            tx.push(
176
1386
                transaction::Operation::insert_serialized::<Product>(Some(u64::from(*id)), product)
177
1386
                    .unwrap(),
178
1386
            );
179
1386
        }
180
2361
        for (id, customer) in &operation.initial_data.customers {
181
2361
            tx.push(
182
2361
                transaction::Operation::insert_serialized::<Customer>(
183
2361
                    Some(u64::from(*id)),
184
2361
                    customer,
185
2361
                )
186
2361
                .unwrap(),
187
2361
            );
188
2361
        }
189
1605
        for (id, order) in &operation.initial_data.orders {
190
1605
            tx.push(
191
1605
                transaction::Operation::insert_serialized::<Order>(Some(u64::from(*id)), order)
192
1605
                    .unwrap(),
193
1605
            );
194
1605
        }
195
1329
        for review in &operation.initial_data.reviews {
196
1329
            tx.push(
197
1329
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
198
1329
            );
199
1329
        }
200
15
        self.database.apply_transaction(tx).await.unwrap();
201
12
        measurement.finish();
202
12
        OperationResult::Ok
203
24
    }
204
}
205

            
206
#[async_trait]
207
impl Operator<FindProduct> for BonsaiOperator {
208
5964
    async fn operate(
209
5964
        &mut self,
210
5964
        operation: &FindProduct,
211
5964
        _results: &[OperationResult],
212
5964
        measurements: &Measurements,
213
5964
    ) -> OperationResult {
214
5964
        let measurement = measurements.begin(self.label, Metric::FindProduct);
215
6073
        let doc = Product::load(&operation.name, &self.database)
216
6073
            .await
217
5964
            .unwrap()
218
5964
            .unwrap();
219
5964
        let rating = self
220
5964
            .database
221
5964
            .view::<ProductReviewsByProduct>()
222
5964
            .with_key(doc.id as u32)
223
5964
            .with_access_policy(AccessPolicy::NoUpdate)
224
5964
            .reduce()
225
3976
            .await
226
5964
            .unwrap();
227
5964
        measurement.finish();
228
5964
        OperationResult::Product {
229
5964
            id: doc.id as u32,
230
5964
            product: doc.contents,
231
5964
            rating: rating.average(),
232
5964
        }
233
11928
    }
234
}
235

            
236
#[async_trait]
237
impl Operator<LookupProduct> for BonsaiOperator {
238
5931
    async fn operate(
239
5931
        &mut self,
240
5931
        operation: &LookupProduct,
241
5931
        _results: &[OperationResult],
242
5931
        measurements: &Measurements,
243
5931
    ) -> OperationResult {
244
5931
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
245
5931
        let doc = Product::get(operation.id as u64, &self.database)
246
4986
            .await
247
5931
            .unwrap()
248
5931
            .unwrap();
249
5931
        let rating = self
250
5931
            .database
251
5931
            .view::<ProductReviewsByProduct>()
252
5931
            .with_key(doc.id as u32)
253
5931
            .with_access_policy(AccessPolicy::NoUpdate)
254
5931
            .reduce()
255
3954
            .await
256
5931
            .unwrap();
257
5931
        measurement.finish();
258
5931
        OperationResult::Product {
259
5931
            id: doc.id as u32,
260
5931
            product: doc.contents,
261
5931
            rating: rating.average(),
262
5931
        }
263
11862
    }
264
}
265

            
266
#[async_trait]
267
impl Operator<CreateCart> for BonsaiOperator {
268
1233
    async fn operate(
269
1233
        &mut self,
270
1233
        _operation: &CreateCart,
271
1233
        _results: &[OperationResult],
272
1233
        measurements: &Measurements,
273
1233
    ) -> OperationResult {
274
1233
        let measurement = measurements.begin(self.label, Metric::CreateCart);
275
1233
        let cart = Cart::default().push_into(&self.database).await.unwrap();
276
1233
        measurement.finish();
277
1233
        OperationResult::Cart { id: cart.id as u32 }
278
2466
    }
279
}
280

            
281
#[async_trait]
282
impl Operator<AddProductToCart> for BonsaiOperator {
283
3138
    async fn operate(
284
3138
        &mut self,
285
3138
        operation: &AddProductToCart,
286
3138
        results: &[OperationResult],
287
3138
        measurements: &Measurements,
288
3138
    ) -> OperationResult {
289
3138
        let cart = match &results[operation.cart.0] {
290
3138
            OperationResult::Cart { id } => *id as u64,
291
            _ => unreachable!("Invalid operation result"),
292
        };
293
3138
        let product = match &results[operation.product.0] {
294
3138
            OperationResult::Product { id, .. } => *id,
295
            _ => unreachable!("Invalid operation result"),
296
        };
297

            
298
3138
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
299
3138
        let mut cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
300
3138
        cart.contents.product_ids.push(product);
301
3138
        cart.update(&self.database).await.unwrap();
302
3138
        measurement.finish();
303
3138

            
304
3138
        OperationResult::CartProduct { id: product }
305
6276
    }
306
}
307

            
308
#[async_trait]
309
impl Operator<Checkout> for BonsaiOperator {
310
318
    async fn operate(
311
318
        &mut self,
312
318
        operation: &Checkout,
313
318
        results: &[OperationResult],
314
318
        measurements: &Measurements,
315
318
    ) -> OperationResult {
316
318
        let cart = match &results[operation.cart.0] {
317
318
            OperationResult::Cart { id } => *id as u64,
318
            _ => unreachable!("Invalid operation result"),
319
        };
320

            
321
318
        let measurement = measurements.begin(self.label, Metric::Checkout);
322
318
        let cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
323
318
        cart.delete(&self.database).await.unwrap();
324
318
        Order {
325
318
            customer_id: operation.customer_id,
326
318
            product_ids: cart.contents.product_ids,
327
318
        }
328
318
        .push_into(&self.database)
329
273
        .await
330
318
        .unwrap();
331
318
        measurement.finish();
332
318

            
333
318
        OperationResult::Ok
334
636
    }
335
}
336

            
337
#[async_trait]
338
impl Operator<ReviewProduct> for BonsaiOperator {
339
234
    async fn operate(
340
234
        &mut self,
341
234
        operation: &ReviewProduct,
342
234
        results: &[OperationResult],
343
234
        measurements: &Measurements,
344
234
    ) -> OperationResult {
345
234
        let product_id = match &results[operation.product_id.0] {
346
            OperationResult::Product { id, .. } => *id,
347
234
            OperationResult::CartProduct { id, .. } => *id,
348
            other => unreachable!("Invalid operation result {:?}", other),
349
        };
350

            
351
234
        let measurement = measurements.begin(self.label, Metric::RateProduct);
352
234
        let review = ProductReview {
353
234
            customer_id: operation.customer_id,
354
234
            product_id,
355
234
            review: operation.review.clone(),
356
234
            rating: operation.rating,
357
234
        };
358
234
        match review.push_into(&self.database).await {
359
234
            Ok(_) => {}
360
            Err(InsertError {
361
                error:
362
                    bonsaidb::core::Error::UniqueKeyViolation {
363
                        existing_document, ..
364
                    },
365
                contents,
366
            }) => {
367
                CollectionDocument::<ProductReview> {
368
                    header: existing_document,
369
                    contents,
370
                }
371
                .update(&self.database)
372
                .await
373
                .unwrap();
374
            }
375
            other => {
376
                other.unwrap();
377
            }
378
        }
379
        // Force the view to update.
380
234
        self.database
381
234
            .view::<ProductReviewsByProduct>()
382
234
            .with_key(0)
383
243
            .reduce()
384
243
            .await
385
234
            .unwrap();
386
234
        measurement.finish();
387
234

            
388
234
        OperationResult::Ok
389
468
    }
390
}
391

            
392
impl Collection for Product {
393
175560
    fn collection_name() -> CollectionName {
394
175560
        CollectionName::new("benchmarks", "products")
395
175560
    }
396

            
397
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
398
21873
        schema.define_view(ProductsByName)?;
399
21873
        schema.define_view(ProductsByCategoryId)?;
400
21873
        Ok(())
401
21873
    }
402
}
403

            
404
// Opts `Product` into `DefaultSerialization`; nothing is overridden.
impl DefaultSerialization for Product {}
405

            
406
// Unique view `by-name` over `Product`: each product is keyed by a clone of its
// name (`String`) and the value type is `()`. The literal `1` is presumably the
// view version — confirm against the macro definition.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |product: CollectionDocument<Product>| {
        let name = product.contents.name.clone();
        product.header.emit_key(name)
    },
);
417

            
418
23271
#[derive(Debug, Clone, View)]
419
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
420
pub struct ProductsByCategoryId;
421

            
422
impl CollectionViewSchema for ProductsByCategoryId {
423
    type View = Self;
424

            
425
    fn map(
426
        &self,
427
        document: CollectionDocument<<Self::View as View>::Collection>,
428
    ) -> ViewMapResult<Self::View> {
429
        let mut mappings = Mappings::default();
430
        for &id in &document.contents.category_ids {
431
            mappings = mappings.and(document.emit_key_and_value(id, 1));
432
        }
433
        Ok(mappings)
434
    }
435
}
436

            
437
impl NamedCollection for Product {
438
    type ByNameView = ProductsByName;
439
}
440

            
441
impl Collection for ProductReview {
442
106803
    fn collection_name() -> CollectionName {
443
106803
        CollectionName::new("benchmarks", "reviews")
444
106803
    }
445

            
446
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
447
21873
        schema.define_view(ProductReviewsByProduct)?;
448
21873
        Ok(())
449
21873
    }
450
}
451

            
452
// Opts `ProductReview` into `DefaultSerialization`; nothing is overridden.
impl DefaultSerialization for ProductReview {}
453

            
454
48642
#[derive(Debug, Clone, View)]
455
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
456
pub struct ProductReviewsByProduct;
457

            
458
impl CollectionViewSchema for ProductReviewsByProduct {
459
    type View = Self;
460

            
461
1563
    fn map(
462
1563
        &self,
463
1563
        document: CollectionDocument<<Self as View>::Collection>,
464
1563
    ) -> ViewMapResult<Self::View> {
465
1563
        Ok(document.emit_key_and_value(
466
1563
            document.contents.product_id,
467
1563
            ProductRatings {
468
1563
                total_score: document.contents.rating as u32,
469
1563
                ratings: 1,
470
1563
            },
471
1563
        ))
472
1563
    }
473

            
474
6186
    fn reduce(
475
6186
        &self,
476
6186
        mappings: &[ViewMappedValue<Self::View>],
477
6186
        _rereduce: bool,
478
6186
    ) -> ReduceResult<Self::View> {
479
6186
        Ok(mappings
480
6186
            .iter()
481
6186
            .map(|mapping| mapping.value.clone())
482
6186
            .reduce(|a, b| ProductRatings {
483
1176
                total_score: a.total_score + b.total_score,
484
1176
                ratings: a.ratings + b.ratings,
485
6186
            })
486
6186
            .unwrap_or_default())
487
6186
    }
488
}
489

            
490
74340
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
491
pub struct ProductRatings {
492
    pub total_score: u32,
493
    pub ratings: u32,
494
}
495

            
496
impl ProductRatings {
497
11894
    pub fn average(&self) -> Option<f32> {
498
11894
        if self.ratings > 0 {
499
7272
            Some(self.total_score as f32 / self.ratings as f32)
500
        } else {
501
4622
            None
502
        }
503
11894
    }
504
}
505

            
506
impl Collection for Category {
507
22029
    fn collection_name() -> CollectionName {
508
22029
        CollectionName::new("benchmarks", "categories")
509
22029
    }
510

            
511
21873
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
512
21873
        Ok(())
513
21873
    }
514
}
515

            
516
// Opts `Category` into `DefaultSerialization`; nothing is overridden.
impl DefaultSerialization for Category {}
517

            
518
impl Collection for Customer {
519
24234
    fn collection_name() -> CollectionName {
520
24234
        CollectionName::new("benchmarks", "customers")
521
24234
    }
522

            
523
21873
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
21873
        Ok(())
525
21873
    }
526
}
527

            
528
// Opts `Customer` into `DefaultSerialization`; nothing is overridden.
impl DefaultSerialization for Customer {}
529

            
530
impl Collection for Order {
531
23796
    fn collection_name() -> CollectionName {
532
23796
        CollectionName::new("benchmarks", "orders")
533
23796
    }
534

            
535
21873
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
536
21873
        Ok(())
537
21873
    }
538
}
539

            
540
// Opts `Order` into `DefaultSerialization`; nothing is overridden.
impl DefaultSerialization for Order {}
541

            
542
impl Collection for Cart {
543
30018
    fn collection_name() -> CollectionName {
544
30018
        CollectionName::new("benchmarks", "carts")
545
30018
    }
546

            
547
21873
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
548
21873
        Ok(())
549
21873
    }
550
}
551

            
552
// Opts `Cart` into `DefaultSerialization`; nothing is overridden.
impl DefaultSerialization for Cart {}