1
use std::path::Path;
2
use std::time::Duration;
3

            
4
use bonsaidb::client::url::Url;
5
use bonsaidb::client::AsyncClient;
6
use bonsaidb::core::async_trait::async_trait;
7
use bonsaidb::core::connection::{
8
    AccessPolicy, AsyncConnection, AsyncLowLevelConnection, AsyncStorageConnection,
9
};
10
use bonsaidb::core::document::{CollectionDocument, CollectionHeader, Emit};
11
use bonsaidb::core::schema::view::map::Mappings;
12
use bonsaidb::core::schema::{
13
    Collection, CollectionMapReduce, CollectionName, DefaultSerialization, InsertError,
14
    NamedCollection, Qualified, ReduceResult, Schema, Schematic, SerializedCollection, View,
15
    ViewMapResult, ViewMappedValue, ViewSchema,
16
};
17
use bonsaidb::core::transaction::Transaction;
18
use bonsaidb::core::{define_basic_unique_mapped_view, Error};
19
use bonsaidb::local::config::Builder;
20
#[cfg(feature = "compression")]
21
use bonsaidb::local::config::Compression;
22
use bonsaidb::server::{DefaultPermissions, Server, ServerConfiguration};
23
use bonsaidb::AnyDatabase;
24
use serde::{Deserialize, Serialize};
25

            
26
use crate::execute::{Backend, BackendOperator, Measurements, Metric, Operator};
27
use crate::model::{Cart, Category, Customer, Order, Product, ProductReview};
28
use crate::plan::{
29
    AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
30
    ReviewProduct,
31
};
32

            
33
/// Selects which BonsaiDb setup the commerce benchmark runs against.
///
/// The variant decides both the label reported for measurements (see
/// [`Bonsai::label`]) and how operators reach the database: directly through
/// the in-process server, or through a networked client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Bonsai {
    /// Operators use the server's database handle directly (no network).
    Local,
    /// Same as [`Bonsai::Local`], but the server is configured with LZ4
    /// compression when the `compression` feature is enabled.
    LocalLz4,
    /// Operators connect through a client using a `bonsaidb://` URL.
    Quic,
    /// Operators connect through a client using a `ws://` URL.
    WebSockets,
}
39

            
40
impl Bonsai {
41
72
    pub fn label(&self) -> &'static str {
42
72
        match self {
43
18
            Self::Local => "bonsaidb-local",
44
18
            Self::LocalLz4 => "bonsaidb-local+lz4",
45
18
            Self::Quic => "bonsaidb-quic",
46
18
            Self::WebSockets => "bonsaidb-ws",
47
        }
48
72
    }
49
}
50

            
51
pub struct BonsaiBackend {
52
    server: Server,
53
    kind: Bonsai,
54
}
55

            
56
pub struct BonsaiOperator {
57
    label: &'static str,
58
    database: AnyDatabase,
59
}
60

            
61
29382
#[derive(Debug, Schema)]
62
#[schema(name = "commerce", authority = "benchmarks", collections = [Product, Category, Customer, Order, Cart, ProductReview])]
63
pub enum Commerce {}
64

            
65
#[async_trait]
66
impl Backend for BonsaiBackend {
67
    type Config = Bonsai;
68
    type Operator = BonsaiOperator;
69

            
70
72
    fn label(&self) -> &'static str {
71
72
        self.kind.label()
72
72
    }
73

            
74
    #[cfg_attr(not(feature = "compression"), allow(unused_mut))]
75
16
    async fn new(config: Self::Config) -> Self {
76
16
        let path = Path::new("commerce-benchmarks.bonsaidb");
77
16
        if path.exists() {
78
15
            std::fs::remove_dir_all(path).unwrap();
79
15
        }
80
16
        let mut server_config = ServerConfiguration::new(path)
81
16
            .default_permissions(DefaultPermissions::AllowAll)
82
16
            .with_schema::<Commerce>()
83
16
            .unwrap();
84

            
85
        #[cfg(feature = "compression")]
86
        {
87
16
            if matches!(config, Bonsai::LocalLz4) {
88
4
                server_config = server_config.default_compression(Compression::Lz4);
89
12
            }
90
        }
91

            
92
48
        let server = Server::open(server_config).await.unwrap();
93
160
        server.install_self_signed_certificate(false).await.unwrap();
94
16
        server
95
16
            .create_database::<Commerce>("commerce", false)
96
32
            .await
97
16
            .unwrap();
98
16

            
99
16
        match config {
100
4
            Bonsai::Quic => {
101
4
                let server = server.clone();
102
4
                tokio::spawn(async move {
103
30
                    server.listen_on(7022).await.unwrap();
104
4
                });
105
4
            }
106
4
            Bonsai::WebSockets => {
107
4
                let server = server.clone();
108
4
                tokio::spawn(async move {
109
4
                    server
110
4
                        .listen_for_websockets_on("0.0.0.0:7023", false)
111
11
                        .await
112
                        .unwrap();
113
4
                });
114
4
            }
115
8
            Bonsai::Local | Bonsai::LocalLz4 => {}
116
        }
117
        // Allow the server time to start listening
118
16
        tokio::time::sleep(Duration::from_millis(1000)).await;
119

            
120
16
        BonsaiBackend {
121
16
            server,
122
16
            kind: config,
123
16
        }
124
48
    }
125

            
126
72
    async fn new_operator_async(&self) -> Self::Operator {
127
72
        let database = match self.kind {
128
            Bonsai::Local | Bonsai::LocalLz4 => {
129
36
                AnyDatabase::Local(self.server.database::<Commerce>("commerce").await.unwrap())
130
            }
131

            
132
            Bonsai::Quic => {
133
18
                let client = AsyncClient::build(Url::parse("bonsaidb://localhost:7022").unwrap())
134
18
                    .with_certificate(
135
18
                        self.server
136
18
                            .certificate_chain()
137
36
                            .await
138
18
                            .unwrap()
139
18
                            .into_end_entity_certificate(),
140
18
                    )
141
18
                    .build()
142
18
                    .unwrap();
143
18
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
144
            }
145
            Bonsai::WebSockets => {
146
18
                let client = AsyncClient::build(Url::parse("ws://localhost:7023").unwrap())
147
18
                    .build()
148
18
                    .unwrap();
149
18
                AnyDatabase::Networked(client.database::<Commerce>("commerce").await.unwrap())
150
            }
151
        };
152
72
        BonsaiOperator {
153
72
            database,
154
72
            label: self.label(),
155
72
        }
156
216
    }
157
}
158

            
159
/// Identifiers handled by this operator are `u32`, matching the primary key
/// type of every collection defined in this file.
impl BackendOperator for BonsaiOperator {
    type Id = u32;
}
162

            
163
#[async_trait]
164
impl Operator<Load, u32> for BonsaiOperator {
165
16
    async fn operate(
166
16
        &mut self,
167
16
        operation: &Load,
168
16
        _results: &[OperationResult<u32>],
169
16
        measurements: &Measurements,
170
16
    ) -> OperationResult<u32> {
171
16
        let measurement = measurements.begin(self.label, Metric::Load);
172
16
        let mut tx = Transaction::default();
173
236
        for (id, category) in &operation.initial_data.categories {
174
236
            category.insert_in_transaction(id, &mut tx).unwrap();
175
236
        }
176
1748
        for (id, product) in &operation.initial_data.products {
177
1748
            product.insert_in_transaction(id, &mut tx).unwrap();
178
1748
        }
179
1708
        for (id, customer) in &operation.initial_data.customers {
180
1708
            customer.insert_in_transaction(id, &mut tx).unwrap();
181
1708
        }
182
4032
        for (id, order) in &operation.initial_data.orders {
183
4032
            order.insert_in_transaction(id, &mut tx).unwrap();
184
4032
        }
185
1948
        for review in &operation.initial_data.reviews {
186
1948
            review.push_in_transaction(&mut tx).unwrap();
187
1948
        }
188
16
        self.database.apply_transaction(tx).await.unwrap();
189
16
        measurement.finish();
190
16
        OperationResult::Ok
191
48
    }
192
}
193

            
194
#[async_trait]
195
impl Operator<FindProduct, u32> for BonsaiOperator {
196
9208
    async fn operate(
197
9208
        &mut self,
198
9208
        operation: &FindProduct,
199
9208
        _results: &[OperationResult<u32>],
200
9208
        measurements: &Measurements,
201
9208
    ) -> OperationResult<u32> {
202
9208
        let measurement = measurements.begin(self.label, Metric::FindProduct);
203
9208
        let doc = Product::load_async(&operation.name, &self.database)
204
18336
            .await
205
9208
            .unwrap()
206
9208
            .unwrap();
207
9208
        let rating = self
208
9208
            .database
209
9208
            .view::<ProductReviewsByProduct>()
210
9208
            .with_key(&doc.header.id)
211
9208
            .with_access_policy(AccessPolicy::NoUpdate)
212
9208
            .reduce()
213
9158
            .await
214
9208
            .unwrap();
215
9208
        measurement.finish();
216
9208
        OperationResult::Product {
217
9208
            id: doc.header.id,
218
9208
            product: doc.contents,
219
9208
            rating: rating.average(),
220
9208
        }
221
27624
    }
222
}
223

            
224
#[async_trait]
225
impl Operator<LookupProduct, u32> for BonsaiOperator {
226
8800
    async fn operate(
227
8800
        &mut self,
228
8800
        operation: &LookupProduct,
229
8800
        _results: &[OperationResult<u32>],
230
8800
        measurements: &Measurements,
231
8800
    ) -> OperationResult<u32> {
232
8800
        let measurement = measurements.begin(self.label, Metric::LookupProduct);
233
8800
        let doc = Product::get_async(&operation.id, &self.database)
234
8741
            .await
235
8800
            .unwrap()
236
8800
            .unwrap();
237
8800
        let rating = self
238
8800
            .database
239
8800
            .view::<ProductReviewsByProduct>()
240
8800
            .with_key(&doc.header.id)
241
8800
            .with_access_policy(AccessPolicy::NoUpdate)
242
8800
            .reduce()
243
8749
            .await
244
8800
            .unwrap();
245
8800
        measurement.finish();
246
8800
        OperationResult::Product {
247
8800
            id: doc.header.id,
248
8800
            product: doc.contents,
249
8800
            rating: rating.average(),
250
8800
        }
251
26400
    }
252
}
253

            
254
#[async_trait]
255
impl Operator<CreateCart, u32> for BonsaiOperator {
256
1840
    async fn operate(
257
1840
        &mut self,
258
1840
        _operation: &CreateCart,
259
1840
        _results: &[OperationResult<u32>],
260
1840
        measurements: &Measurements,
261
1840
    ) -> OperationResult<u32> {
262
1840
        let measurement = measurements.begin(self.label, Metric::CreateCart);
263
1840
        let cart = Cart::default()
264
1840
            .push_into_async(&self.database)
265
1840
            .await
266
1840
            .unwrap();
267
1840
        measurement.finish();
268
1840
        OperationResult::Cart { id: cart.header.id }
269
5520
    }
270
}
271

            
272
#[async_trait]
273
impl Operator<AddProductToCart, u32> for BonsaiOperator {
274
4592
    async fn operate(
275
4592
        &mut self,
276
4592
        operation: &AddProductToCart,
277
4592
        results: &[OperationResult<u32>],
278
4592
        measurements: &Measurements,
279
4592
    ) -> OperationResult<u32> {
280
4592
        let cart = match &results[operation.cart.0] {
281
4592
            OperationResult::Cart { id } => *id,
282
            _ => unreachable!("Invalid operation result"),
283
        };
284
4592
        let product = match &results[operation.product.0] {
285
4592
            OperationResult::Product { id, .. } => *id,
286
            _ => unreachable!("Invalid operation result"),
287
        };
288

            
289
4592
        let measurement = measurements.begin(self.label, Metric::AddProductToCart);
290
4592
        let mut cart = Cart::get_async(&cart, &self.database)
291
4557
            .await
292
4592
            .unwrap()
293
4592
            .unwrap();
294
4592
        cart.contents.product_ids.push(product);
295
4592
        cart.update_async(&self.database).await.unwrap();
296
4592
        measurement.finish();
297
4592

            
298
4592
        OperationResult::CartProduct { id: product }
299
13776
    }
300
}
301

            
302
#[async_trait]
303
impl Operator<Checkout, u32> for BonsaiOperator {
304
516
    async fn operate(
305
516
        &mut self,
306
516
        operation: &Checkout,
307
516
        results: &[OperationResult<u32>],
308
516
        measurements: &Measurements,
309
516
    ) -> OperationResult<u32> {
310
516
        let cart = match &results[operation.cart.0] {
311
516
            OperationResult::Cart { id } => *id,
312
            _ => unreachable!("Invalid operation result"),
313
        };
314

            
315
516
        let measurement = measurements.begin(self.label, Metric::Checkout);
316
516
        let cart = Cart::get_async(&cart, &self.database)
317
511
            .await
318
516
            .unwrap()
319
516
            .unwrap();
320
516
        cart.delete_async(&self.database).await.unwrap();
321
516
        Order {
322
516
            customer_id: operation.customer_id,
323
516
            product_ids: cart.contents.product_ids,
324
516
        }
325
516
        .push_into_async(&self.database)
326
516
        .await
327
516
        .unwrap();
328
516
        measurement.finish();
329
516

            
330
516
        OperationResult::Ok
331
1548
    }
332
}
333

            
334
#[async_trait]
335
impl Operator<ReviewProduct, u32> for BonsaiOperator {
336
372
    async fn operate(
337
372
        &mut self,
338
372
        operation: &ReviewProduct,
339
372
        results: &[OperationResult<u32>],
340
372
        measurements: &Measurements,
341
372
    ) -> OperationResult<u32> {
342
372
        let product_id = match &results[operation.product_id.0] {
343
            OperationResult::Product { id, .. } => *id,
344
372
            OperationResult::CartProduct { id, .. } => *id,
345
            other => unreachable!("Invalid operation result {:?}", other),
346
        };
347

            
348
372
        let measurement = measurements.begin(self.label, Metric::RateProduct);
349
372
        let review = ProductReview {
350
372
            customer_id: operation.customer_id,
351
372
            product_id,
352
372
            review: operation.review.clone(),
353
372
            rating: operation.rating,
354
372
        };
355
372
        // https://github.com/khonsulabs/bonsaidb/issues/189
356
372
        match review.push_into_async(&self.database).await {
357
372
            Ok(_) => {}
358
            Err(InsertError {
359
                error:
360
                    bonsaidb::core::Error::UniqueKeyViolation {
361
                        existing_document, ..
362
                    },
363
                contents,
364
            }) => {
365
                CollectionDocument::<ProductReview> {
366
                    header: CollectionHeader::try_from(*existing_document).unwrap(),
367
                    contents,
368
                }
369
                .update_async(&self.database)
370
                .await
371
                .unwrap();
372
            }
373
            other => {
374
                other.unwrap();
375
            }
376
        }
377
        // Force the view to update.
378
372
        self.database
379
372
            .view::<ProductReviewsByProduct>()
380
372
            .with_key(&0)
381
372
            .reduce()
382
372
            .await
383
372
            .unwrap();
384
372
        measurement.finish();
385
372

            
386
372
        OperationResult::Ok
387
1116
    }
388
}
389

            
390
impl Collection for Product {
391
    type PrimaryKey = u32;
392

            
393
259126
    fn collection_name() -> CollectionName {
394
259126
        CollectionName::new("benchmarks", "products")
395
259126
    }
396

            
397
29382
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
398
29382
        schema.define_view(ProductsByName)?;
399
29382
        schema.define_view(ProductsByCategoryId)?;
400
29382
        Ok(())
401
29382
    }
402
}
403

            
404
// Empty impl: `Product` relies on `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Product {}
405

            
406
// Unique view `by-name` (version 1) over `Product`: maps each product's name
// (a `String` key) to `()`.
define_basic_unique_mapped_view!(
    ProductsByName,
    Product,
    1,
    "by-name",
    String,
    (),
    |product: CollectionDocument<Product>| {
        let CollectionDocument { header, contents } = product;
        header.emit_key(contents.name)
    },
);
415

            
416
31146
#[derive(Debug, Clone, View, ViewSchema)]
417
#[view(collection = Product, key = u32, value = u32, name = "by-category")]
418
pub struct ProductsByCategoryId;
419

            
420
impl CollectionMapReduce for ProductsByCategoryId {
421
    fn map<'doc>(
422
        &self,
423
        document: CollectionDocument<<Self::View as View>::Collection>,
424
    ) -> ViewMapResult<'doc, Self> {
425
        let mut mappings = Mappings::default();
426
        for &id in &document.contents.category_ids {
427
            mappings = mappings.and(document.header.emit_key_and_value(id, 1)?);
428
        }
429
        Ok(mappings)
430
    }
431
}
432

            
433
impl NamedCollection for Product {
434
    type ByNameView = ProductsByName;
435
}
436

            
437
impl Collection for ProductReview {
438
    type PrimaryKey = u32;
439

            
440
205011
    fn collection_name() -> CollectionName {
441
205011
        CollectionName::new("benchmarks", "reviews")
442
205011
    }
443

            
444
29382
    fn define_views(schema: &mut Schematic) -> Result<(), Error> {
445
29382
        schema.define_view(ProductReviewsByProduct)?;
446
29382
        Ok(())
447
29382
    }
448
}
449

            
450
// Empty impl: `ProductReview` relies on `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for ProductReview {}
451

            
452
106354
#[derive(Debug, Clone, View, ViewSchema)]
453
#[view(collection = ProductReview, key = u32, value = ProductRatings, name = "by-product")]
454
pub struct ProductReviewsByProduct;
455

            
456
impl CollectionMapReduce for ProductReviewsByProduct {
457
2320
    fn map<'doc>(
458
2320
        &self,
459
2320
        document: CollectionDocument<<Self as View>::Collection>,
460
2320
    ) -> ViewMapResult<'doc, Self> {
461
2320
        document.header.emit_key_and_value(
462
2320
            document.contents.product_id,
463
2320
            ProductRatings {
464
2320
                total_score: document.contents.rating as u32,
465
2320
                ratings: 1,
466
2320
            },
467
2320
        )
468
2320
    }
469

            
470
6575
    fn reduce(
471
6575
        &self,
472
6575
        mappings: &[ViewMappedValue<Self::View>],
473
6575
        _rereduce: bool,
474
6575
    ) -> ReduceResult<Self::View> {
475
6575
        Ok(mappings
476
6575
            .iter()
477
6575
            .map(|mapping| mapping.value.clone())
478
6575
            .reduce(|a, b| ProductRatings {
479
1492
                total_score: a.total_score + b.total_score,
480
1492
                ratings: a.ratings + b.ratings,
481
6575
            })
482
6575
            .unwrap_or_default())
483
6575
    }
484
}
485

            
486
106160
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
487
pub struct ProductRatings {
488
    pub total_score: u32,
489
    pub ratings: u32,
490
}
491

            
492
impl ProductRatings {
493
18008
    pub fn average(&self) -> Option<f32> {
494
18008
        if self.ratings > 0 {
495
12985
            Some(self.total_score as f32 / self.ratings as f32)
496
        } else {
497
5023
            None
498
        }
499
18008
    }
500
}
501

            
502
impl Collection for Category {
503
    type PrimaryKey = u32;
504

            
505
29618
    fn collection_name() -> CollectionName {
506
29618
        CollectionName::new("benchmarks", "categories")
507
29618
    }
508

            
509
29382
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
510
29382
        Ok(())
511
29382
    }
512
}
513

            
514
// Empty impl: `Category` relies on `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Category {}
515

            
516
impl Collection for Customer {
517
    type PrimaryKey = u32;
518

            
519
31090
    fn collection_name() -> CollectionName {
520
31090
        CollectionName::new("benchmarks", "customers")
521
31090
    }
522

            
523
29382
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
524
29382
        Ok(())
525
29382
    }
526
}
527

            
528
// Empty impl: `Customer` relies on `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Customer {}
529

            
530
impl Collection for Order {
531
    type PrimaryKey = u32;
532

            
533
33930
    fn collection_name() -> CollectionName {
534
33930
        CollectionName::new("benchmarks", "orders")
535
33930
    }
536

            
537
29382
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
538
29382
        Ok(())
539
29382
    }
540
}
541

            
542
// Empty impl: `Order` relies on `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Order {}
543

            
544
impl Collection for Cart {
545
    type PrimaryKey = u32;
546

            
547
41438
    fn collection_name() -> CollectionName {
548
41438
        CollectionName::new("benchmarks", "carts")
549
41438
    }
550

            
551
29382
    fn define_views(_schema: &mut Schematic) -> Result<(), Error> {
552
29382
        Ok(())
553
29382
    }
554
}
555

            
556
// Empty impl: `Cart` relies on `DefaultSerialization` as-is, with no overrides.
impl DefaultSerialization for Cart {}