1
use std::{path::Path, time::Duration};
2

            
3
#[cfg(feature = "compression")]
4
use bonsaidb::local::config::Compression;
5
use bonsaidb::{
6
    client::{url::Url, Client},
7
    core::{
8
        async_trait::async_trait,
9
        connection::{AccessPolicy, Connection, StorageConnection},
10
        define_basic_unique_mapped_view,
11
        document::{CollectionDocument, CollectionHeader, Emit},
12
        schema::{
13
            view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
14
            DefaultSerialization, InsertError, NamedCollection, ReduceResult, Schema, Schematic,
15
            SerializedCollection, View, ViewMapResult, ViewMappedValue,
16
        },
17
        transaction::{self, Transaction},
18
        Error,
19
    },
20
    local::config::Builder,
21
    server::{DefaultPermissions, Server, ServerConfiguration},
22
    AnyDatabase,
23
};
24
use serde::{Deserialize, Serialize};
25

            
26
use crate::{
27
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
28
    model::{Cart, Category, Customer, Order, Product, ProductReview},
29
    plan::{
30
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
31
        ReviewProduct,
32
    },
33
};
34

            
35
pub enum Bonsai {
36
    Local,
37
    LocalLz4,
38
    Quic,
39
    WebSockets,
40
}
41

            
42
impl Bonsai {
43
52
    pub fn label(&self) -> &'static str {
44
52
        match self {
45
13
            Self::Local => "bonsaidb-local",
46
13
            Self::LocalLz4 => "bonsaidb-local+lz4",
47
13
            Self::Quic => "bonsaidb-quic",
48
13
            Self::WebSockets => "bonsaidb-ws",
49
        }
50
52
    }
51
}
52

            
53
pub struct BonsaiBackend {
54
    server: Server,
55
    kind: Bonsai,
56
}
57

            
58
pub struct BonsaiOperator {
59
    label: &'static str,
60
    database: AnyDatabase,
61
}
62

            
63
21324
#[derive(Debug, Schema)]
64
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
65
pub enum Commerce {}
66

            
67
#[async_trait]
68
impl Backend for BonsaiBackend {
69
    type Operator = BonsaiOperator;
70
    type Config = Bonsai;
71

            
72
52
    fn label(&self) -> &'static str {
73
52
        self.kind.label()
74
52
    }
75

            
76
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
77
16
    async fn new(config: Self::Config) -> Self {
78
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
79
16
        if path.exists() {
80
15
            std::fs::remove_dir_all(path).unwrap();
81
15
        }
82
16
        let mut server_config = ServerConfiguration::new(path)
83
16
            .default_permissions(DefaultPermissions::AllowAll)
84
16
            .with_schema::<Commerce>()
85
16
            .unwrap();
86

            
87
        #[cfg(feature = "compression")]
88
        {
89
16
            if matches!(config, Bonsai::LocalLz4) {
90
4
                server_config = server_config.default_compression(Compression::Lz4);
91
12
            }
92
        }
93

            
94
307
        let server = Server::open(server_config).await.unwrap();
95
154
        server.install_self_signed_certificate(false).await.unwrap();
96
16
        server
97
30
            .create_database::<Commerce>("commerce", false)
98
30
            .await
99
16
            .unwrap();
100
16

            
101
16
        match config {
102
4
            Bonsai::Quic => {
103
4
                let server = server.clone();
104
4
                tokio::spawn(async move {
105
26
                    server.listen_on(7022).await.unwrap();
106
4
                });
107
4
            }
108
4
            Bonsai::WebSockets => {
109
4
                let server = server.clone();
110
4
                tokio::spawn(async move {
111
4
                    server
112
8
                        .listen_for_websockets_on("0.0.0.0:7023", false)
113
8
                        .await
114
                        .unwrap();
115
4
                });
116
4
            }
117
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
118
        }
119
        // Allow the server time to start listening
120
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
121

            
122
16
        BonsaiBackend {
123
16
            server,
124
16
            kind: config,
125
16
        }
126
32
    }
127

            
128
52
    async fn new_operator_async(&self) -> Self::Operator {
129
52
        let database = match self.kind {
130
            Bonsai::Local | Bonsai::LocalLz4 => {
131
26
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
132
            }
133

            
134
            Bonsai::Quic => {
135
13
                let client = Client::build(Url::parse("bonsaidb://localhost:7022").unwrap())
136
13
                    .with_certificate(
137
13
                        self.server
138
13
                            .certificate_chain()
139
                            .await
140
13
                            .unwrap()
141
13
                            .into_end_entity_certificate(),
142
13
                    )
143
13
                    .finish()
144
                    .await
145
13
                    .unwrap();
146
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
147
            }
148
            Bonsai::WebSockets => {
149
13
                let client = Client::build(Url::parse("ws://localhost:7023").unwrap())
150
13
                    .finish()
151
                    .await
152
13
                    .unwrap();
153
13
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
154
            }
155
        };
156
52
        BonsaiOperator {
157
52
            database,
158
52
            label: self.label(),
159
52
        }
160
104
    }
161
}
162

            
163
impl BackendOperator for BonsaiOperator {}
164

            
165
#[async_trait]
166
impl Operator<Load> for BonsaiOperator {
167
16
    async fn operate(
168
16
        &mut self,
169
16
        operation: &Load,
170
16
        _results: &[OperationResult],
171
16
        measurements: &Measurements,
172
16
    ) -> OperationResult {
173
16
        let measurement = measurements.begin(self.label, Metric::Load);
174
16
        let mut tx = Transaction::default();
175
172
        for (id, category) in &operation.initial_data.categories {
176
172
            tx.push(
177
172
                transaction::Operation::insert_serialized::<Category>(Some(*id), category).unwrap(),
178
172
            );
179
172
        }
180
1840
        for (id, product) in &operation.initial_data.products {
181
1840
            tx.push(
182
1840
                transaction::Operation::insert_serialized::<Product>(Some(*id), product).unwrap(),
183
1840
            );
184
1840
        }
185
2968
        for (id, customer) in &operation.initial_data.customers {
186
2968
            tx.push(
187
2968
                transaction::Operation::insert_serialized::<Customer>(Some(*id), customer).unwrap(),
188
2968
            );
189
2968
        }
190
2460
        for (id, order) in &operation.initial_data.orders {
191
2460
            tx.push(transaction::Operation::insert_serialized::<Order>(Some(*id), order).unwrap());
192
2460
        }
193
812
        for review in &operation.initial_data.reviews {
194
812
            tx.push(
195
812
                transaction::Operation::insert_serialized::<ProductReview>(None, review).unwrap(),
196
812
            );
197
812
        }
198
29
        self.database.apply_transaction(tx).await.unwrap();
199
16
        measurement.finish();
200
16
        OperationResult::Ok
201
32
    }
202
}
203

            
204
#[async_trait]
205
impl Operator<FindProduct> for BonsaiOperator {
206
7732
    async fn operate(
207
7732
        &mut self,
208
7732
        operation: &FindProduct,
209
7732
        _results: &[OperationResult],
210
7732
        measurements: &Measurements,
211
7732
    ) -> OperationResult {
212
7732
        let measurement = measurements.begin(self.label, Metric::FindProduct);
213
8181
        let doc = Product::load(&operation.name, &self.database)
214
8181
            .await
215
7732
            .unwrap()
216
7732
            .unwrap();
217
7732
        let rating = self
218
7732
            .database
219
7732
            .view::<ProductReviewsByProduct>()
220
7732
            .with_key(doc.header.id)
221
7732
            .with_access_policy(AccessPolicy::NoUpdate)
222
7732
            .reduce()
223
3866
            .await
224
7732
            .unwrap();
225
7732
        measurement.finish();
226
7732
        OperationResult::Product {
227
7732
            id: doc.header.id,
228
7732
            product: doc.contents,
229
7732
            rating: rating.average(),
230
7732
        }
231
15464
    }
232
}
233

            
234
#[async_trait]
235
impl Operator<LookupProduct> for BonsaiOperator {
236
7912
    async fn operate(
237
7912
        &mut self,
238
7912
        operation: &LookupProduct,
239
7912
        _results: &[OperationResult],
240
7912
        measurements: &Measurements,
241
7912
    ) -> OperationResult {
242
7912
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
243
7912
        let doc = Product::get(operation.id, &self.database)
244
6346
            .await
245
7912
            .unwrap()
246
7912
            .unwrap();
247
7912
        let rating = self
248
7912
            .database
249
7912
            .view::<ProductReviewsByProduct>()
250
7912
            .with_key(doc.header.id)
251
7912
            .with_access_policy(AccessPolicy::NoUpdate)
252
7912
            .reduce()
253
3956
            .await
254
7912
            .unwrap();
255
7912
        measurement.finish();
256
7912
        OperationResult::Product {
257
7912
            id: doc.header.id,
258
7912
            product: doc.contents,
259
7912
            rating: rating.average(),
260
7912
        }
261
15824
    }
262
}
263

            
264
#[async_trait]
265
impl Operator<CreateCart> for BonsaiOperator {
266
1604
    async fn operate(
267
1604
        &mut self,
268
1604
        _operation: &CreateCart,
269
1604
        _results: &[OperationResult],
270
1604
        measurements: &Measurements,
271
1604
    ) -> OperationResult {
272
1604
        let measurement = measurements.begin(self.label, Metric::CreateCart);
273
1604
        let cart = Cart::default().push_into(&self.database).await.unwrap();
274
1604
        measurement.finish();
275
1604
        OperationResult::Cart { id: cart.header.id }
276
3208
    }
277
}
278

            
279
#[async_trait]
280
impl Operator<AddProductToCart> for BonsaiOperator {
281
3960
    async fn operate(
282
3960
        &mut self,
283
3960
        operation: &AddProductToCart,
284
3960
        results: &[OperationResult],
285
3960
        measurements: &Measurements,
286
3960
    ) -> OperationResult {
287
3960
        let cart = match &results[operation.cart.0] {
288
3960
            OperationResult::Cart { id } => *id,
289
            _ => unreachable!("Invalid operation result"),
290
        };
291
3960
        let product = match &results[operation.product.0] {
292
3960
            OperationResult::Product { id, .. } => *id,
293
            _ => unreachable!("Invalid operation result"),
294
        };
295

            
296
3960
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
297
3960
        let mut cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
298
3960
        cart.contents.product_ids.push(product);
299
3960
        cart.update(&self.database).await.unwrap();
300
3959
        measurement.finish();
301
3959

            
302
3959
        OperationResult::CartProduct { id: product }
303
7919
    }
304
}
305

            
306
#[async_trait]
307
impl Operator<Checkout> for BonsaiOperator {
308
380
    async fn operate(
309
380
        &mut self,
310
380
        operation: &Checkout,
311
380
        results: &[OperationResult],
312
380
        measurements: &Measurements,
313
380
    ) -> OperationResult {
314
380
        let cart = match &results[operation.cart.0] {
315
380
            OperationResult::Cart { id } => *id,
316
            _ => unreachable!("Invalid operation result"),
317
        };
318

            
319
380
        let measurement = measurements.begin(self.label, Metric::Checkout);
320
380
        let cart = Cart::get(cart, &self.database).await.unwrap().unwrap();
321
380
        cart.delete(&self.database).await.unwrap();
322
380
        Order {
323
380
            customer_id: operation.customer_id,
324
380
            product_ids: cart.contents.product_ids,
325
380
        }
326
380
        .push_into(&self.database)
327
309
        .await
328
380
        .unwrap();
329
380
        measurement.finish();
330
380

            
331
380
        OperationResult::Ok
332
760
    }
333
}
334

            
335
#[async_trait]
336
impl Operator<ReviewProduct> for BonsaiOperator {
337
272
    async fn operate(
338
272
        &mut self,
339
272
        operation: &ReviewProduct,
340
272
        results: &[OperationResult],
341
272
        measurements: &Measurements,
342
272
    ) -> OperationResult {
343
272
        let product_id = match &results[operation.product_id.0] {
344
            OperationResult::Product { id, .. } => *id,
345
272
            OperationResult::CartProduct { id, .. } => *id,
346
            other => unreachable!("Invalid operation result {:?}", other),
347
        };
348

            
349
272
        let measurement = measurements.begin(self.label, Metric::RateProduct);
350
272
        let review = ProductReview {
351
272
            customer_id: operation.customer_id,
352
272
            product_id,
353
272
            review: operation.review.clone(),
354
272
            rating: operation.rating,
355
272
        };
356
272
        // https://github.com/khonsulabs/bonsaidb/issues/189
357
272
        match review.push_into(&self.database).await {
358
272
            Ok(_) => {}
359
            Err(InsertError {
360
                error:
361
                    bonsaidb::core::Error::UniqueKeyViolation {
362
                        existing_document, ..
363
                    },
364
                contents,
365
            }) => {
366
                CollectionDocument::<ProductReview> {
367
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
368
                    contents,
369
                }
370
                .update(&self.database)
371
                .await
372
                .unwrap();
373
            }
374
            other => {
375
                other.unwrap();
376
            }
377
        }
378
        // Force the view to update.
379
272
        self.database
380
272
            .view::<ProductReviewsByProduct>()
381
272
            .with_key(0)
382
286
            .reduce()
383
286
            .await
384
272
            .unwrap();
385
272
        measurement.finish();
386
272

            
387
272
        OperationResult::Ok
388
544
    }
389
}
390

            
391
impl Collection for Product {
392
    type PrimaryKey = u32;
393

            
394
191537
    fn collection_name() -> CollectionName {
395
191537
        CollectionName::new("benchmarks", "products")
396
191537
    }
397

            
398
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
399
21324
        schema.define_view(ProductsByName)?;
400
21324
        schema.define_view(ProductsByCategoryId)?;
401
21324
        Ok(())
402
21324
    }
403
}
404

            
405
impl DefaultSerialization for Product {}
406

            
407
define_basic_unique_mapped_view!(
408
    ProductsByName,
409
    Product,
410
    1,
411
    "by-name",
412
    String,
413
    (),
414
1840
    |document: CollectionDocument<Product>| { document.header.emit_key(document.contents.name) },
415
);
416

            
417
23180
#[derive(Debug, Clone, View)]
418
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
419
pub struct ProductsByCategoryId;
420

            
421
impl CollectionViewSchema for ProductsByCategoryId {
422
    type View = Self;
423

            
424
    fn map(
425
        &self,
426
        document: CollectionDocument<<Self::View as View>::Collection>,
427
    ) -> ViewMapResult<Self::View> {
428
        let mut mappings = Mappings::default();
429
        for &id in &document.contents.category_ids {
430
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
431
        }
432
        Ok(mappings)
433
    }
434
}
435

            
436
impl NamedCollection for Product {
437
    type ByNameView = ProductsByName;
438
}
439

            
440
impl Collection for ProductReview {
441
    type PrimaryKey = u32;
442

            
443
115827
    fn collection_name() -> CollectionName {
444
115827
        CollectionName::new("benchmarks", "reviews")
445
115827
    }
446

            
447
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
448
21324
        schema.define_view(ProductReviewsByProduct)?;
449
21324
        Ok(())
450
21324
    }
451
}
452

            
453
impl DefaultSerialization for ProductReview {}
454

            
455
55344
#[derive(Debug, Clone, View)]
456
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
457
pub struct ProductReviewsByProduct;
458

            
459
impl CollectionViewSchema for ProductReviewsByProduct {
460
    type View = Self;
461

            
462
1084
    fn map(
463
1084
        &self,
464
1084
        document: CollectionDocument<<Self as View>::Collection>,
465
1084
    ) -> ViewMapResult<Self::View> {
466
1084
        document.header.emit_key_and_value(
467
1084
            document.contents.product_id,
468
1084
            ProductRatings {
469
1084
                total_score: document.contents.rating as u32,
470
1084
                ratings: 1,
471
1084
            },
472
1084
        )
473
1084
    }
474

            
475
11761
    fn reduce(
476
11761
        &self,
477
11761
        mappings: &[ViewMappedValue<Self::View>],
478
11761
        _rereduce: bool,
479
11761
    ) -> ReduceResult<Self::View> {
480
11761
        Ok(mappings
481
11761
            .iter()
482
11761
            .map(|mapping| mapping.value.clone())
483
11761
            .reduce(|a, b| ProductRatings {
484
304
                total_score: a.total_score + b.total_score,
485
304
                ratings: a.ratings + b.ratings,
486
11761
            })
487
11761
            .unwrap_or_default())
488
11761
    }
489
}
490

            
491
85816
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
492
pub struct ProductRatings {
493
    pub total_score: u32,
494
    pub ratings: u32,
495
}
496

            
497
impl ProductRatings {
498
15644
    pub fn average(&self) -> Option<f32> {
499
15644
        if self.ratings > 0 {
500
4943
            Some(self.total_score as f32 / self.ratings as f32)
501
        } else {
502
10701
            None
503
        }
504
15644
    }
505
}
506

            
507
impl Collection for Category {
508
    type PrimaryKey = u32;
509

            
510
21496
    fn collection_name() -> CollectionName {
511
21496
        CollectionName::new("benchmarks", "categories")
512
21496
    }
513

            
514
21324
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
515
21324
        Ok(())
516
21324
    }
517
}
518

            
519
impl DefaultSerialization for Category {}
520

            
521
impl Collection for Customer {
522
    type PrimaryKey = u32;
523

            
524
24292
    fn collection_name() -> CollectionName {
525
24292
        CollectionName::new("benchmarks", "customers")
526
24292
    }
527

            
528
21324
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
529
21324
        Ok(())
530
21324
    }
531
}
532

            
533
impl DefaultSerialization for Customer {}
534

            
535
impl Collection for Order {
536
    type PrimaryKey = u32;
537

            
538
24164
    fn collection_name() -> CollectionName {
539
24164
        CollectionName::new("benchmarks", "orders")
540
24164
    }
541

            
542
21324
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
543
21324
        Ok(())
544
21324
    }
545
}
546

            
547
impl DefaultSerialization for Order {}
548

            
549
impl Collection for Cart {
550
    type PrimaryKey = u32;
551

            
552
31608
    fn collection_name() -> CollectionName {
553
31608
        CollectionName::new("benchmarks", "carts")
554
31608
    }
555

            
556
21324
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
557
21324
        Ok(())
558
21324
    }
559
}
560

            
561
impl DefaultSerialization for Cart {}