1
use bonsaidb::core::actionable::async_trait;
2
use futures::StreamExt;
3
use sqlx::{postgres::PgArguments, Arguments, Connection, Executor, PgPool, Row, Statement};
4

            
5
use crate::{
6
    execute::{Backend, BackendOperator, Measurements, Metric, Operator},
7
    model::Product,
8
    plan::{
9
        AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
10
        ReviewProduct,
11
    },
12
};
13

            
14
pub struct Postgres {
15
    pool: PgPool,
16
}
17

            
18
#[async_trait]
19
impl Backend for Postgres {
20
    type Operator = PostgresOperator;
21
    type Config = String;
22

            
23
    fn label(&self) -> &'static str {
24
        "postgresql"
25
    }
26

            
27
4
    async fn new(url: Self::Config) -> Self {
28
24
        let pool = PgPool::connect(&url).await.unwrap();
29

            
30
4
        let mut conn = pool.acquire().await.unwrap();
31
12
        conn.execute(r#"DROP SCHEMA IF EXISTS commerce_bench CASCADE"#)
32
12
            .await
33
4
            .unwrap();
34
8
        conn.execute(r#"CREATE SCHEMA commerce_bench"#)
35
8
            .await
36
4
            .unwrap();
37
8
        conn.execute("SET search_path='commerce_bench';")
38
8
            .await
39
4
            .unwrap();
40
4
        conn.execute(
41
4
            r#"CREATE TABLE customers (
42
4
            id SERIAL PRIMARY KEY,
43
4
            name TEXT, 
44
4
            email TEXT, 
45
4
            address TEXT, 
46
4
            city TEXT, 
47
4
            region TEXT, 
48
4
            country TEXT, 
49
4
            postal_code TEXT, 
50
4
            phone TEXT
51
4
        )"#,
52
8
        )
53
8
        .await
54
4
        .unwrap();
55
4
        conn.execute(
56
4
            r#"CREATE TABLE products (
57
4
                    id SERIAL PRIMARY KEY,
58
4
                    name TEXT
59
4
                )"#,
60
8
        )
61
8
        .await
62
4
        .unwrap();
63
8
        conn.execute(r#"CREATE INDEX products_by_name ON products(name)"#)
64
8
            .await
65
4
            .unwrap();
66
4
        conn.execute(
67
4
            r#"CREATE TABLE product_reviews (
68
4
                product_id INTEGER NOT NULL,-- REFERENCES products(id),
69
4
                customer_id INTEGER NOT NULL,-- REFERENCES customers(id),
70
4
                rating INTEGER NOT NULL,
71
4
                review TEXT
72
4
            )"#,
73
8
        )
74
8
        .await
75
4
        .unwrap();
76
8
        conn.execute(r#"CREATE INDEX product_reviews_by_product ON product_reviews(product_id)"#)
77
8
            .await
78
4
            .unwrap();
79
8
        conn.execute(r#"CREATE UNIQUE INDEX product_reviews_by_customer ON product_reviews(customer_id, product_id)"#)
80
8
            .await
81
4
            .unwrap();
82
4
        conn.execute(
83
4
            r#"CREATE MATERIALIZED VIEW 
84
4
                    product_ratings 
85
4
                AS 
86
4
                    SELECT 
87
4
                        product_id,
88
4
                        sum(rating)::int as total_rating, 
89
4
                        count(rating)::int as ratings
90
4
                    FROM
91
4
                        product_reviews
92
4
                    GROUP BY product_id
93
4
            "#,
94
8
        )
95
8
        .await
96
4
        .unwrap();
97
4
        conn.execute(
98
4
            r#"CREATE TABLE categories (
99
4
                    id SERIAL PRIMARY KEY,
100
4
                    name TEXT
101
4
                )"#,
102
8
        )
103
8
        .await
104
4
        .unwrap();
105
4
        conn.execute(
106
4
            r#"CREATE TABLE product_categories (
107
4
                    product_id INTEGER,-- REFERENCES products(id),
108
4
                    category_id INTEGER-- REFERENCES categories(id)
109
4
                )"#,
110
8
        )
111
8
        .await
112
4
        .unwrap();
113
4
        conn.execute(
114
4
            r#"CREATE INDEX product_categories_by_product ON product_categories(product_id)"#,
115
8
        )
116
8
        .await
117
4
        .unwrap();
118
4
        conn.execute(
119
4
            r#"CREATE INDEX product_categories_by_category ON product_categories(category_id)"#,
120
8
        )
121
8
        .await
122
4
        .unwrap();
123
4
        conn.execute(
124
4
            r#"CREATE TABLE orders (
125
4
                    id SERIAL PRIMARY KEY,
126
4
                    customer_id INTEGER -- REFERENCES customers(id)
127
4
                )"#,
128
8
        )
129
8
        .await
130
4
        .unwrap();
131
4
        conn.execute(
132
4
            r#"CREATE TABLE order_products (
133
4
                    order_id INTEGER NOT NULL,-- REFERENCES orders(id),
134
4
                    product_id INTEGER NOT NULL -- REFERENCES products(id)
135
4
                )"#,
136
8
        )
137
8
        .await
138
4
        .unwrap();
139
4
        conn.execute(
140
4
            r#"CREATE TABLE carts (
141
4
            id SERIAL PRIMARY KEY,
142
4
            customer_id INTEGER
143
4
        )"#,
144
8
        )
145
8
        .await
146
4
        .unwrap();
147
4
        conn.execute(
148
4
            r#"CREATE TABLE cart_products (
149
4
                cart_id INTEGER,
150
4
                product_id INTEGER
151
4
            )"#,
152
8
        )
153
8
        .await
154
4
        .unwrap();
155
4

            
156
4
        Self { pool }
157
8
    }
158

            
159
12
    async fn new_operator_async(&self) -> Self::Operator {
160
12
        PostgresOperator {
161
12
            sqlite: self.pool.clone(),
162
12
        }
163
12
    }
164
}
165

            
166
pub struct PostgresOperator {
167
    sqlite: PgPool,
168
}
169

            
170
// Empty impl: no `BackendOperator` items are overridden for `PostgresOperator`.
impl BackendOperator for PostgresOperator {}
171

            
172
#[async_trait]
173
impl Operator<Load> for PostgresOperator {
174
4
    async fn operate(
175
4
        &mut self,
176
4
        operation: &Load,
177
4
        _results: &[OperationResult],
178
4
        measurements: &Measurements,
179
4
    ) -> OperationResult {
180
4
        let measurement = measurements.begin("postgresql", Metric::Load);
181
26
        let mut conn = self.sqlite.acquire().await.unwrap();
182
4
        let mut tx = conn.begin().await.unwrap();
183
4
        let insert_category = tx
184
8
            .prepare("INSERT INTO commerce_bench.categories (id, name) VALUES ($1, $2)")
185
8
            .await
186
4
            .unwrap();
187
50
        for (id, category) in &operation.initial_data.categories {
188
50
            let mut args = PgArguments::default();
189
50
            args.reserve(2, 0);
190
50
            args.add(*id);
191
50
            args.add(&category.name);
192
95
            tx.execute(insert_category.query_with(args)).await.unwrap();
193
        }
194

            
195
4
        let insert_product = tx
196
8
            .prepare("INSERT INTO commerce_bench.products (id, name) VALUES ($1, $2)")
197
8
            .await
198
4
            .unwrap();
199
4
        let insert_product_category = tx
200
4
            .prepare("INSERT INTO commerce_bench.product_categories (product_id, category_id) VALUES ($1, $2)")
201
4
            .await
202
4
            .unwrap();
203
458
        for (&id, product) in &operation.initial_data.products {
204
458
            let mut args = PgArguments::default();
205
458
            args.reserve(2, 0);
206
458
            args.add(id);
207
458
            args.add(&product.name);
208
909
            tx.execute(insert_product.query_with(args)).await.unwrap();
209
1730
            for &category_id in &product.category_ids {
210
1272
                let mut args = PgArguments::default();
211
1272
                args.reserve(2, 0);
212
1272
                args.add(id);
213
1272
                args.add(category_id);
214
2534
                tx.execute(insert_product_category.query_with(args))
215
2534
                    .await
216
1272
                    .unwrap();
217
            }
218
        }
219

            
220
4
        let insert_customer = tx
221
8
            .prepare("INSERT INTO commerce_bench.customers (id, name, email, address, city, region, country, postal_code, phone) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)")
222
8
            .await
223
4
            .unwrap();
224
715
        for (id, customer) in &operation.initial_data.customers {
225
715
            let mut args = PgArguments::default();
226
715
            args.reserve(9, 0);
227
715
            args.add(*id);
228
715
            args.add(&customer.name);
229
715
            args.add(&customer.email);
230
715
            args.add(&customer.address);
231
715
            args.add(&customer.city);
232
715
            args.add(&customer.region);
233
715
            args.add(&customer.country);
234
715
            args.add(&customer.postal_code);
235
715
            args.add(&customer.phone);
236
1423
            tx.execute(insert_customer.query_with(args)).await.unwrap();
237
        }
238

            
239
4
        let insert_order = tx
240
8
            .prepare("INSERT INTO commerce_bench.orders (id, customer_id) VALUES ($1, $2)")
241
8
            .await
242
4
            .unwrap();
243
4
        let insert_order_product = tx
244
4
            .prepare(
245
4
                "INSERT INTO commerce_bench.order_products (order_id, product_id) VALUES ($1, $2)",
246
4
            )
247
4
            .await
248
4
            .unwrap();
249
1334
        for (&id, order) in &operation.initial_data.orders {
250
1334
            let mut args = PgArguments::default();
251
1334
            args.reserve(2, 0);
252
1334
            args.add(id);
253
1334
            args.add(order.customer_id);
254
2631
            tx.execute(insert_order.query_with(args)).await.unwrap();
255
13635
            for &product_id in &order.product_ids {
256
12301
                let mut args = PgArguments::default();
257
12301
                args.reserve(2, 0);
258
12301
                args.add(id);
259
12301
                args.add(product_id);
260
24162
                tx.execute(insert_order_product.query_with(args))
261
24162
                    .await
262
12301
                    .unwrap();
263
            }
264
        }
265

            
266
4
        let insert_review = tx
267
8
            .prepare("INSERT INTO commerce_bench.product_reviews (product_id, customer_id, rating, review) VALUES ($1, $2, $3, $4)")
268
8
            .await
269
4
            .unwrap();
270
273
        for review in &operation.initial_data.reviews {
271
273
            let mut args = PgArguments::default();
272
273
            args.reserve(4, 0);
273
273
            args.add(review.product_id);
274
273
            args.add(review.customer_id);
275
273
            args.add(review.rating as u32);
276
273
            args.add(&review.review);
277
540
            tx.execute(insert_review.query_with(args)).await.unwrap();
278
        }
279
4
        tx.execute(
280
4
            "SELECT setval('commerce_bench.orders_id_seq', COALESCE((SELECT MAX(id)+1 FROM commerce_bench.orders), 1), false)",
281
8
        )
282
8
        .await
283
4
        .unwrap();
284
4

            
285
8
        tx.commit().await.unwrap();
286
4
        // Make sure all ratings show up in the view.
287
8
        conn.execute("REFRESH MATERIALIZED VIEW commerce_bench.product_ratings")
288
8
            .await
289
4
            .unwrap();
290
4
        // This makes a significant difference.
291
8
        conn.execute("ANALYZE").await.unwrap();
292
4
        measurement.finish();
293
4

            
294
4
        OperationResult::Ok
295
8
    }
296
}
297
#[async_trait]
298
impl Operator<CreateCart> for PostgresOperator {
299
458
    async fn operate(
300
458
        &mut self,
301
458
        _operation: &CreateCart,
302
458
        _results: &[OperationResult],
303
458
        measurements: &Measurements,
304
458
    ) -> OperationResult {
305
458
        let measurement = measurements.begin("postgresql", Metric::CreateCart);
306
729
        let mut conn = self.sqlite.acquire().await.unwrap();
307
696
        let mut tx = conn.begin().await.unwrap();
308
458
        let statement = tx
309
458
            .prepare("insert into commerce_bench.carts (customer_id) values (null) returning id")
310
31
            .await
311
458
            .unwrap();
312

            
313
753
        let result = tx.fetch_one(statement.query()).await.unwrap();
314
912
        tx.commit().await.unwrap();
315
458
        let id: i32 = result.get(0);
316
458
        measurement.finish();
317
458

            
318
458
        OperationResult::Cart { id: id as u32 }
319
916
    }
320
}
321
#[async_trait]
322
impl Operator<AddProductToCart> for PostgresOperator {
323
1129
    async fn operate(
324
1129
        &mut self,
325
1129
        operation: &AddProductToCart,
326
1129
        results: &[OperationResult],
327
1129
        measurements: &Measurements,
328
1129
    ) -> OperationResult {
329
1129
        let cart = match &results[operation.cart.0] {
330
1129
            OperationResult::Cart { id } => *id,
331
            _ => unreachable!("Invalid operation result"),
332
        };
333
1129
        let product = match &results[operation.product.0] {
334
1129
            OperationResult::Product { id, .. } => *id,
335
            _ => unreachable!("Invalid operation result"),
336
        };
337

            
338
1129
        let measurement = measurements.begin("postgresql", Metric::AddProductToCart);
339
1778
        let mut conn = self.sqlite.acquire().await.unwrap();
340
1771
        let mut tx = conn.begin().await.unwrap();
341
1129
        let statement = tx
342
1129
            .prepare(
343
1129
                "insert into commerce_bench.cart_products (cart_id, product_id) values ($1, $2)",
344
1129
            )
345
39
            .await
346
1129
            .unwrap();
347
1129

            
348
1129
        let mut args = PgArguments::default();
349
1129
        args.reserve(2, 0);
350
1129
        args.add(cart);
351
1129
        args.add(product);
352
1129

            
353
1954
        tx.execute(statement.query_with(args)).await.unwrap();
354
2220
        tx.commit().await.unwrap();
355
1129
        measurement.finish();
356
1129

            
357
1129
        OperationResult::CartProduct { id: product }
358
2258
    }
359
}
360
#[async_trait]
361
impl Operator<FindProduct> for PostgresOperator {
362
2217
    async fn operate(
363
2217
        &mut self,
364
2217
        operation: &FindProduct,
365
2217
        _results: &[OperationResult],
366
2217
        measurements: &Measurements,
367
2217
    ) -> OperationResult {
368
2217
        let measurement = measurements.begin("postgresql", Metric::FindProduct);
369
3537
        let mut conn = self.sqlite.acquire().await.unwrap();
370
2217
        let statement = conn
371
2217
            .prepare(
372
2217
                r#"
373
2217
                SELECT 
374
2217
                    id, 
375
2217
                    name, 
376
2217
                    category_id,
377
2217
                    commerce_bench.product_ratings.total_rating as "total_rating: Option<i32>",
378
2217
                    commerce_bench.product_ratings.ratings as "ratings: Option<i32>"
379
2217
                FROM 
380
2217
                    commerce_bench.products 
381
2217
                LEFT OUTER JOIN commerce_bench.product_categories ON 
382
2217
                    commerce_bench.product_categories.product_id = id 
383
2217
                LEFT OUTER JOIN commerce_bench.product_ratings ON
384
2217
                    commerce_bench.product_ratings.product_id = id
385
2217
                WHERE name = $1
386
2217
                GROUP BY id, name, category_id, commerce_bench.product_ratings.total_rating, commerce_bench.product_ratings.ratings
387
2217
            "#,
388
2217
            )
389
35
            .await
390
2217
            .unwrap();
391
2217

            
392
2217
        let mut args = PgArguments::default();
393
2217
        args.reserve(1, 0);
394
2217
        args.add(&operation.name);
395
2217

            
396
2217
        let mut results = conn.fetch(statement.query_with(args));
397
2217
        let mut id: Option<i32> = None;
398
2217
        let mut name = None;
399
2217
        let mut category_ids = Vec::new();
400
2217
        let mut total_rating: Option<i32> = None;
401
2217
        let mut rating_count: Option<i32> = None;
402
10067
        while let Some(row) = results.next().await {
403
7850
            let row = row.unwrap();
404
7850
            id = Some(row.get(0));
405
7850
            name = Some(row.get(1));
406
7850
            total_rating = row.get(2);
407
7850
            rating_count = row.get(3);
408
7850
            if let Some(category_id) = row.get::<Option<i32>, _>(2) {
409
7494
                category_ids.push(category_id as u32);
410
7494
            }
411
        }
412
2217
        let rating_count = rating_count.unwrap_or_default();
413
2217
        let total_rating = total_rating.unwrap_or_default();
414
2217
        measurement.finish();
415
2217
        OperationResult::Product {
416
2217
            id: id.unwrap() as u32,
417
2217
            product: Product {
418
2217
                name: name.unwrap(),
419
2217
                category_ids,
420
2217
            },
421
2217
            rating: if rating_count > 0 {
422
1120
                Some(total_rating as f32 / rating_count as f32)
423
            } else {
424
1097
                None
425
            },
426
        }
427
4434
    }
428
}
429
#[async_trait]
430
impl Operator<LookupProduct> for PostgresOperator {
431
2189
    async fn operate(
432
2189
        &mut self,
433
2189
        operation: &LookupProduct,
434
2189
        _results: &[OperationResult],
435
2189
        measurements: &Measurements,
436
2189
    ) -> OperationResult {
437
2189
        let measurement = measurements.begin("postgresql", Metric::LookupProduct);
438
3489
        let mut conn = self.sqlite.acquire().await.unwrap();
439
2189
        let statement = conn
440
2189
            .prepare(
441
2189
                r#"
442
2189
                    SELECT 
443
2189
                        id, 
444
2189
                        name, 
445
2189
                        category_id,
446
2189
                        commerce_bench.product_ratings.total_rating as "total_rating: Option<i32>",
447
2189
                        commerce_bench.product_ratings.ratings as "ratings: Option<i32>"
448
2189
                    FROM 
449
2189
                        commerce_bench.products 
450
2189
                    LEFT OUTER JOIN commerce_bench.product_categories ON 
451
2189
                        commerce_bench.product_categories.product_id = id 
452
2189
                    LEFT OUTER JOIN commerce_bench.product_ratings ON
453
2189
                        commerce_bench.product_ratings.product_id = id
454
2189
                    WHERE id = $1
455
2189
                    GROUP BY id, name, category_id, commerce_bench.product_ratings.total_rating, commerce_bench.product_ratings.ratings
456
2189
                "#,
457
2189
            )
458
35
            .await
459
2189
            .unwrap();
460
2189

            
461
2189
        let mut args = PgArguments::default();
462
2189
        args.reserve(1, 0);
463
2189
        args.add(operation.id);
464
2189

            
465
2189
        let mut results = conn.fetch(statement.query_with(args));
466
2189
        let mut id: Option<i32> = None;
467
2189
        let mut name = None;
468
2189
        let mut category_ids = Vec::new();
469
2189
        let mut total_rating: Option<i32> = None;
470
2189
        let mut rating_count: Option<i32> = None;
471
10101
        while let Some(row) = results.next().await {
472
7912
            let row = row.unwrap();
473
7912
            id = Some(row.get(0));
474
7912
            name = Some(row.get(1));
475
7912
            total_rating = row.get(2);
476
7912
            rating_count = row.get(3);
477
7912
            if let Some(category_id) = row.get::<Option<i32>, _>(2) {
478
7540
                category_ids.push(category_id as u32);
479
7540
            }
480
        }
481
2189
        let rating_count = rating_count.unwrap_or_default();
482
2189
        let total_rating = total_rating.unwrap_or_default();
483
2189

            
484
2189
        measurement.finish();
485
2189
        OperationResult::Product {
486
2189
            id: id.unwrap() as u32,
487
2189
            product: Product {
488
2189
                name: name.unwrap(),
489
2189
                category_ids,
490
2189
            },
491
2189
            rating: if rating_count > 0 {
492
1085
                Some(total_rating as f32 / rating_count as f32)
493
            } else {
494
1104
                None
495
            },
496
        }
497
4378
    }
498
}
499

            
500
#[async_trait]
501
impl Operator<Checkout> for PostgresOperator {
502
112
    async fn operate(
503
112
        &mut self,
504
112
        operation: &Checkout,
505
112
        results: &[OperationResult],
506
112
        measurements: &Measurements,
507
112
    ) -> OperationResult {
508
112
        let cart = match &results[operation.cart.0] {
509
112
            OperationResult::Cart { id } => *id as i32,
510
            _ => unreachable!("Invalid operation result"),
511
        };
512

            
513
112
        let measurement = measurements.begin("postgresql", Metric::Checkout);
514
174
        let mut conn = self.sqlite.acquire().await.unwrap();
515
173
        let mut tx = conn.begin().await.unwrap();
516
        // Create a new order
517
112
        let statement = tx
518
112
            .prepare(r#"INSERT INTO commerce_bench.orders (customer_id) VALUES ($1) RETURNING ID"#)
519
32
            .await
520
112
            .unwrap();
521
112
        let mut args = PgArguments::default();
522
112
        args.reserve(1, 0);
523
112
        args.add(operation.customer_id);
524
177
        let result = tx.fetch_one(statement.query_with(args)).await.unwrap();
525
112
        let order_id: i32 = result.get(0);
526

            
527
112
        let statement = tx
528
112
            .prepare(r#"
529
112
                WITH products_in_cart AS (
530
112
                    DELETE FROM commerce_bench.cart_products WHERE cart_id = $1 RETURNING $2::int as order_id, product_id
531
112
                )
532
112
                INSERT INTO commerce_bench.order_products (order_id, product_id) SELECT * from products_in_cart;"#)
533
33
            .await
534
112
            .unwrap();
535
112
        let mut args = PgArguments::default();
536
112
        args.reserve(2, 0);
537
112
        args.add(cart);
538
112
        args.add(order_id);
539
179
        tx.execute(statement.query_with(args)).await.unwrap();
540

            
541
112
        let statement = tx
542
112
            .prepare(r#"DELETE FROM commerce_bench.carts WHERE id = $1"#)
543
37
            .await
544
112
            .unwrap();
545
112
        let mut args = PgArguments::default();
546
112
        args.reserve(1, 0);
547
112
        args.add(cart);
548
171
        tx.execute(statement.query_with(args)).await.unwrap();
549
112

            
550
112
        measurement.finish();
551
112

            
552
112
        OperationResult::Ok
553
224
    }
554
}
555

            
556
#[async_trait]
557
impl Operator<ReviewProduct> for PostgresOperator {
558
82
    async fn operate(
559
82
        &mut self,
560
82
        operation: &ReviewProduct,
561
82
        results: &[OperationResult],
562
82
        measurements: &Measurements,
563
82
    ) -> OperationResult {
564
82
        let product = match &results[operation.product_id.0] {
565
            OperationResult::Product { id, .. } => *id,
566
82
            OperationResult::CartProduct { id, .. } => *id,
567
            _ => unreachable!("Invalid operation result"),
568
        };
569
82
        let measurement = measurements.begin("postgresql", Metric::RateProduct);
570
145
        let mut conn = self.sqlite.acquire().await.unwrap();
571
114
        let mut tx = conn.begin().await.unwrap();
572
82
        let statement = tx
573
82
            .prepare(
574
82
                r#"INSERT INTO commerce_bench.product_reviews (
575
82
                        product_id, 
576
82
                        customer_id, 
577
82
                        rating, 
578
82
                        review)
579
82
                    VALUES ($1, $2, $3, $4)
580
82
                    ON CONFLICT (customer_id, product_id) DO UPDATE SET rating = $3, review = $4"#,
581
82
            )
582
33
            .await
583
82
            .unwrap();
584
82

            
585
82
        let mut args = PgArguments::default();
586
82
        args.reserve(4, 0);
587
82
        args.add(product);
588
82
        args.add(operation.customer_id);
589
82
        args.add(operation.rating as u32);
590
82
        args.add(&operation.review);
591
82

            
592
132
        tx.execute(statement.query_with(args)).await.unwrap();
593
163
        tx.commit().await.unwrap();
594
82
        // Make this rating show up
595
162
        conn.execute("REFRESH MATERIALIZED VIEW commerce_bench.product_ratings")
596
162
            .await
597
82
            .unwrap();
598
82
        measurement.finish();
599
82

            
600
82
        OperationResult::Ok
601
164
    }
602
}