1
use std::{path::Path, time::Duration};
2

            
3
#[cfg(feature = "compression")]
4
use bonsaidb::local::config::Compression;
5
use bonsaidb::{
6
    client::{url::Url, Client},
7
    core::{
8
        async_trait::async_trait,
9
        connection::{AccessPolicy, Connection, StorageConnection},
10
        define_basic_unique_mapped_view,
11
        document::{CollectionDocument, CollectionHeader, Emit},
12
        schema::{
13
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
14
            DefaultSerialization, InsertError, NamedCollection, ReduceResult, Schema, Schematic,
15
            SerializedCollection, View, ViewMapResult, ViewMappedValue,
16
        },
17
        transaction::{self, Transaction},
18
        Error,
19
    },
20
    local::config::Builder,
21
    server::{DefaultPermissions, Server, ServerConfiguration},
22
    AnyDatabase,
23
};
24
use serde::{Deserialize, Serialize};
25

            
26
use crate::{
27
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
28
    model::{Cart, Category, Customer, Order, Product, ProductReview},
29
    plan::{
30
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
31
        ReviewProduct,
32
    },
33
};
34

            
35
/// Selects how the benchmark talks to BonsaiDb.
///
/// The variant decides both the label reported by [`Bonsai::label`] and the
/// kind of connection handed to each operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Operators use the server's database handle directly.
    Local,
    /// Same as [`Bonsai::Local`], with LZ4 compression enabled when the
    /// `compression` feature is compiled in.
    LocalLz4,
    /// Operators connect through a `bonsaidb://` client on port 7022.
    Quic,
    /// Operators connect through a `ws://` client on port 7023.
    WebSockets,
}
41

            
42
impl Bonsai {
43
52
    pub fn label(&self) -> &'static str {
44
52
        match self {
45
13
            Self::Local => "bonsaidb-local",
46
13
            Self::LocalLz4 => "bonsaidb-local+lz4",
47
13
            Self::Quic => "bonsaidb-quic",
48
13
            Self::WebSockets => "bonsaidb-ws",
49
        }
50
52
    }
51
}
52

            
53
pub struct BonsaiBackend {
54
    server: Server,
55
    kind: Bonsai,
56
}
57

            
58
pub struct BonsaiOperator {
59
    label: &'static str,
60
    database: AnyDatabase,
61
}
62

            
63
20736
#[derive(Debug, Schema)]
64
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
65
pub enum Commerce {}
66

            
67
#[async_trait]
68
impl Backend for BonsaiBackend {
69
    type Operator = BonsaiOperator;
70
    type Config = Bonsai;
71

            
72
52
    fn label(&self) -> &'static str {
73
52
        self.kind.label()
74
52
    }
75

            
76
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
77
16
    async fn new(config: Self::Config) -> Self {
78
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
79
16
        if path.exists() {
80
15
            std::fs::remove_dir_all(path).unwrap();
81
15
        }
82
16
        let mut server_config = ServerConfiguration::new(path)
83
16
            .default_permissions(DefaultPermissions::AllowAll)
84
16
            .with_schema::<Commerce>()
85
16
            .unwrap();
86

            
87
        #[cfg(feature = "compression")]
88
        {
89
16
            if matches!(config, Bonsai::LocalLz4) {
90
4
                server_config = server_config.default_compression(Compression::Lz4);
91
12
            }
92
        }
93

            
94
292
        let server = Server::open(server_config).await.unwrap();
95
124
        server.install_self_signed_certificate(false).await.unwrap();
96
16
        server
97
28
            .create_database::<Commerce>("commerce", false)
98
28
            .await
99
16
            .unwrap();
100
16

            
101
16
        match config {
102
4
            Bonsai::Quic => {
103
4
                let server = server.clone();
104
4
                tokio::spawn(async move {
105
27
                    server.listen_on(7022).await.unwrap();
106
4
                });
107
4
            }
108
4
            Bonsai::WebSockets => {
109
4
                let server = server.clone();
110
4
                tokio::spawn(async move {
111
4
                    server
112
10
                        .listen_for_websockets_on("0.0.0.0:7023", false)
113
10
                        .await
114
                        .unwrap();
115
4
                });
116
4
            }
117
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
118
        }
119
        // Allow the server time to start listening
120
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
121

            
122
16
        BonsaiBackend {
123
16
            server,
124
16
            kind: config,
125
16
        }
126
32
    }
127

            
128
52
    async fn new_operator_async(&self) -> Self::Operator {
129
52
        let database = match self.kind {
130
            Bonsai::Local | Bonsai::LocalLz4 => {
131
26
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
132
            }
133

            
134
            Bonsai::Quic => {
135
13
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
136
13
                    .with_certificate(
137
13
                        self.server
138
13
                            .certificate_chain()
139
                            .await
140
13
                            .unwrap()
141
13
                            .into_end_entity_certificate(),
142
13
                    )
143
13
                    .finish()
144
                    .await
145
13
                    .unwrap();
146
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
147
            }
148
            Bonsai::WebSockets => {
149
13
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
150
13
                    .finish()
151
                    .await
152
13
                    .unwrap();
153
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
154
            }
155
        };
156
52
        BonsaiOperator {
157
52
            database,
158
52
            label: self.label(),
159
52
        }
160
104
    }
161
}
162

            
163
// Marks `BonsaiOperator` as a `BackendOperator`; the impl body is empty.
impl BackendOperator for BonsaiOperator {}
164

            
165
#[async_trait]
166
impl Operator<Load> for BonsaiOperator {
167
16
    async fn operate(
168
16
        &mut self,
169
16
        operation: &Load,
170
16
        _results: &[OperationResult],
171
16
        measurements: &Measurements,
172
16
    ) -> OperationResult {
173
16
        let measurement = measurements.begin(self.label, Metric::Load);
174
16
        let mut tx = Transaction::default();
175
212
        for (id, category) in &operation.initial_data.categories {
176
212
            tx.push(
177
212
                transaction::Operation::insert_serialized::<Category>(Some(*id), category).unwrap(),
178
212
            );
179
212
        }
180
1460
        for (id, product) in &operation.initial_data.products {
181
1460
            tx.push(
182
1460
                transaction::Operation::insert_serialized::<Product>(Some(*id), product).unwrap(),
183
1460
            );
184
1460
        }
185
1832
        for (id, customer) in &operation.initial_data.customers {
186
1832
            tx.push(
187
1832
                transaction::Operation::insert_serialized::<Customer>(Some(*id), customer).unwrap(),
188
1832
            );
189
1832
        }
190
3036
        for (id, order) in &operation.initial_data.orders {
191
3036
            tx.push(transaction::Operation::insert_serialized::<Order>(Some(*id), order).unwrap());
192
3036
        }
193
1236
        for review in &operation.initial_data.reviews {
194
1236
            tx.push(
195
1236
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
196
1236
            );
197
1236
        }
198
23
        self.database.apply_transaction(tx).await.unwrap();
199
16
        measurement.finish();
200
16
        OperationResult::Ok
201
32
    }
202
}
203

            
204
#[async_trait]
205
impl Operator<FindProduct> for BonsaiOperator {
206
7656
    async fn operate(
207
7656
        &mut self,
208
7656
        operation: &FindProduct,
209
7656
        _results: &[OperationResult],
210
7656
        measurements: &Measurements,
211
7656
    ) -> OperationResult {
212
7656
        let measurement = measurements.begin(self.label, Metric::FindProduct);
213
7877
        let doc = Product::load(&operation.name, &self.database)
214
7877
            .await
215
7656
            .unwrap()
216
7656
            .unwrap();
217
7656
        let rating = self
218
7656
            .database
219
7656
            .view::<ProductReviewsByProduct>()
220
7656
            .with_key(doc.header.id)
221
7656
            .with_access_policy(AccessPolicy::NoUpdate)
222
7656
            .reduce()
223
3828
            .await
224
7656
            .unwrap();
225
7656
        measurement.finish();
226
7656
        OperationResult::Product {
227
7656
            id: doc.header.id,
228
7656
            product: doc.contents,
229
7656
            rating: rating.average(),
230
7656
        }
231
15312
    }
232
}
233

            
234
#[async_trait]
235
impl Operator<LookupProduct> for BonsaiOperator {
236
7480
    async fn operate(
237
7480
        &mut self,
238
7480
        operation: &LookupProduct,
239
7480
        _results: &[OperationResult],
240
7480
        measurements: &Measurements,
241
7480
    ) -> OperationResult {
242
7480
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
243
7480
        let doc = Product::get(operation.id, &self.database)
244
5690
            .await
245
7480
            .unwrap()
246
7480
            .unwrap();
247
7480
        let rating = self
248
7480
            .database
249
7480
            .view::<ProductReviewsByProduct>()
250
7480
            .with_key(doc.header.id)
251
7480
            .with_access_policy(AccessPolicy::NoUpdate)
252
7480
            .reduce()
253
3740
            .await
254
7480
            .unwrap();
255
7480
        measurement.finish();
256
7480
        OperationResult::Product {
257
7480
            id: doc.header.id,
258
7480
            product: doc.contents,
259
7480
            rating: rating.average(),
260
7480
        }
261
14960
    }
262
}
263

            
264
#[async_trait]
265
impl Operator<CreateCart> for BonsaiOperator {
266
1600
    async fn operate(
267
1600
        &mut self,
268
1600
        _operation: &CreateCart,
269
1600
        _results: &[OperationResult],
270
1600
        measurements: &Measurements,
271
1600
    ) -> OperationResult {
272
1600
        let measurement = measurements.begin(self.label, Metric::CreateCart);
273
1600
        let cart = Cart::default().push_into(&self.database).await.unwrap();
274
1600
        measurement.finish();
275
1600
        OperationResult::Cart { id: cart.header.id }
276
3200
    }
277
}
278

            
279
#[async_trait]
280
impl Operator<AddProductToCart> for BonsaiOperator {
281
3840
    async fn operate(
282
3840
        &mut self,
283
3840
        operation: &AddProductToCart,
284
3840
        results: &[OperationResult],
285
3840
        measurements: &Measurements,
286
3840
    ) -> OperationResult {
287
3840
        let cart = match &results[operation.cart.0] {
288
3840
            OperationResult::Cart { id } => *id,
289
            _ => unreachable!("Invalid operation result"),
290
        };
291
3840
        let product = match &results[operation.product.0] {
292
3840
            OperationResult::Product { id, .. } => *id,
293
            _ => unreachable!("Invalid operation result"),
294
        };
295

            
296
3840
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
297
3840
        let mut cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
298
3840
        cart.contents.product_ids.push(product);
299
3840
        cart.update(&self.database).await.unwrap();
300
3840
        measurement.finish();
301
3840

            
302
3840
        OperationResult::CartProduct { id: product }
303
7680
    }
304
}
305

            
306
#[async_trait]
307
impl Operator<Checkout> for BonsaiOperator {
308
416
    async fn operate(
309
416
        &mut self,
310
416
        operation: &Checkout,
311
416
        results: &[OperationResult],
312
416
        measurements: &Measurements,
313
416
    ) -> OperationResult {
314
416
        let cart = match &results[operation.cart.0] {
315
416
            OperationResult::Cart { id } => *id,
316
            _ => unreachable!("Invalid operation result"),
317
        };
318

            
319
416
        let measurement = measurements.begin(self.label, Metric::Checkout);
320
416
        let cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
321
416
        cart.delete(&self.database).await.unwrap();
322
416
        Order {
323
416
            customer_id: operation.customer_id,
324
416
            product_ids: cart.contents.product_ids,
325
416
        }
326
416
        .push_into(&self.database)
327
332
        .await
328
416
        .unwrap();
329
416
        measurement.finish();
330
416

            
331
416
        OperationResult::Ok
332
832
    }
333
}
334

            
335
#[async_trait]
336
impl Operator<ReviewProduct> for BonsaiOperator {
337
260
    async fn operate(
338
260
        &mut self,
339
260
        operation: &ReviewProduct,
340
260
        results: &[OperationResult],
341
260
        measurements: &Measurements,
342
260
    ) -> OperationResult {
343
260
        let product_id = match &results[operation.product_id.0] {
344
            OperationResult::Product { id, .. } => *id,
345
260
            OperationResult::CartProduct { id, .. } => *id,
346
            other => unreachable!("Invalid operation result {:?}", other),
347
        };
348

            
349
260
        let measurement = measurements.begin(self.label, Metric::RateProduct);
350
260
        let review = ProductReview {
351
260
            customer_id: operation.customer_id,
352
260
            product_id,
353
260
            review: operation.review.clone(),
354
260
            rating: operation.rating,
355
260
        };
356
260
        // https://github.com/khonsulabs/bonsaidb/issues/189
357
260
        match review.push_into(&self.database).await {
358
260
            Ok(_) => {}
359
            Err(InsertError {
360
                error:
361
                    bonsaidb::core::Error::UniqueKeyViolation {
362
                        existing_document, ..
363
                    },
364
                contents,
365
            }) => {
366
                CollectionDocument::<ProductReview> {
367
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
368
                    contents,
369
                }
370
                .update(&self.database)
371
                .await
372
                .unwrap();
373
            }
374
            other => {
375
                other.unwrap();
376
            }
377
        }
378
        // Force the view to update.
379
260
        self.database
380
260
            .view::<ProductReviewsByProduct>()
381
260
            .with_key(0)
382
280
            .reduce()
383
280
            .await
384
260
            .unwrap();
385
260
        measurement.finish();
386
260

            
387
260
        OperationResult::Ok
388
520
    }
389
}
390

            
391
impl Collection for Product {
392
    type PrimaryKey = u32;
393

            
394
185963
    fn collection_name() -> CollectionName {
395
185963
        CollectionName::new("benchmarks", "products")
396
185963
    }
397

            
398
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
399
20736
        schema.define_view(ProductsByName)?;
400
20736
        schema.define_view(ProductsByCategoryId)?;
401
20736
        Ok(())
402
20736
    }
403
}
404

            
405
// Empty marker impl. Presumably selects bonsaidb's default serialization for
// `Product` documents — confirm against the `DefaultSerialization` docs.
impl DefaultSerialization for Product {}
406

            
407
// Declares the `ProductsByName` view over `Product`. The mapping closure
// emits each product's name as the key.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |document: CollectionDocument<Product>| {
        // Only a key is emitted; no value accompanies it.
        document.header.emit_key(document.contents.name)
    },
);
416

            
417
22212
#[derive(Debug, Clone, View)]
418
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
419
pub struct ProductsByCategoryId;
420

            
421
impl CollectionViewSchema for ProductsByCategoryId {
422
    type View = Self;
423

            
424
    fn map(
425
        &self,
426
        document: CollectionDocument<<Self::View as View>::Collection>,
427
    ) -> ViewMapResult<Self::View> {
428
        let mut mappings = Mappings::default();
429
        for &id in &document.contents.category_ids {
430
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
431
        }
432
        Ok(mappings)
433
    }
434
}
435

            
436
impl NamedCollection for Product {
437
    type ByNameView = ProductsByName;
438
}
439

            
440
impl Collection for ProductReview {
441
    type PrimaryKey = u32;
442

            
443
113243
    fn collection_name() -> CollectionName {
444
113243
        CollectionName::new("benchmarks", "reviews")
445
113243
    }
446

            
447
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
448
20736
        schema.define_view(ProductReviewsByProduct)?;
449
20736
        Ok(())
450
20736
    }
451
}
452

            
453
// Empty marker impl. Presumably selects bonsaidb's default serialization for
// `ProductReview` documents — confirm against the `DefaultSerialization` docs.
impl DefaultSerialization for ProductReview {}
454

            
455
54079
#[derive(Debug, Clone, View)]
456
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
457
pub struct ProductReviewsByProduct;
458

            
459
impl CollectionViewSchema for ProductReviewsByProduct {
460
    type View = Self;
461

            
462
1496
    fn map(
463
1496
        &self,
464
1496
        document: CollectionDocument<<Self as View>::Collection>,
465
1496
    ) -> ViewMapResult<Self::View> {
466
1496
        document.header.emit_key_and_value(
467
1496
            document.contents.product_id,
468
1496
            ProductRatings {
469
1496
                total_score: document.contents.rating as u32,
470
1496
                ratings: 1,
471
1496
            },
472
1496
        )
473
1496
    }
474

            
475
7241
    fn reduce(
476
7241
        &self,
477
7241
        mappings: &[ViewMappedValue<Self::View>],
478
7241
        _rereduce: bool,
479
7241
    ) -> ReduceResult<Self::View> {
480
7241
        Ok(mappings
481
7241
            .iter()
482
7241
            .map(|mapping| mapping.value.clone())
483
7241
            .reduce(|a, b| ProductRatings {
484
1156
                total_score: a.total_score + b.total_score,
485
1156
                ratings: a.ratings + b.ratings,
486
7241
            })
487
7241
            .unwrap_or_default())
488
7241
    }
489
}
490

            
491
90240
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
492
pub struct ProductRatings {
493
    pub total_score: u32,
494
    pub ratings: u32,
495
}
496

            
497
impl ProductRatings {
498
15135
    pub fn average(&self) -> Option<f32> {
499
15135
        if self.ratings > 0 {
500
9391
            Some(self.total_score as f32 / self.ratings as f32)
501
        } else {
502
5744
            None
503
        }
504
15135
    }
505
}
506

            
507
impl Collection for Category {
508
    type PrimaryKey = u32;
509

            
510
20948
    fn collection_name() -> CollectionName {
511
20948
        CollectionName::new("benchmarks", "categories")
512
20948
    }
513

            
514
20736
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
515
20736
        Ok(())
516
20736
    }
517
}
518

            
519
// Empty marker impl. Presumably selects bonsaidb's default serialization for
// `Category` documents — confirm against the `DefaultSerialization` docs.
impl DefaultSerialization for Category {}
520

            
521
impl Collection for Customer {
522
    type PrimaryKey = u32;
523

            
524
22568
    fn collection_name() -> CollectionName {
525
22568
        CollectionName::new("benchmarks", "customers")
526
22568
    }
527

            
528
20736
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
529
20736
        Ok(())
530
20736
    }
531
}
532

            
533
// Empty marker impl. Presumably selects bonsaidb's default serialization for
// `Customer` documents — confirm against the `DefaultSerialization` docs.
impl DefaultSerialization for Customer {}
534

            
535
impl Collection for Order {
536
    type PrimaryKey = u32;
537

            
538
24188
    fn collection_name() -> CollectionName {
539
24188
        CollectionName::new("benchmarks", "orders")
540
24188
    }
541

            
542
20736
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
543
20736
        Ok(())
544
20736
    }
545
}
546

            
547
// Empty marker impl. Presumably selects bonsaidb's default serialization for
// `Order` documents — confirm against the `DefaultSerialization` docs.
impl DefaultSerialization for Order {}
548

            
549
impl Collection for Cart {
550
    type PrimaryKey = u32;
551

            
552
30848
    fn collection_name() -> CollectionName {
553
30848
        CollectionName::new("benchmarks", "carts")
554
30848
    }
555

            
556
20736
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
557
20736
        Ok(())
558
20736
    }
559
}
560

            
561
// Empty marker impl. Presumably selects bonsaidb's default serialization for
// `Cart` documents — confirm against the `DefaultSerialization` docs.
impl DefaultSerialization for Cart {}