1
use std::{path::Path, time::Duration};
2

            
3
#[cfg(feature = "compression")]
4
use bonsaidb::local::config::Compression;
5
use bonsaidb::{
6
    client::{url::Url, Client},
7
    core::{
8
        async_trait::async_trait,
9
        connection::{
10
            AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
11
        },
12
        define_basic_unique_mapped_view,
13
        document::{CollectionDocument, CollectionHeader, Emit},
14
        schema::{
15
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
16
            DefaultSerialization, InsertError, NamedCollection, Qualified, ReduceResult, Schema,
17
            Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
18
        },
19
        transaction::{self, Transaction},
20
        Error,
21
    },
22
    local::config::Builder,
23
    server::{DefaultPermissions, Server, ServerConfiguration},
24
    AnyDatabase,
25
};
26
use serde::{Deserialize, Serialize};
27

            
28
use crate::{
29
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
30
    model::{Cart, Category, Customer, Order, Product, ProductReview},
31
    plan::{
32
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
33
        ReviewProduct,
34
    },
35
};
36

            
37
/// The BonsaiDb configurations that the commerce benchmark can run against.
///
/// The variant chosen decides how the backend exposes its server and how
/// operators obtain their database handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Operators talk to the server's database directly, without a network hop.
    Local,
    /// Same as [`Bonsai::Local`], but reported under a separate `+lz4` label.
    LocalLz4,
    /// Operators connect through a `bonsaidb://` client.
    Quic,
    /// Operators connect through a `ws://` client.
    WebSockets,
}

impl Bonsai {
    /// Returns the fixed label that identifies this configuration.
    pub fn label(&self) -> &'static str {
        match self {
            Self::Local => "bonsaidb-local",
            Self::LocalLz4 => "bonsaidb-local+lz4",
            Self::Quic => "bonsaidb-quic",
            Self::WebSockets => "bonsaidb-ws",
        }
    }
}
54

            
55
pub struct BonsaiBackend {
56
    server: Server,
57
    kind: Bonsai,
58
}
59

            
60
pub struct BonsaiOperator {
61
    label: &'static str,
62
    database: AnyDatabase,
63
}
64

            
65
29856
#[derive(Debug, Schema)]
66
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
67
pub enum Commerce {}
68

            
69
#[async_trait]
70
impl Backend for BonsaiBackend {
71
    type Operator = BonsaiOperator;
72
    type Config = Bonsai;
73

            
74
48
    fn label(&self) -> &'static str {
75
48
        self.kind.label()
76
48
    }
77

            
78
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
79
16
    async fn new(config: Self::Config) -> Self {
80
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
81
16
        if path.exists() {
82
15
            std::fs::remove_dir_all(path).unwrap();
83
15
        }
84
16
        let mut server_config = ServerConfiguration::new(path)
85
16
            .default_permissions(DefaultPermissions::AllowAll)
86
16
            .with_schema::<Commerce>()
87
16
            .unwrap();
88

            
89
        #[cfg(feature = "compression")]
90
        {
91
16
            if matches!(config, Bonsai::LocalLz4) {
92
4
                server_config = server_config.default_compression(Compression::Lz4);
93
12
            }
94
        }
95

            
96
48
        let server = Server::open(server_config).await.unwrap();
97
160
        server.install_self_signed_certificate(false).await.unwrap();
98
16
        server
99
32
            .create_database::<Commerce>("commerce", false)
100
32
            .await
101
16
            .unwrap();
102
16

            
103
16
        match config {
104
4
            Bonsai::Quic => {
105
4
                let server = server.clone();
106
4
                tokio::spawn(async move {
107
31
                    server.listen_on(7022).await.unwrap();
108
4
                });
109
4
            }
110
4
            Bonsai::WebSockets => {
111
4
                let server = server.clone();
112
4
                tokio::spawn(async move {
113
4
                    server
114
10
                        .listen_for_websockets_on("0.0.0.0:7023", false)
115
10
                        .await
116
                        .unwrap();
117
4
                });
118
4
            }
119
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
120
        }
121
        // Allow the server time to start listening
122
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
123

            
124
16
        BonsaiBackend {
125
16
            server,
126
16
            kind: config,
127
16
        }
128
32
    }
129

            
130
48
    async fn new_operator_async(&self) -> Self::Operator {
131
48
        let database = match self.kind {
132
            Bonsai::Local | Bonsai::LocalLz4 => {
133
24
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
134
            }
135

            
136
            Bonsai::Quic => {
137
12
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
138
12
                    .with_certificate(
139
12
                        self.server
140
16
                            .certificate_chain()
141
16
                            .await
142
12
                            .unwrap()
143
12
                            .into_end_entity_certificate(),
144
12
                    )
145
12
                    .finish()
146
12
                    .unwrap();
147
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
148
            }
149
            Bonsai::WebSockets => {
150
12
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
151
12
                    .finish()
152
12
                    .unwrap();
153
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
154
            }
155
        };
156
48
        BonsaiOperator {
157
48
            database,
158
48
            label: self.label(),
159
48
        }
160
96
    }
161
}
162

            
163
// Marks `BonsaiOperator` as a `BackendOperator`; the impl body is empty, so no items are overridden.
impl BackendOperator for BonsaiOperator {}
164

            
165
#[async_trait]
166
impl Operator<Load> for BonsaiOperator {
167
16
    async fn operate(
168
16
        &mut self,
169
16
        operation: &Load,
170
16
        _results: &[OperationResult],
171
16
        measurements: &Measurements,
172
16
    ) -> OperationResult {
173
16
        let measurement = measurements.begin(self.label, Metric::Load);
174
16
        let mut tx = Transaction::default();
175
140
        for (id, category) in &operation.initial_data.categories {
176
140
            tx.push(
177
140
                transaction::Operation::insert_serialized::<Category>(Some(*id), category).unwrap(),
178
140
            );
179
140
        }
180
1296
        for (id, product) in &operation.initial_data.products {
181
1296
            tx.push(
182
1296
                transaction::Operation::insert_serialized::<Product>(Some(*id), product).unwrap(),
183
1296
            );
184
1296
        }
185
2248
        for (id, customer) in &operation.initial_data.customers {
186
2248
            tx.push(
187
2248
                transaction::Operation::insert_serialized::<Customer>(Some(*id), customer).unwrap(),
188
2248
            );
189
2248
        }
190
1868
        for (id, order) in &operation.initial_data.orders {
191
1868
            tx.push(transaction::Operation::insert_serialized::<Order>(Some(*id), order).unwrap());
192
1868
        }
193
1756
        for review in &operation.initial_data.reviews {
194
1756
            tx.push(
195
1756
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
196
1756
            );
197
1756
        }
198
16
        self.database.apply_transaction(tx).await.unwrap();
199
16
        measurement.finish();
200
16
        OperationResult::Ok
201
32
    }
202
}
203

            
204
#[async_trait]
205
impl Operator<FindProduct> for BonsaiOperator {
206
9156
    async fn operate(
207
9156
        &mut self,
208
9156
        operation: &FindProduct,
209
9156
        _results: &[OperationResult],
210
9156
        measurements: &Measurements,
211
9156
    ) -> OperationResult {
212
9156
        let measurement = measurements.begin(self.label, Metric::FindProduct);
213
16836
        let doc = Product::load_async(&operation.name, &self.database)
214
16836
            .await
215
9156
            .unwrap()
216
9156
            .unwrap();
217
9156
        let rating = self
218
9156
            .database
219
9156
            .view::<ProductReviewsByProduct>()
220
9156
            .with_key(doc.header.id)
221
9156
            .with_access_policy(AccessPolicy::NoUpdate)
222
9156
            .reduce()
223
8380
            .await
224
9156
            .unwrap();
225
9156
        measurement.finish();
226
9156
        OperationResult::Product {
227
9156
            id: doc.header.id,
228
9156
            product: doc.contents,
229
9156
            rating: rating.average(),
230
9156
        }
231
18312
    }
232
}
233

            
234
#[async_trait]
235
impl Operator<LookupProduct> for BonsaiOperator {
236
9316
    async fn operate(
237
9316
        &mut self,
238
9316
        operation: &LookupProduct,
239
9316
        _results: &[OperationResult],
240
9316
        measurements: &Measurements,
241
9316
    ) -> OperationResult {
242
9316
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
243
9316
        let doc = Product::get_async(operation.id, &self.database)
244
8359
            .await
245
9316
            .unwrap()
246
9316
            .unwrap();
247
9316
        let rating = self
248
9316
            .database
249
9316
            .view::<ProductReviewsByProduct>()
250
9316
            .with_key(doc.header.id)
251
9316
            .with_access_policy(AccessPolicy::NoUpdate)
252
9316
            .reduce()
253
8562
            .await
254
9316
            .unwrap();
255
9316
        measurement.finish();
256
9316
        OperationResult::Product {
257
9316
            id: doc.header.id,
258
9316
            product: doc.contents,
259
9316
            rating: rating.average(),
260
9316
        }
261
18632
    }
262
}
263

            
264
#[async_trait]
265
impl Operator<CreateCart> for BonsaiOperator {
266
1824
    async fn operate(
267
1824
        &mut self,
268
1824
        _operation: &CreateCart,
269
1824
        _results: &[OperationResult],
270
1824
        measurements: &Measurements,
271
1824
    ) -> OperationResult {
272
1824
        let measurement = measurements.begin(self.label, Metric::CreateCart);
273
1824
        let cart = Cart::default()
274
1824
            .push_into_async(&self.database)
275
1679
            .await
276
1824
            .unwrap();
277
1824
        measurement.finish();
278
1824
        OperationResult::Cart { id: cart.header.id }
279
3648
    }
280
}
281

            
282
#[async_trait]
283
impl Operator<AddProductToCart> for BonsaiOperator {
284
4776
    async fn operate(
285
4776
        &mut self,
286
4776
        operation: &AddProductToCart,
287
4776
        results: &[OperationResult],
288
4776
        measurements: &Measurements,
289
4776
    ) -> OperationResult {
290
4776
        let cart = match &results[operation.cart.0] {
291
4776
            OperationResult::Cart { id } => *id,
292
            _ => unreachable!("Invalid operation result"),
293
        };
294
4776
        let product = match &results[operation.product.0] {
295
4776
            OperationResult::Product { id, .. } => *id,
296
            _ => unreachable!("Invalid operation result"),
297
        };
298

            
299
4776
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
300
4776
        let mut cart = Cart::get_async(cart, &self.database)
301
4274
            .await
302
4776
            .unwrap()
303
4776
            .unwrap();
304
4776
        cart.contents.product_ids.push(product);
305
4776
        cart.update_async(&self.database).await.unwrap();
306
4776
        measurement.finish();
307
4776

            
308
4776
        OperationResult::CartProduct { id: product }
309
9552
    }
310
}
311

            
312
#[async_trait]
313
impl Operator<Checkout> for BonsaiOperator {
314
452
    async fn operate(
315
452
        &mut self,
316
452
        operation: &Checkout,
317
452
        results: &[OperationResult],
318
452
        measurements: &Measurements,
319
452
    ) -> OperationResult {
320
452
        let cart = match &results[operation.cart.0] {
321
452
            OperationResult::Cart { id } => *id,
322
            _ => unreachable!("Invalid operation result"),
323
        };
324

            
325
452
        let measurement = measurements.begin(self.label, Metric::Checkout);
326
452
        let cart = Cart::get_async(cart, &self.database)
327
388
            .await
328
452
            .unwrap()
329
452
            .unwrap();
330
452
        cart.delete_async(&self.database).await.unwrap();
331
452
        Order {
332
452
            customer_id: operation.customer_id,
333
452
            product_ids: cart.contents.product_ids,
334
452
        }
335
452
        .push_into_async(&self.database)
336
398
        .await
337
452
        .unwrap();
338
452
        measurement.finish();
339
452

            
340
452
        OperationResult::Ok
341
904
    }
342
}
343

            
344
#[async_trait]
345
impl Operator<ReviewProduct> for BonsaiOperator {
346
352
    async fn operate(
347
352
        &mut self,
348
352
        operation: &ReviewProduct,
349
352
        results: &[OperationResult],
350
352
        measurements: &Measurements,
351
352
    ) -> OperationResult {
352
352
        let product_id = match &results[operation.product_id.0] {
353
            OperationResult::Product { id, .. } => *id,
354
352
            OperationResult::CartProduct { id, .. } => *id,
355
            other => unreachable!("Invalid operation result {:?}", other),
356
        };
357

            
358
352
        let measurement = measurements.begin(self.label, Metric::RateProduct);
359
352
        let review = ProductReview {
360
352
            customer_id: operation.customer_id,
361
352
            product_id,
362
352
            review: operation.review.clone(),
363
352
            rating: operation.rating,
364
352
        };
365
352
        // https://github.com/khonsulabs/bonsaidb/issues/189
366
352
        match review.push_into_async(&self.database).await {
367
352
            Ok(_) => {}
368
            Err(InsertError {
369
                error:
370
                    bonsaidb::core::Error::UniqueKeyViolation {
371
                        existing_document, ..
372
                    },
373
                contents,
374
            }) => {
375
                CollectionDocument::<ProductReview> {
376
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
377
                    contents,
378
                }
379
                .update_async(&self.database)
380
                .await
381
                .unwrap();
382
            }
383
            other => {
384
                other.unwrap();
385
            }
386
        }
387
        // Force the view to update.
388
352
        self.database
389
352
            .view::<ProductReviewsByProduct>()
390
352
            .with_key(0)
391
352
            .reduce()
392
352
            .await
393
352
            .unwrap();
394
352
        measurement.finish();
395
352

            
396
352
        OperationResult::Ok
397
704
    }
398
}
399

            
400
impl Collection for Product {
401
    type PrimaryKey = u32;
402

            
403
259581
    fn collection_name() -> CollectionName {
404
259581
        CollectionName::new("benchmarks", "products")
405
259581
    }
406

            
407
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
408
29856
        schema.define_view(ProductsByName)?;
409
29856
        schema.define_view(ProductsByCategoryId)?;
410
29856
        Ok(())
411
29856
    }
412
}
413

            
414
// Opts `Product` into `DefaultSerialization`; the impl body is empty.
impl DefaultSerialization for Product {}
415

            
416
// Declares the `ProductsByName` view over `Product` (version 1, named
// "by-name"): each document emits its `name` as a `String` key with a unit
// value.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |document: CollectionDocument<Product>| { document.header.emit_key(document.contents.name) },
);
425

            
426
31168
#[derive(Debug, Clone, View)]
427
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
428
pub struct ProductsByCategoryId;
429

            
430
impl CollectionViewSchema for ProductsByCategoryId {
431
    type View = Self;
432

            
433
    fn map(
434
        &self,
435
        document: CollectionDocument<<Self::View as View>::Collection>,
436
    ) -> ViewMapResult<Self::View> {
437
        let mut mappings = Mappings::default();
438
        for &id in &document.contents.category_ids {
439
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
440
        }
441
        Ok(mappings)
442
    }
443
}
444

            
445
impl NamedCollection for Product {
446
    type ByNameView = ProductsByName;
447
}
448

            
449
impl Collection for ProductReview {
450
    type PrimaryKey = u32;
451

            
452
171588
    fn collection_name() -> CollectionName {
453
171588
        CollectionName::new("benchmarks", "reviews")
454
171588
    }
455

            
456
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
457
29856
        schema.define_view(ProductReviewsByProduct)?;
458
29856
        Ok(())
459
29856
    }
460
}
461

            
462
// Opts `ProductReview` into `DefaultSerialization`; the impl body is empty.
impl DefaultSerialization for ProductReview {}
463

            
464
89860
#[derive(Debug, Clone, View)]
465
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
466
pub struct ProductReviewsByProduct;
467

            
468
impl CollectionViewSchema for ProductReviewsByProduct {
469
    type View = Self;
470

            
471
2108
    fn map(
472
2108
        &self,
473
2108
        document: CollectionDocument<<Self as View>::Collection>,
474
2108
    ) -> ViewMapResult<Self::View> {
475
2108
        document.header.emit_key_and_value(
476
2108
            document.contents.product_id,
477
2108
            ProductRatings {
478
2108
                total_score: document.contents.rating as u32,
479
2108
                ratings: 1,
480
2108
            },
481
2108
        )
482
2108
    }
483

            
484
5980
    fn reduce(
485
5980
        &self,
486
5980
        mappings: &[ViewMappedValue<Self::View>],
487
5980
        _rereduce: bool,
488
5980
    ) -> ReduceResult<Self::View> {
489
5980
        Ok(mappings
490
5980
            .iter()
491
5980
            .map(|mapping| mapping.value.clone())
492
5980
            .reduce(|a, b| ProductRatings {
493
3896
                total_score: a.total_score + b.total_score,
494
3896
                ratings: a.ratings + b.ratings,
495
5980
            })
496
5980
            .unwrap_or_default())
497
5980
    }
498
}
499

            
500
118340
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
501
pub struct ProductRatings {
502
    pub total_score: u32,
503
    pub ratings: u32,
504
}
505

            
506
impl ProductRatings {
507
18472
    pub fn average(&self) -> Option<f32> {
508
18472
        if self.ratings > 0 {
509
13440
            Some(self.total_score as f32 / self.ratings as f32)
510
        } else {
511
5032
            None
512
        }
513
18472
    }
514
}
515

            
516
impl Collection for Category {
517
    type PrimaryKey = u32;
518

            
519
29996
    fn collection_name() -> CollectionName {
520
29996
        CollectionName::new("benchmarks", "categories")
521
29996
    }
522

            
523
29856
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
29856
        Ok(())
525
29856
    }
526
}
527

            
528
// Opts `Category` into `DefaultSerialization`; the impl body is empty.
impl DefaultSerialization for Category {}
529

            
530
impl Collection for Customer {
531
    type PrimaryKey = u32;
532

            
533
32104
    fn collection_name() -> CollectionName {
534
32104
        CollectionName::new("benchmarks", "customers")
535
32104
    }
536

            
537
29856
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
538
29856
        Ok(())
539
29856
    }
540
}
541

            
542
// Opts `Customer` into `DefaultSerialization`; the impl body is empty.
impl DefaultSerialization for Customer {}
543

            
544
impl Collection for Order {
545
    type PrimaryKey = u32;
546

            
547
32176
    fn collection_name() -> CollectionName {
548
32176
        CollectionName::new("benchmarks", "orders")
549
32176
    }
550

            
551
29856
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
552
29856
        Ok(())
553
29856
    }
554
}
555

            
556
// Opts `Order` into `DefaultSerialization`; the impl body is empty.
impl DefaultSerialization for Order {}
557

            
558
impl Collection for Cart {
559
    type PrimaryKey = u32;
560

            
561
42136
    fn collection_name() -> CollectionName {
562
42136
        CollectionName::new("benchmarks", "carts")
563
42136
    }
564

            
565
29856
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
566
29856
        Ok(())
567
29856
    }
568
}
569

            
570
// Opts `Cart` into `DefaultSerialization`; the impl body is empty.
impl DefaultSerialization for Cart {}