1
use std::{path::Path, time::Duration};
2

            
3
#[cfg(feature = "compression")]
4
use bonsaidb::local::config::Compression;
5
use bonsaidb::{
6
    client::{url::Url, Client},
7
    core::{
8
        async_trait::async_trait,
9
        connection::{
10
            AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
11
        },
12
        define_basic_unique_mapped_view,
13
        document::{CollectionDocument, CollectionHeader, Emit},
14
        schema::{
15
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
16
            DefaultSerialization, InsertError, NamedCollection, Qualified, ReduceResult, Schema,
17
            Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
18
        },
19
        transaction::{self, Transaction},
20
        Error,
21
    },
22
    local::config::Builder,
23
    server::{DefaultPermissions, Server, ServerConfiguration},
24
    AnyDatabase,
25
};
26
use serde::{Deserialize, Serialize};
27

            
28
use crate::{
29
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
30
    model::{Cart, Category, Customer, Order, Product, ProductReview},
31
    plan::{
32
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
33
        ReviewProduct,
34
    },
35
};
36

            
37
/// Selects which BonsaiDb configuration the benchmark backend exercises.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Operators use the locally opened database directly.
    Local,
    /// Like [`Bonsai::Local`], with LZ4 compression enabled when the `compression` feature is
    /// compiled in.
    LocalLz4,
    /// Operators connect through a networked client using a `bonsaidb://` URL.
    Quic,
    /// Operators connect through a networked client using a `ws://` URL.
    WebSockets,
}

impl Bonsai {
    /// Returns the static label identifying this configuration in benchmark measurements.
    pub fn label(&self) -> &'static str {
        match self {
            Self::Local => "bonsaidb-local",
            Self::LocalLz4 => "bonsaidb-local+lz4",
            Self::Quic => "bonsaidb-quic",
            Self::WebSockets => "bonsaidb-ws",
        }
    }
}
54

            
55
/// Benchmark backend that owns the BonsaiDb server and remembers which [`Bonsai`] variant it
/// was configured with.
pub struct BonsaiBackend {
    /// Server opened during backend construction; also the source of local database handles.
    server: Server,
    /// Configuration variant chosen for this backend; decides how operators connect.
    kind: Bonsai,
}
59

            
60
/// Executes benchmark operations against a single database handle.
pub struct BonsaiOperator {
    /// Label passed to `Measurements::begin` for every operation this operator times.
    label: &'static str,
    /// Database handle (local or networked) that all operations run against.
    database: AnyDatabase,
}
64

            
65
28578
/// Schema marker for the benchmark database: named `commerce` under the `benchmarks` authority
/// and containing the six collections listed in the attribute. The enum has no variants and is
/// only used as a type parameter.
#[derive(Debug, Schema)]
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
pub enum Commerce {}
68

            
69
#[async_trait]
70
impl Backend for BonsaiBackend {
71
    type Operator = BonsaiOperator;
72
    type Config = Bonsai;
73

            
74
48
    fn label(&self) -> &'static str {
75
48
        self.kind.label()
76
48
    }
77

            
78
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
79
16
    async fn new(config: Self::Config) -> Self {
80
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
81
16
        if path.exists() {
82
15
            std::fs::remove_dir_all(path).unwrap();
83
15
        }
84
16
        let mut server_config = ServerConfiguration::new(path)
85
16
            .default_permissions(DefaultPermissions::AllowAll)
86
16
            .with_schema::<Commerce>()
87
16
            .unwrap();
88

            
89
        #[cfg(feature = "compression")]
90
        {
91
16
            if matches!(config, Bonsai::LocalLz4) {
92
4
                server_config = server_config.default_compression(Compression::Lz4);
93
12
            }
94
        }
95

            
96
48
        let server = Server::open(server_config).await.unwrap();
97
160
        server.install_self_signed_certificate(false).await.unwrap();
98
16
        server
99
32
            .create_database::<Commerce>("commerce", false)
100
32
            .await
101
16
            .unwrap();
102
16

            
103
16
        match config {
104
4
            Bonsai::Quic => {
105
4
                let server = server.clone();
106
4
                tokio::spawn(async move {
107
31
                    server.listen_on(7022).await.unwrap();
108
4
                });
109
4
            }
110
4
            Bonsai::WebSockets => {
111
4
                let server = server.clone();
112
4
                tokio::spawn(async move {
113
4
                    server
114
12
                        .listen_for_websockets_on("0.0.0.0:7023", false)
115
12
                        .await
116
                        .unwrap();
117
4
                });
118
4
            }
119
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
120
        }
121
        // Allow the server time to start listening
122
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
123

            
124
16
        BonsaiBackend {
125
16
            server,
126
16
            kind: config,
127
16
        }
128
32
    }
129

            
130
48
    async fn new_operator_async(&self) -> Self::Operator {
131
48
        let database = match self.kind {
132
            Bonsai::Local | Bonsai::LocalLz4 => {
133
24
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
134
            }
135

            
136
            Bonsai::Quic => {
137
12
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
138
12
                    .with_certificate(
139
12
                        self.server
140
12
                            .certificate_chain()
141
11
                            .await
142
12
                            .unwrap()
143
12
                            .into_end_entity_certificate(),
144
12
                    )
145
12
                    .finish()
146
12
                    .unwrap();
147
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
148
            }
149
            Bonsai::WebSockets => {
150
12
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
151
12
                    .finish()
152
12
                    .unwrap();
153
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
154
            }
155
        };
156
48
        BonsaiOperator {
157
48
            database,
158
48
            label: self.label(),
159
48
        }
160
96
    }
161
}
162

            
163
// Marks `BonsaiOperator` as a backend operator; no trait items are overridden here.
impl BackendOperator for BonsaiOperator {}
164

            
165
#[async_trait]
166
impl Operator<Load> for BonsaiOperator {
167
16
    async fn operate(
168
16
        &mut self,
169
16
        operation: &Load,
170
16
        _results: &[OperationResult],
171
16
        measurements: &Measurements,
172
16
    ) -> OperationResult {
173
16
        let measurement = measurements.begin(self.label, Metric::Load);
174
16
        let mut tx = Transaction::default();
175
200
        for (id, category) in &operation.initial_data.categories {
176
200
            tx.push(
177
200
                transaction::Operation::insert_serialized::<Category>(Some(*id), category).unwrap(),
178
200
            );
179
200
        }
180
1832
        for (id, product) in &operation.initial_data.products {
181
1832
            tx.push(
182
1832
                transaction::Operation::insert_serialized::<Product>(Some(*id), product).unwrap(),
183
1832
            );
184
1832
        }
185
2860
        for (id, customer) in &operation.initial_data.customers {
186
2860
            tx.push(
187
2860
                transaction::Operation::insert_serialized::<Customer>(Some(*id), customer).unwrap(),
188
2860
            );
189
2860
        }
190
5336
        for (id, order) in &operation.initial_data.orders {
191
5336
            tx.push(transaction::Operation::insert_serialized::<Order>(Some(*id), order).unwrap());
192
5336
        }
193
1092
        for review in &operation.initial_data.reviews {
194
1092
            tx.push(
195
1092
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
196
1092
            );
197
1092
        }
198
16
        self.database.apply_transaction(tx).await.unwrap();
199
16
        measurement.finish();
200
16
        OperationResult::Ok
201
32
    }
202
}
203

            
204
#[async_trait]
205
impl Operator<FindProduct> for BonsaiOperator {
206
8868
    async fn operate(
207
8868
        &mut self,
208
8868
        operation: &FindProduct,
209
8868
        _results: &[OperationResult],
210
8868
        measurements: &Measurements,
211
8868
    ) -> OperationResult {
212
8868
        let measurement = measurements.begin(self.label, Metric::FindProduct);
213
16431
        let doc = Product::load_async(&operation.name, &self.database)
214
16431
            .await
215
8868
            .unwrap()
216
8868
            .unwrap();
217
8868
        let rating = self
218
8868
            .database
219
8868
            .view::<ProductReviewsByProduct>()
220
8868
            .with_key(doc.header.id)
221
8868
            .with_access_policy(AccessPolicy::NoUpdate)
222
8868
            .reduce()
223
8162
            .await
224
8868
            .unwrap();
225
8868
        measurement.finish();
226
8868
        OperationResult::Product {
227
8868
            id: doc.header.id,
228
8868
            product: doc.contents,
229
8868
            rating: rating.average(),
230
8868
        }
231
17736
    }
232
}
233

            
234
#[async_trait]
235
impl Operator<LookupProduct> for BonsaiOperator {
236
8756
    async fn operate(
237
8756
        &mut self,
238
8756
        operation: &LookupProduct,
239
8756
        _results: &[OperationResult],
240
8756
        measurements: &Measurements,
241
8756
    ) -> OperationResult {
242
8756
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
243
8756
        let doc = Product::get_async(operation.id, &self.database)
244
7962
            .await
245
8756
            .unwrap()
246
8756
            .unwrap();
247
8756
        let rating = self
248
8756
            .database
249
8756
            .view::<ProductReviewsByProduct>()
250
8756
            .with_key(doc.header.id)
251
8756
            .with_access_policy(AccessPolicy::NoUpdate)
252
8756
            .reduce()
253
8139
            .await
254
8756
            .unwrap();
255
8756
        measurement.finish();
256
8756
        OperationResult::Product {
257
8756
            id: doc.header.id,
258
8756
            product: doc.contents,
259
8756
            rating: rating.average(),
260
8756
        }
261
17512
    }
262
}
263

            
264
#[async_trait]
265
impl Operator<CreateCart> for BonsaiOperator {
266
1832
    async fn operate(
267
1832
        &mut self,
268
1832
        _operation: &CreateCart,
269
1832
        _results: &[OperationResult],
270
1832
        measurements: &Measurements,
271
1832
    ) -> OperationResult {
272
1832
        let measurement = measurements.begin(self.label, Metric::CreateCart);
273
1832
        let cart = Cart::default()
274
1832
            .push_into_async(&self.database)
275
1697
            .await
276
1832
            .unwrap();
277
1832
        measurement.finish();
278
1832
        OperationResult::Cart { id: cart.header.id }
279
3664
    }
280
}
281

            
282
#[async_trait]
283
impl Operator<AddProductToCart> for BonsaiOperator {
284
4516
    async fn operate(
285
4516
        &mut self,
286
4516
        operation: &AddProductToCart,
287
4516
        results: &[OperationResult],
288
4516
        measurements: &Measurements,
289
4516
    ) -> OperationResult {
290
4516
        let cart = match &results[operation.cart.0] {
291
4516
            OperationResult::Cart { id } => *id,
292
            _ => unreachable!("Invalid operation result"),
293
        };
294
4516
        let product = match &results[operation.product.0] {
295
4516
            OperationResult::Product { id, .. } => *id,
296
            _ => unreachable!("Invalid operation result"),
297
        };
298

            
299
4516
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
300
4516
        let mut cart = Cart::get_async(cart, &self.database)
301
4101
            .await
302
4516
            .unwrap()
303
4516
            .unwrap();
304
4516
        cart.contents.product_ids.push(product);
305
4516
        cart.update_async(&self.database).await.unwrap();
306
4516
        measurement.finish();
307
4516

            
308
4516
        OperationResult::CartProduct { id: product }
309
9032
    }
310
}
311

            
312
#[async_trait]
313
impl Operator<Checkout> for BonsaiOperator {
314
448
    async fn operate(
315
448
        &mut self,
316
448
        operation: &Checkout,
317
448
        results: &[OperationResult],
318
448
        measurements: &Measurements,
319
448
    ) -> OperationResult {
320
448
        let cart = match &results[operation.cart.0] {
321
448
            OperationResult::Cart { id } => *id,
322
            _ => unreachable!("Invalid operation result"),
323
        };
324

            
325
448
        let measurement = measurements.begin(self.label, Metric::Checkout);
326
448
        let cart = Cart::get_async(cart, &self.database)
327
385
            .await
328
448
            .unwrap()
329
448
            .unwrap();
330
448
        cart.delete_async(&self.database).await.unwrap();
331
448
        Order {
332
448
            customer_id: operation.customer_id,
333
448
            product_ids: cart.contents.product_ids,
334
448
        }
335
448
        .push_into_async(&self.database)
336
402
        .await
337
448
        .unwrap();
338
448
        measurement.finish();
339
448

            
340
448
        OperationResult::Ok
341
896
    }
342
}
343

            
344
#[async_trait]
345
impl Operator<ReviewProduct> for BonsaiOperator {
346
328
    async fn operate(
347
328
        &mut self,
348
328
        operation: &ReviewProduct,
349
328
        results: &[OperationResult],
350
328
        measurements: &Measurements,
351
328
    ) -> OperationResult {
352
328
        let product_id = match &results[operation.product_id.0] {
353
            OperationResult::Product { id, .. } => *id,
354
328
            OperationResult::CartProduct { id, .. } => *id,
355
            other => unreachable!("Invalid operation result {:?}", other),
356
        };
357

            
358
328
        let measurement = measurements.begin(self.label, Metric::RateProduct);
359
328
        let review = ProductReview {
360
328
            customer_id: operation.customer_id,
361
328
            product_id,
362
328
            review: operation.review.clone(),
363
328
            rating: operation.rating,
364
328
        };
365
328
        // https://github.com/khonsulabs/bonsaidb/issues/189
366
328
        match review.push_into_async(&self.database).await {
367
328
            Ok(_) => {}
368
            Err(InsertError {
369
                error:
370
                    bonsaidb::core::Error::UniqueKeyViolation {
371
                        existing_document, ..
372
                    },
373
                contents,
374
            }) => {
375
                CollectionDocument::<ProductReview> {
376
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
377
                    contents,
378
                }
379
                .update_async(&self.database)
380
                .await
381
                .unwrap();
382
            }
383
            other => {
384
                other.unwrap();
385
            }
386
        }
387
        // Force the view to update.
388
328
        self.database
389
328
            .view::<ProductReviewsByProduct>()
390
328
            .with_key(0)
391
328
            .reduce()
392
328
            .await
393
328
            .unwrap();
394
328
        measurement.finish();
395
328

            
396
328
        OperationResult::Ok
397
656
    }
398
}
399

            
400
// Registers `Product` as a collection with `u32` primary keys.
impl Collection for Product {
    type PrimaryKey = u32;

    /// Names the collection `products` under the `benchmarks` authority.
    fn collection_name() -> CollectionName {
        CollectionName::new("benchmarks", "products")
    }

    /// Registers the `ProductsByName` and `ProductsByCategoryId` views, stopping at the first
    /// registration error.
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
        schema.define_view(ProductsByName)?;
        schema.define_view(ProductsByCategoryId)?;
        Ok(())
    }
}
413

            
414
// Opts `Product` into `DefaultSerialization`; no custom format is configured in this file.
impl DefaultSerialization for Product {}
415

            
416
// Declares the `ProductsByName` view over `Product` (version `1`, named "by-name"). Each
// document is mapped to its `name` as a `String` key with a `()` value. As the macro name
// indicates, the view is declared as unique.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |document: CollectionDocument<Product>| { document.header.emit_key(document.contents.name) },
);
425

            
426
30426
/// View over [`Product`] named "by-category", with `u32` keys and `u32` values.
#[derive(Debug, Clone, View)]
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
pub struct ProductsByCategoryId;
429

            
430
impl CollectionViewSchema for ProductsByCategoryId {
431
    type View = Self;
432

            
433
    fn map(
434
        &self,
435
        document: CollectionDocument<<Self::View as View>::Collection>,
436
    ) -> ViewMapResult<Self::View> {
437
        let mut mappings = Mappings::default();
438
        for &id in &document.contents.category_ids {
439
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
440
        }
441
        Ok(mappings)
442
    }
443
}
444

            
445
// Designates `ProductsByName` as the view used for by-name access to `Product` documents.
impl NamedCollection for Product {
    type ByNameView = ProductsByName;
}
448

            
449
// Registers `ProductReview` as a collection with `u32` primary keys.
impl Collection for ProductReview {
    type PrimaryKey = u32;

    /// Names the collection `reviews` under the `benchmarks` authority.
    fn collection_name() -> CollectionName {
        CollectionName::new("benchmarks", "reviews")
    }

    /// Registers the `ProductReviewsByProduct` view.
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
        schema.define_view(ProductReviewsByProduct)?;
        Ok(())
    }
}
461

            
462
// Opts `ProductReview` into `DefaultSerialization`; no custom format is configured in this file.
impl DefaultSerialization for ProductReview {}
463

            
464
85180
/// View over [`ProductReview`] named "by-product", with `u32` keys and [`ProductRatings`]
/// values.
#[derive(Debug, Clone, View)]
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
pub struct ProductReviewsByProduct;
467

            
468
impl CollectionViewSchema for ProductReviewsByProduct {
469
    type View = Self;
470

            
471
1420
    fn map(
472
1420
        &self,
473
1420
        document: CollectionDocument<<Self as View>::Collection>,
474
1420
    ) -> ViewMapResult<Self::View> {
475
1420
        document.header.emit_key_and_value(
476
1420
            document.contents.product_id,
477
1420
            ProductRatings {
478
1420
                total_score: document.contents.rating as u32,
479
1420
                ratings: 1,
480
1420
            },
481
1420
        )
482
1420
    }
483

            
484
11031
    fn reduce(
485
11031
        &self,
486
11031
        mappings: &[ViewMappedValue<Self::View>],
487
11031
        _rereduce: bool,
488
11031
    ) -> ReduceResult<Self::View> {
489
11031
        Ok(mappings
490
11031
            .iter()
491
11031
            .map(|mapping| mapping.value.clone())
492
11031
            .reduce(|a, b| ProductRatings {
493
556
                total_score: a.total_score + b.total_score,
494
556
                ratings: a.ratings + b.ratings,
495
11031
            })
496
11031
            .unwrap_or_default())
497
11031
    }
498
}
499

            
500
98260
/// Aggregated rating data for a product, as produced by the `ProductReviewsByProduct` view.
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct ProductRatings {
    /// Sum of all rating scores that were combined into this value.
    pub total_score: u32,
    /// Number of ratings that were combined into this value.
    pub ratings: u32,
}
505

            
506
impl ProductRatings {
507
17624
    pub fn average(&self) -> Option<f32> {
508
17624
        if self.ratings > 0 {
509
7909
            Some(self.total_score as f32 / self.ratings as f32)
510
        } else {
511
9715
            None
512
        }
513
17624
    }
514
}
515

            
516
// Registers `Category` as a collection with `u32` primary keys.
impl Collection for Category {
    type PrimaryKey = u32;

    /// Names the collection `categories` under the `benchmarks` authority.
    fn collection_name() -> CollectionName {
        CollectionName::new("benchmarks", "categories")
    }

    /// Registers nothing: this collection defines no views.
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
        Ok(())
    }
}
527

            
528
// Opts `Category` into `DefaultSerialization`; no custom format is configured in this file.
impl DefaultSerialization for Category {}
529

            
530
// Registers `Customer` as a collection with `u32` primary keys.
impl Collection for Customer {
    type PrimaryKey = u32;

    /// Names the collection `customers` under the `benchmarks` authority.
    fn collection_name() -> CollectionName {
        CollectionName::new("benchmarks", "customers")
    }

    /// Registers nothing: this collection defines no views.
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
        Ok(())
    }
}
541

            
542
// Opts `Customer` into `DefaultSerialization`; no custom format is configured in this file.
impl DefaultSerialization for Customer {}
543

            
544
// Registers `Order` as a collection with `u32` primary keys.
impl Collection for Order {
    type PrimaryKey = u32;

    /// Names the collection `orders` under the `benchmarks` authority.
    fn collection_name() -> CollectionName {
        CollectionName::new("benchmarks", "orders")
    }

    /// Registers nothing: this collection defines no views.
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
        Ok(())
    }
}
555

            
556
// Opts `Order` into `DefaultSerialization`; no custom format is configured in this file.
impl DefaultSerialization for Order {}
557

            
558
// Registers `Cart` as a collection with `u32` primary keys.
impl Collection for Cart {
    type PrimaryKey = u32;

    /// Names the collection `carts` under the `benchmarks` authority.
    fn collection_name() -> CollectionName {
        CollectionName::new("benchmarks", "carts")
    }

    /// Registers nothing: this collection defines no views.
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
        Ok(())
    }
}
569

            
570
// Opts `Cart` into `DefaultSerialization`; no custom format is configured in this file.
impl DefaultSerialization for Cart {}