1
use std::path::Path;
2
use std::time::Duration;
3

            
4
use bonsaidb::client::url::Url;
5
use bonsaidb::client::AsyncClient;
6
use bonsaidb::core::async_trait::async_trait;
7
use bonsaidb::core::connection::{
8
    AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
9
};
10
use bonsaidb::core::document::{CollectionDocument, CollectionHeader, Emit};
11
use bonsaidb::core::schema::view::map::Mappings;
12
use bonsaidb::core::schema::{
13
    Collection, CollectionMapReduce, CollectionName, DefaultSerialization, InsertError,
14
    NamedCollection, Qualified, ReduceResult, Schema, Schematic, SerializedCollection, View,
15
    ViewMapResult, ViewMappedValue, ViewSchema,
16
};
17
use bonsaidb::core::transaction::Transaction;
18
use bonsaidb::core::{define_basic_unique_mapped_view, Error};
19
use bonsaidb::local::config::Builder;
20
#[cfg(feature = "compression")]
21
use bonsaidb::local::config::Compression;
22
use bonsaidb::server::{DefaultPermissions, Server, ServerConfiguration};
23
use bonsaidb::AnyDatabase;
24
use serde::{Deserialize, Serialize};
25

            
26
use crate::execute::{Backend, BackendOperator, Measurements, Metric, Operator};
27
use crate::model::{Cart, Category, Customer, Order, Product, ProductReview};
28
use crate::plan::{
29
    AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
30
    ReviewProduct,
31
};
32

            
33
pub enum Bonsai {
34
    Local,
35
    LocalLz4,
36
    Quic,
37
    WebSockets,
38
}
39

            
40
impl Bonsai {
41
48
    pub fn label(&self) -> &'static str {
42
48
        match self {
43
12
            Self::Local => "bonsaidb-local",
44
12
            Self::LocalLz4 => "bonsaidb-local+lz4",
45
12
            Self::Quic => "bonsaidb-quic",
46
12
            Self::WebSockets => "bonsaidb-ws",
47
        }
48
48
    }
49
}
50

            
51
pub struct BonsaiBackend {
52
    server: Server,
53
    kind: Bonsai,
54
}
55

            
56
pub struct BonsaiOperator {
57
    label: &'static str,
58
    database: AnyDatabase,
59
}
60

            
61
30262
#[derive(Debug, Schema)]
62
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
63
pub enum Commerce {}
64

            
65
#[async_trait]
66
impl Backend for BonsaiBackend {
67
    type Config = Bonsai;
68
    type Operator = BonsaiOperator;
69

            
70
48
    fn label(&self) -> &'static str {
71
48
        self.kind.label()
72
48
    }
73

            
74
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
75
16
    async fn new(config: Self::Config) -> Self {
76
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
77
16
        if path.exists() {
78
15
            std::fs::remove_dir_all(path).unwrap();
79
15
        }
80
16
        let mut server_config = ServerConfiguration::new(path)
81
16
            .default_permissions(DefaultPermissions::AllowAll)
82
16
            .with_schema::<Commerce>()
83
16
            .unwrap();
84

            
85
        #[cfg(feature = "compression")]
86
        {
87
16
            if matches!(config, Bonsai::LocalLz4) {
88
4
                server_config = server_config.default_compression(Compression::Lz4);
89
12
            }
90
        }
91

            
92
48
        let server = Server::open(server_config).await.unwrap();
93
160
        server.install_self_signed_certificate(false).await.unwrap();
94
16
        server
95
16
            .create_database::<Commerce>("commerce", false)
96
32
            .await
97
16
            .unwrap();
98
16

            
99
16
        match config {
100
4
            Bonsai::Quic => {
101
4
                let server = server.clone();
102
4
                tokio::spawn(async move {
103
31
                    server.listen_on(7022).await.unwrap();
104
4
                });
105
4
            }
106
4
            Bonsai::WebSockets => {
107
4
                let server = server.clone();
108
4
                tokio::spawn(async move {
109
4
                    server
110
4
                        .listen_for_websockets_on("0.0.0.0:7023", false)
111
10
                        .await
112
                        .unwrap();
113
4
                });
114
4
            }
115
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
116
        }
117
        // Allow the server time to start listening
118
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
119

            
120
16
        BonsaiBackend {
121
16
            server,
122
16
            kind: config,
123
16
        }
124
32
    }
125

            
126
48
    async fn new_operator_async(&self) -> Self::Operator {
127
48
        let database = match self.kind {
128
            Bonsai::Local | Bonsai::LocalLz4 => {
129
24
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
130
            }
131

            
132
            Bonsai::Quic => {
133
12
                let client = AsyncClient::build(Url::parse("bonsaidb://localhost:7022").unwrap())
134
12
                    .with_certificate(
135
12
                        self.server
136
12
                            .certificate_chain()
137
18
                            .await
138
12
                            .unwrap()
139
12
                            .into_end_entity_certificate(),
140
12
                    )
141
12
                    .build()
142
12
                    .unwrap();
143
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
144
            }
145
            Bonsai::WebSockets => {
146
12
                let client = AsyncClient::build(Url::parse("ws://localhost:7023").unwrap())
147
12
                    .build()
148
12
                    .unwrap();
149
12
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
150
            }
151
        };
152
48
        BonsaiOperator {
153
48
            database,
154
48
            label: self.label(),
155
48
        }
156
96
    }
157
}
158

            
159
impl BackendOperator for BonsaiOperator {
160
    type Id = u32;
161
}
162

            
163
#[async_trait]
164
impl Operator<Load, u32> for BonsaiOperator {
165
16
    async fn operate(
166
16
        &mut self,
167
16
        operation: &Load,
168
16
        _results: &[OperationResult<u32>],
169
16
        measurements: &Measurements,
170
16
    ) -> OperationResult<u32> {
171
16
        let measurement = measurements.begin(self.label, Metric::Load);
172
16
        let mut tx = Transaction::default();
173
240
        for (id, category) in &operation.initial_data.categories {
174
240
            category.insert_in_transaction(id, &mut tx).unwrap();
175
240
        }
176
1480
        for (id, product) in &operation.initial_data.products {
177
1480
            product.insert_in_transaction(id, &mut tx).unwrap();
178
1480
        }
179
2228
        for (id, customer) in &operation.initial_data.customers {
180
2228
            customer.insert_in_transaction(id, &mut tx).unwrap();
181
2228
        }
182
5052
        for (id, order) in &operation.initial_data.orders {
183
5052
            order.insert_in_transaction(id, &mut tx).unwrap();
184
5052
        }
185
1784
        for review in &operation.initial_data.reviews {
186
1784
            review.push_in_transaction(&mut tx).unwrap();
187
1784
        }
188
16
        self.database.apply_transaction(tx).await.unwrap();
189
16
        measurement.finish();
190
16
        OperationResult::Ok
191
32
    }
192
}
193

            
194
#[async_trait]
195
impl Operator<FindProduct, u32> for BonsaiOperator {
196
9452
    async fn operate(
197
9452
        &mut self,
198
9452
        operation: &FindProduct,
199
9452
        _results: &[OperationResult<u32>],
200
9452
        measurements: &Measurements,
201
9452
    ) -> OperationResult<u32> {
202
9452
        let measurement = measurements.begin(self.label, Metric::FindProduct);
203
9452
        let doc = Product::load_async(&operation.name, &self.database)
204
17353
            .await
205
9452
            .unwrap()
206
9452
            .unwrap();
207
9452
        let rating = self
208
9452
            .database
209
9452
            .view::<ProductReviewsByProduct>()
210
9452
            .with_key(&doc.header.id)
211
9452
            .with_access_policy(AccessPolicy::NoUpdate)
212
9452
            .reduce()
213
8645
            .await
214
9452
            .unwrap();
215
9452
        measurement.finish();
216
9452
        OperationResult::Product {
217
9452
            id: doc.header.id,
218
9452
            product: doc.contents,
219
9452
            rating: rating.average(),
220
9452
        }
221
18904
    }
222
}
223

            
224
#[async_trait]
225
impl Operator<LookupProduct, u32> for BonsaiOperator {
226
9200
    async fn operate(
227
9200
        &mut self,
228
9200
        operation: &LookupProduct,
229
9200
        _results: &[OperationResult<u32>],
230
9200
        measurements: &Measurements,
231
9200
    ) -> OperationResult<u32> {
232
9200
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
233
9200
        let doc = Product::get_async(&operation.id, &self.database)
234
8306
            .await
235
9200
            .unwrap()
236
9200
            .unwrap();
237
9200
        let rating = self
238
9200
            .database
239
9200
            .view::<ProductReviewsByProduct>()
240
9200
            .with_key(&doc.header.id)
241
9200
            .with_access_policy(AccessPolicy::NoUpdate)
242
9200
            .reduce()
243
8442
            .await
244
9200
            .unwrap();
245
9200
        measurement.finish();
246
9200
        OperationResult::Product {
247
9200
            id: doc.header.id,
248
9200
            product: doc.contents,
249
9200
            rating: rating.average(),
250
9200
        }
251
18400
    }
252
}
253

            
254
#[async_trait]
255
impl Operator<CreateCart, u32> for BonsaiOperator {
256
1844
    async fn operate(
257
1844
        &mut self,
258
1844
        _operation: &CreateCart,
259
1844
        _results: &[OperationResult<u32>],
260
1844
        measurements: &Measurements,
261
1844
    ) -> OperationResult<u32> {
262
1844
        let measurement = measurements.begin(self.label, Metric::CreateCart);
263
1844
        let cart = Cart::default()
264
1844
            .push_into_async(&self.database)
265
1842
            .await
266
1844
            .unwrap();
267
1844
        measurement.finish();
268
1844
        OperationResult::Cart { id: cart.header.id }
269
3688
    }
270
}
271

            
272
#[async_trait]
273
impl Operator<AddProductToCart, u32> for BonsaiOperator {
274
4752
    async fn operate(
275
4752
        &mut self,
276
4752
        operation: &AddProductToCart,
277
4752
        results: &[OperationResult<u32>],
278
4752
        measurements: &Measurements,
279
4752
    ) -> OperationResult<u32> {
280
4752
        let cart = match &results[operation.cart.0] {
281
4752
            OperationResult::Cart { id } => *id,
282
            _ => unreachable!("Invalid operation result"),
283
        };
284
4752
        let product = match &results[operation.product.0] {
285
4752
            OperationResult::Product { id, .. } => *id,
286
            _ => unreachable!("Invalid operation result"),
287
        };
288

            
289
4752
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
290
4752
        let mut cart = Cart::get_async(&cart, &self.database)
291
4256
            .await
292
4752
            .unwrap()
293
4752
            .unwrap();
294
4752
        cart.contents.product_ids.push(product);
295
4752
        cart.update_async(&self.database).await.unwrap();
296
4752
        measurement.finish();
297
4752

            
298
4752
        OperationResult::CartProduct { id: product }
299
9504
    }
300
}
301

            
302
#[async_trait]
303
impl Operator<Checkout, u32> for BonsaiOperator {
304
484
    async fn operate(
305
484
        &mut self,
306
484
        operation: &Checkout,
307
484
        results: &[OperationResult<u32>],
308
484
        measurements: &Measurements,
309
484
    ) -> OperationResult<u32> {
310
484
        let cart = match &results[operation.cart.0] {
311
484
            OperationResult::Cart { id } => *id,
312
            _ => unreachable!("Invalid operation result"),
313
        };
314

            
315
484
        let measurement = measurements.begin(self.label, Metric::Checkout);
316
484
        let cart = Cart::get_async(&cart, &self.database)
317
410
            .await
318
484
            .unwrap()
319
484
            .unwrap();
320
484
        cart.delete_async(&self.database).await.unwrap();
321
484
        Order {
322
484
            customer_id: operation.customer_id,
323
484
            product_ids: cart.contents.product_ids,
324
484
        }
325
484
        .push_into_async(&self.database)
326
483
        .await
327
484
        .unwrap();
328
484
        measurement.finish();
329
484

            
330
484
        OperationResult::Ok
331
968
    }
332
}
333

            
334
#[async_trait]
335
impl Operator<ReviewProduct, u32> for BonsaiOperator {
336
396
    async fn operate(
337
396
        &mut self,
338
396
        operation: &ReviewProduct,
339
396
        results: &[OperationResult<u32>],
340
396
        measurements: &Measurements,
341
396
    ) -> OperationResult<u32> {
342
396
        let product_id = match &results[operation.product_id.0] {
343
            OperationResult::Product { id, .. } => *id,
344
396
            OperationResult::CartProduct { id, .. } => *id,
345
            other => unreachable!("Invalid operation result {:?}", other),
346
        };
347

            
348
396
        let measurement = measurements.begin(self.label, Metric::RateProduct);
349
396
        let review = ProductReview {
350
396
            customer_id: operation.customer_id,
351
396
            product_id,
352
396
            review: operation.review.clone(),
353
396
            rating: operation.rating,
354
396
        };
355
396
        // https://github.com/khonsulabs/bonsaidb/issues/189
356
396
        match review.push_into_async(&self.database).await {
357
396
            Ok(_) => {}
358
            Err(InsertError {
359
                error:
360
                    bonsaidb::core::Error::UniqueKeyViolation {
361
                        existing_document, ..
362
                    },
363
                contents,
364
            }) => {
365
                CollectionDocument::<ProductReview> {
366
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
367
                    contents,
368
                }
369
                .update_async(&self.database)
370
                .await
371
                .unwrap();
372
            }
373
            other => {
374
                other.unwrap();
375
            }
376
        }
377
        // Force the view to update.
378
396
        self.database
379
396
            .view::<ProductReviewsByProduct>()
380
396
            .with_key(&0)
381
396
            .reduce()
382
396
            .await
383
396
            .unwrap();
384
396
        measurement.finish();
385
396

            
386
396
        OperationResult::Ok
387
792
    }
388
}
389

            
390
impl Collection for Product {
391
    type PrimaryKey = u32;
392

            
393
265276
    fn collection_name() -> CollectionName {
394
265276
        CollectionName::new("benchmarks", "products")
395
265276
    }
396

            
397
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
398
30262
        schema.define_view(ProductsByName)?;
399
30262
        schema.define_view(ProductsByCategoryId)?;
400
30262
        Ok(())
401
30262
    }
402
}
403

            
404
impl DefaultSerialization for Product {}
405

            
406
define_basic_unique_mapped_view!(
407
    ProductsByName,
408
    Product,
409
    1,
410
    "by-name",
411
    String,
412
    (),
413
1480
    |document: CollectionDocument<Product>| { document.header.emit_key(document.contents.name) },
414
);
415

            
416
31758
#[derive(Debug, Clone, View, ViewSchema)]
417
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
418
pub struct ProductsByCategoryId;
419

            
420
impl CollectionMapReduce for ProductsByCategoryId {
421
    fn map<'doc>(
422
        &self,
423
        document: CollectionDocument<<Self::View as View>::Collection>,
424
    ) -> ViewMapResult<'doc, Self> {
425
        let mut mappings = Mappings::default();
426
        for &id in &document.contents.category_ids {
427
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
428
        }
429
        Ok(mappings)
430
    }
431
}
432

            
433
impl NamedCollection for Product {
434
    type ByNameView = ProductsByName;
435
}
436

            
437
impl Collection for ProductReview {
438
    type PrimaryKey = u32;
439

            
440
211464
    fn collection_name() -> CollectionName {
441
211464
        CollectionName::new("benchmarks", "reviews")
442
211464
    }
443

            
444
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
445
30262
        schema.define_view(ProductReviewsByProduct)?;
446
30262
        Ok(())
447
30262
    }
448
}
449

            
450
impl DefaultSerialization for ProductReview {}
451

            
452
109838
#[derive(Debug, Clone, View, ViewSchema)]
453
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
454
pub struct ProductReviewsByProduct;
455

            
456
impl CollectionMapReduce for ProductReviewsByProduct {
457
2180
    fn map<'doc>(
458
2180
        &self,
459
2180
        document: CollectionDocument<<Self as View>::Collection>,
460
2180
    ) -> ViewMapResult<'doc, Self> {
461
2180
        document.header.emit_key_and_value(
462
2180
            document.contents.product_id,
463
2180
            ProductRatings {
464
2180
                total_score: document.contents.rating as u32,
465
2180
                ratings: 1,
466
2180
            },
467
2180
        )
468
2180
    }
469

            
470
5694
    fn reduce(
471
5694
        &self,
472
5694
        mappings: &[ViewMappedValue<Self::View>],
473
5694
        _rereduce: bool,
474
5694
    ) -> ReduceResult<Self::View> {
475
5694
        Ok(mappings
476
5694
            .iter()
477
5694
            .map(|mapping| mapping.value.clone())
478
5694
            .reduce(|a, b| ProductRatings {
479
2176
                total_score: a.total_score + b.total_score,
480
2176
                ratings: a.ratings + b.ratings,
481
5694
            })
482
5694
            .unwrap_or_default())
483
5694
    }
484
}
485

            
486
111900
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
487
pub struct ProductRatings {
488
    pub total_score: u32,
489
    pub ratings: u32,
490
}
491

            
492
impl ProductRatings {
493
18652
    pub fn average(&self) -> Option<f32> {
494
18652
        if self.ratings > 0 {
495
14306
            Some(self.total_score as f32 / self.ratings as f32)
496
        } else {
497
4346
            None
498
        }
499
18652
    }
500
}
501

            
502
impl Collection for Category {
503
    type PrimaryKey = u32;
504

            
505
30502
    fn collection_name() -> CollectionName {
506
30502
        CollectionName::new("benchmarks", "categories")
507
30502
    }
508

            
509
30262
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
510
30262
        Ok(())
511
30262
    }
512
}
513

            
514
impl DefaultSerialization for Category {}
515

            
516
impl Collection for Customer {
517
    type PrimaryKey = u32;
518

            
519
32490
    fn collection_name() -> CollectionName {
520
32490
        CollectionName::new("benchmarks", "customers")
521
32490
    }
522

            
523
30262
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
30262
        Ok(())
525
30262
    }
526
}
527

            
528
impl DefaultSerialization for Customer {}
529

            
530
impl Collection for Order {
531
    type PrimaryKey = u32;
532

            
533
35798
    fn collection_name() -> CollectionName {
534
35798
        CollectionName::new("benchmarks", "orders")
535
35798
    }
536

            
537
30262
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
538
30262
        Ok(())
539
30262
    }
540
}
541

            
542
impl DefaultSerialization for Order {}
543

            
544
impl Collection for Cart {
545
    type PrimaryKey = u32;
546

            
547
42578
    fn collection_name() -> CollectionName {
548
42578
        CollectionName::new("benchmarks", "carts")
549
42578
    }
550

            
551
30262
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
552
30262
        Ok(())
553
30262
    }
554
}
555

            
556
impl DefaultSerialization for Cart {}