1
use std::{path::Path, time::Duration};
2

            
3
use bonsaidb::{
4
    client::{url::Url, Client},
5
    core::{
6
        async_trait::async_trait,
7
        connection::{AccessPolicy, Connection, StorageConnection},
8
        define_basic_unique_mapped_view,
9
        document::CollectionDocument,
10
        schema::{
11
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
12
            DefaultSerialization, InsertError, NamedCollection, ReduceResult, Schema, Schematic,
13
            SerializedCollection, View, ViewMapResult, ViewMappedValue,
14
        },
15
        transaction::{self, Transaction},
16
        Error,
17
    },
18
    local::config::Builder,
19
    server::{DefaultPermissions, Server, ServerConfiguration},
20
    AnyDatabase,
21
};
22
use serde::{Deserialize, Serialize};
23

            
24
use crate::{
25
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
26
    model::{Cart, Category, Customer, Order, Product, ProductReview},
27
    plan::{
28
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
29
        ReviewProduct,
30
    },
31
};
32

            
33
pub enum Bonsai {
34
    Local,
35
    Quic,
36
    WebSockets,
37
}
38

            
39
impl Bonsai {
40
39
    pub fn label(&self) -> &'static str {
41
39
        match self {
42
13
            Self::Local => "bonsaidb-local",
43
13
            Self::Quic => "bonsaidb-quic",
44
13
            Self::WebSockets => "bonsaidb-ws",
45
        }
46
39
    }
47
}
48

            
49
pub struct BonsaiBackend {
50
    server: Server,
51
    kind: Bonsai,
52
}
53

            
54
pub struct BonsaiOperator {
55
    label: &'static str,
56
    database: AnyDatabase,
57
}
58

            
59
21451
#[derive(Debug, Schema)]
60
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
61
pub enum Commerce {}
62

            
63
#[async_trait]
64
impl Backend for BonsaiBackend {
65
    type Operator = BonsaiOperator;
66
    type Config = Bonsai;
67

            
68
39
    fn label(&self) -> &'static str {
69
39
        self.kind.label()
70
39
    }
71

            
72
12
    async fn new(config: Self::Config) -> Self {
73
12
        let path = Path::new("commerce-benchmarks.bonsaidb");
74
12
        if path.exists() {
75
11
            std::fs::remove_dir_all(path).unwrap();
76
11
        }
77
12
        let server = Server::open(
78
12
            ServerConfiguration::new(path)
79
12
                .default_permissions(DefaultPermissions::AllowAll)
80
12
                .with_schema::<Commerce>()
81
12
                .unwrap(),
82
216
        )
83
216
        .await
84
12
        .unwrap();
85
91
        server.install_self_signed_certificate(false).await.unwrap();
86
12
        server
87
22
            .create_database::<Commerce>("commerce", false)
88
22
            .await
89
12
            .unwrap();
90
12

            
91
12
        match config {
92
4
            Bonsai::Quic => {
93
4
                let server = server.clone();
94
4
                tokio::spawn(async move {
95
28
                    server.listen_on(7022).await.unwrap();
96
4
                });
97
4
            }
98
4
            Bonsai::WebSockets => {
99
4
                let server = server.clone();
100
4
                tokio::spawn(async move {
101
4
                    server
102
10
                        .listen_for_websockets_on("0.0.0.0:7023", false)
103
10
                        .await
104
                        .unwrap();
105
4
                });
106
4
            }
107
4
            Bonsai::Local => {}
108
        }
109
        // Allow the server time to start listening
110
12
        tokio::time::sleep(Duration::from_millis(1000)).await;
111

            
112
12
        BonsaiBackend {
113
12
            server,
114
12
            kind: config,
115
12
        }
116
24
    }
117

            
118
39
    async fn new_operator_async(&self) -> Self::Operator {
119
39
        let database = match self.kind {
120
            Bonsai::Local => {
121
13
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
122
            }
123

            
124
            Bonsai::Quic => {
125
13
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
126
13
                    .with_certificate(
127
13
                        self.server
128
13
                            .certificate_chain()
129
                            .await
130
13
                            .unwrap()
131
13
                            .into_end_entity_certificate(),
132
13
                    )
133
13
                    .finish()
134
                    .await
135
13
                    .unwrap();
136
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
137
            }
138
            Bonsai::WebSockets => {
139
13
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
140
13
                    .finish()
141
                    .await
142
13
                    .unwrap();
143
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
144
            }
145
        };
146
39
        BonsaiOperator {
147
39
            database,
148
39
            label: self.label(),
149
39
        }
150
78
    }
151
}
152

            
153
impl BackendOperator for BonsaiOperator {}
154

            
155
#[async_trait]
156
impl Operator<Load> for BonsaiOperator {
157
12
    async fn operate(
158
12
        &mut self,
159
12
        operation: &Load,
160
12
        _results: &[OperationResult],
161
12
        measurements: &Measurements,
162
12
    ) -> OperationResult {
163
12
        let measurement = measurements.begin(self.label, Metric::Load);
164
12
        let mut tx = Transaction::default();
165
171
        for (id, category) in &operation.initial_data.categories {
166
171
            tx.push(
167
171
                transaction::Operation::insert_serialized::<Category>(
168
171
                    Some(u64::from(*id)),
169
171
                    category,
170
171
                )
171
171
                .unwrap(),
172
171
            );
173
171
        }
174
1488
        for (id, product) in &operation.initial_data.products {
175
1488
            tx.push(
176
1488
                transaction::Operation::insert_serialized::<Product>(Some(u64::from(*id)), product)
177
1488
                    .unwrap(),
178
1488
            );
179
1488
        }
180
1707
        for (id, customer) in &operation.initial_data.customers {
181
1707
            tx.push(
182
1707
                transaction::Operation::insert_serialized::<Customer>(
183
1707
                    Some(u64::from(*id)),
184
1707
                    customer,
185
1707
                )
186
1707
                .unwrap(),
187
1707
            );
188
1707
        }
189
1509
        for (id, order) in &operation.initial_data.orders {
190
1509
            tx.push(
191
1509
                transaction::Operation::insert_serialized::<Order>(Some(u64::from(*id)), order)
192
1509
                    .unwrap(),
193
1509
            );
194
1509
        }
195
1047
        for review in &operation.initial_data.reviews {
196
1047
            tx.push(
197
1047
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
198
1047
            );
199
1047
        }
200
14
        self.database.apply_transaction(tx).await.unwrap();
201
12
        measurement.finish();
202
12
        OperationResult::Ok
203
24
    }
204
}
205

            
206
#[async_trait]
207
impl Operator<FindProduct> for BonsaiOperator {
208
6030
    async fn operate(
209
6030
        &mut self,
210
6030
        operation: &FindProduct,
211
6030
        _results: &[OperationResult],
212
6030
        measurements: &Measurements,
213
6030
    ) -> OperationResult {
214
6030
        let measurement = measurements.begin(self.label, Metric::FindProduct);
215
6083
        let doc = Product::load(&operation.name, &self.database)
216
6083
            .await
217
6030
            .unwrap()
218
6030
            .unwrap();
219
6030
        let rating = self
220
6030
            .database
221
6030
            .view::<ProductReviewsByProduct>()
222
6030
            .with_key(doc.id as u32)
223
6030
            .with_access_policy(AccessPolicy::NoUpdate)
224
6030
            .reduce()
225
4020
            .await
226
6030
            .unwrap();
227
6030
        measurement.finish();
228
6030
        OperationResult::Product {
229
6030
            id: doc.id as u32,
230
6030
            product: doc.contents,
231
6030
            rating: rating.average(),
232
6030
        }
233
12060
    }
234
}
235

            
236
#[async_trait]
237
impl Operator<LookupProduct> for BonsaiOperator {
238
5760
    async fn operate(
239
5760
        &mut self,
240
5760
        operation: &LookupProduct,
241
5760
        _results: &[OperationResult],
242
5760
        measurements: &Measurements,
243
5760
    ) -> OperationResult {
244
5760
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
245
5760
        let doc = Product::get(operation.id as u64, &self.database)
246
4775
            .await
247
5760
            .unwrap()
248
5760
            .unwrap();
249
5760
        let rating = self
250
5760
            .database
251
5760
            .view::<ProductReviewsByProduct>()
252
5760
            .with_key(doc.id as u32)
253
5760
            .with_access_policy(AccessPolicy::NoUpdate)
254
5760
            .reduce()
255
3840
            .await
256
5760
            .unwrap();
257
5760
        measurement.finish();
258
5760
        OperationResult::Product {
259
5760
            id: doc.id as u32,
260
5760
            product: doc.contents,
261
5760
            rating: rating.average(),
262
5760
        }
263
11520
    }
264
}
265

            
266
#[async_trait]
267
impl Operator<CreateCart> for BonsaiOperator {
268
1218
    async fn operate(
269
1218
        &mut self,
270
1218
        _operation: &CreateCart,
271
1218
        _results: &[OperationResult],
272
1218
        measurements: &Measurements,
273
1218
    ) -> OperationResult {
274
1218
        let measurement = measurements.begin(self.label, Metric::CreateCart);
275
1218
        let cart = Cart::default().push_into(&self.database).await.unwrap();
276
1218
        measurement.finish();
277
1218
        OperationResult::Cart { id: cart.id as u32 }
278
2436
    }
279
}
280

            
281
#[async_trait]
282
impl Operator<AddProductToCart> for BonsaiOperator {
283
2997
    async fn operate(
284
2997
        &mut self,
285
2997
        operation: &AddProductToCart,
286
2997
        results: &[OperationResult],
287
2997
        measurements: &Measurements,
288
2997
    ) -> OperationResult {
289
2997
        let cart = match &results[operation.cart.0] {
290
2997
            OperationResult::Cart { id } => *id as u64,
291
            _ => unreachable!("Invalid operation result"),
292
        };
293
2997
        let product = match &results[operation.product.0] {
294
2997
            OperationResult::Product { id, .. } => *id,
295
            _ => unreachable!("Invalid operation result"),
296
        };
297

            
298
2997
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
299
2997
        let mut cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
300
2997
        cart.contents.product_ids.push(product);
301
2997
        cart.update(&self.database).await.unwrap();
302
2997
        measurement.finish();
303
2997

            
304
2997
        OperationResult::CartProduct { id: product }
305
5994
    }
306
}
307

            
308
#[async_trait]
309
impl Operator<Checkout> for BonsaiOperator {
310
300
    async fn operate(
311
300
        &mut self,
312
300
        operation: &Checkout,
313
300
        results: &[OperationResult],
314
300
        measurements: &Measurements,
315
300
    ) -> OperationResult {
316
300
        let cart = match &results[operation.cart.0] {
317
300
            OperationResult::Cart { id } => *id as u64,
318
            _ => unreachable!("Invalid operation result"),
319
        };
320

            
321
300
        let measurement = measurements.begin(self.label, Metric::Checkout);
322
300
        let cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
323
300
        cart.delete(&self.database).await.unwrap();
324
300
        Order {
325
300
            customer_id: operation.customer_id,
326
300
            product_ids: cart.contents.product_ids,
327
300
        }
328
300
        .push_into(&self.database)
329
251
        .await
330
300
        .unwrap();
331
300
        measurement.finish();
332
300

            
333
300
        OperationResult::Ok
334
600
    }
335
}
336

            
337
#[async_trait]
338
impl Operator<ReviewProduct> for BonsaiOperator {
339
198
    async fn operate(
340
198
        &mut self,
341
198
        operation: &ReviewProduct,
342
198
        results: &[OperationResult],
343
198
        measurements: &Measurements,
344
198
    ) -> OperationResult {
345
198
        let product_id = match &results[operation.product_id.0] {
346
            OperationResult::Product { id, .. } => *id,
347
198
            OperationResult::CartProduct { id, .. } => *id,
348
            other => unreachable!("Invalid operation result {:?}", other),
349
        };
350

            
351
198
        let measurement = measurements.begin(self.label, Metric::RateProduct);
352
198
        let review = ProductReview {
353
198
            customer_id: operation.customer_id,
354
198
            product_id,
355
198
            review: operation.review.clone(),
356
198
            rating: operation.rating,
357
198
        };
358
198
        match review.push_into(&self.database).await {
359
198
            Ok(_) => {}
360
            Err(InsertError {
361
                error:
362
                    bonsaidb::core::Error::UniqueKeyViolation {
363
                        existing_document, ..
364
                    },
365
                contents,
366
            }) => {
367
                CollectionDocument::<ProductReview> {
368
                    header: existing_document,
369
                    contents,
370
                }
371
                .update(&self.database)
372
                .await
373
                .unwrap();
374
            }
375
            other => {
376
                other.unwrap();
377
            }
378
        }
379
        // Force the view to update.
380
198
        self.database
381
198
            .view::<ProductReviewsByProduct>()
382
198
            .with_key(0)
383
207
            .reduce()
384
207
            .await
385
198
            .unwrap();
386
198
        measurement.finish();
387
198

            
388
198
        OperationResult::Ok
389
396
    }
390
}
391

            
392
impl Collection for Product {
393
174220
    fn collection_name() -> CollectionName {
394
174220
        CollectionName::new("benchmarks", "products")
395
174220
    }
396

            
397
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
398
21451
        schema.define_view(ProductsByName)?;
399
21451
        schema.define_view(ProductsByCategoryId)?;
400
21451
        Ok(())
401
21451
    }
402
}
403

            
404
impl DefaultSerialization for Product {}
405

            
406
define_basic_unique_mapped_view!(
407
    ProductsByName,
408
    Product,
409
    1,
410
    "by-name",
411
    String,
412
    (),
413
1488
    |document: CollectionDocument<Product>| {
414
1488
        document.header.emit_key(document.contents.name.clone())
415
1488
    },
416
);
417

            
418
22951
#[derive(Debug, Clone, View)]
419
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
420
pub struct ProductsByCategoryId;
421

            
422
impl CollectionViewSchema for ProductsByCategoryId {
423
    type View = Self;
424

            
425
    fn map(
426
        &self,
427
        document: CollectionDocument<<Self::View as View>::Collection>,
428
    ) -> ViewMapResult<Self::View> {
429
        let mut mappings = Mappings::default();
430
        for &id in &document.contents.category_ids {
431
            mappings = mappings.and(document.emit_key_and_value(id, 1));
432
        }
433
        Ok(mappings)
434
    }
435
}
436

            
437
impl NamedCollection for Product {
438
    type ByNameView = ProductsByName;
439
}
440

            
441
impl Collection for ProductReview {
442
104223
    fn collection_name() -> CollectionName {
443
104223
        CollectionName::new("benchmarks", "reviews")
444
104223
    }
445

            
446
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
447
21451
        schema.define_view(ProductReviewsByProduct)?;
448
21451
        Ok(())
449
21451
    }
450
}
451

            
452
impl DefaultSerialization for ProductReview {}
453

            
454
47475
#[derive(Debug, Clone, View)]
455
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
456
pub struct ProductReviewsByProduct;
457

            
458
impl CollectionViewSchema for ProductReviewsByProduct {
459
    type View = Self;
460

            
461
1245
    fn map(
462
1245
        &self,
463
1245
        document: CollectionDocument<<Self as View>::Collection>,
464
1245
    ) -> ViewMapResult<Self::View> {
465
1245
        Ok(document.emit_key_and_value(
466
1245
            document.contents.product_id,
467
1245
            ProductRatings {
468
1245
                total_score: document.contents.rating as u32,
469
1245
                ratings: 1,
470
1245
            },
471
1245
        ))
472
1245
    }
473

            
474
7287
    fn reduce(
475
7287
        &self,
476
7287
        mappings: &[ViewMappedValue<Self::View>],
477
7287
        _rereduce: bool,
478
7287
    ) -> ReduceResult<Self::View> {
479
7287
        Ok(mappings
480
7287
            .iter()
481
7287
            .map(|mapping| mapping.value.clone())
482
7287
            .reduce(|a, b| ProductRatings {
483
534
                total_score: a.total_score + b.total_score,
484
534
                ratings: a.ratings + b.ratings,
485
7287
            })
486
7287
            .unwrap_or_default())
487
7287
    }
488
}
489

            
490
68834
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
491
pub struct ProductRatings {
492
    pub total_score: u32,
493
    pub ratings: u32,
494
}
495

            
496
impl ProductRatings {
497
11789
    pub fn average(&self) -> Option<f32> {
498
11789
        if self.ratings > 0 {
499
5829
            Some(self.total_score as f32 / self.ratings as f32)
500
        } else {
501
5960
            None
502
        }
503
11789
    }
504
}
505

            
506
impl Collection for Category {
507
21622
    fn collection_name() -> CollectionName {
508
21622
        CollectionName::new("benchmarks", "categories")
509
21622
    }
510

            
511
21451
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
512
21451
        Ok(())
513
21451
    }
514
}
515

            
516
impl DefaultSerialization for Category {}
517

            
518
impl Collection for Customer {
519
23158
    fn collection_name() -> CollectionName {
520
23158
        CollectionName::new("benchmarks", "customers")
521
23158
    }
522

            
523
21451
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
21451
        Ok(())
525
21451
    }
526
}
527

            
528
impl DefaultSerialization for Customer {}
529

            
530
impl Collection for Order {
531
23260
    fn collection_name() -> CollectionName {
532
23260
        CollectionName::new("benchmarks", "orders")
533
23260
    }
534

            
535
21451
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
536
21451
        Ok(())
537
21451
    }
538
}
539

            
540
impl DefaultSerialization for Order {}
541

            
542
impl Collection for Cart {
543
29263
    fn collection_name() -> CollectionName {
544
29263
        CollectionName::new("benchmarks", "carts")
545
29263
    }
546

            
547
21451
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
548
21451
        Ok(())
549
21451
    }
550
}
551

            
552
impl DefaultSerialization for Cart {}