1
use bonsaidb::core::actionable::async_trait;
2
use futures::StreamExt;
3
use sqlx::postgres::PgArguments;
4
use sqlx::{Arguments, Connection, Executor, PgPool, Row, Statement};
5

            
6
use crate::execute::{Backend, BackendOperator, Measurements, Metric, Operator};
7
use crate::model::Product;
8
use crate::plan::{
9
    AddProductToCart, Checkout, CreateCart, FindProduct, Load, LookupProduct, OperationResult,
10
    ReviewProduct,
11
};
12

            
13
pub struct Postgres {
14
    pool: PgPool,
15
}
16

            
17
#[async_trait]
18
impl Backend for Postgres {
19
    type Config = String;
20
    type Operator = PostgresOperator;
21

            
22
    fn label(&self) -> &'static str {
23
        "postgresql"
24
    }
25

            
26
4
    async fn new(url: Self::Config) -> Self {
27
24
        let pool = PgPool::connect(&url).await.unwrap();
28

            
29
4
        let mut conn = pool.acquire().await.unwrap();
30
4
        conn.execute(r#"DROP SCHEMA IF EXISTS commerce_bench CASCADE"#)
31
8
            .await
32
4
            .unwrap();
33
4
        conn.execute(r#"CREATE SCHEMA commerce_bench"#)
34
8
            .await
35
4
            .unwrap();
36
4
        conn.execute("SET search_path='commerce_bench';")
37
8
            .await
38
4
            .unwrap();
39
4
        conn.execute(
40
4
            r#"CREATE TABLE customers (
41
4
            id SERIAL PRIMARY KEY,
42
4
            name TEXT,
43
4
            email TEXT,
44
4
            address TEXT,
45
4
            city TEXT,
46
4
            region TEXT,
47
4
            country TEXT,
48
4
            postal_code TEXT,
49
4
            phone TEXT
50
4
        )"#,
51
4
        )
52
8
        .await
53
4
        .unwrap();
54
4
        conn.execute(
55
4
            r#"CREATE TABLE products (
56
4
                    id SERIAL PRIMARY KEY,
57
4
                    name TEXT
58
4
                )"#,
59
4
        )
60
8
        .await
61
4
        .unwrap();
62
4
        conn.execute(r#"CREATE INDEX products_by_name ON products(name)"#)
63
8
            .await
64
4
            .unwrap();
65
4
        conn.execute(
66
4
            r#"CREATE TABLE product_reviews (
67
4
                product_id INTEGER NOT NULL,-- REFERENCES products(id),
68
4
                customer_id INTEGER NOT NULL,-- REFERENCES customers(id),
69
4
                rating INTEGER NOT NULL,
70
4
                review TEXT
71
4
            )"#,
72
4
        )
73
8
        .await
74
4
        .unwrap();
75
4
        conn.execute(r#"CREATE INDEX product_reviews_by_product ON product_reviews(product_id)"#)
76
8
            .await
77
4
            .unwrap();
78
4
        conn.execute(r#"CREATE UNIQUE INDEX product_reviews_by_customer ON product_reviews(customer_id, product_id)"#)
79
8
            .await
80
4
            .unwrap();
81
4
        conn.execute(
82
4
            r#"CREATE MATERIALIZED VIEW
83
4
                    product_ratings
84
4
                AS
85
4
                    SELECT
86
4
                        product_id,
87
4
                        sum(rating)::int as total_rating,
88
4
                        count(rating)::int as ratings
89
4
                    FROM
90
4
                        product_reviews
91
4
                    GROUP BY product_id
92
4
            "#,
93
4
        )
94
8
        .await
95
4
        .unwrap();
96
4
        conn.execute(
97
4
            r#"CREATE TABLE categories (
98
4
                    id SERIAL PRIMARY KEY,
99
4
                    name TEXT
100
4
                )"#,
101
4
        )
102
8
        .await
103
4
        .unwrap();
104
4
        conn.execute(
105
4
            r#"CREATE TABLE product_categories (
106
4
                    product_id INTEGER,-- REFERENCES products(id),
107
4
                    category_id INTEGER-- REFERENCES categories(id)
108
4
                )"#,
109
4
        )
110
8
        .await
111
4
        .unwrap();
112
4
        conn.execute(
113
4
            r#"CREATE INDEX product_categories_by_product ON product_categories(product_id)"#,
114
4
        )
115
8
        .await
116
4
        .unwrap();
117
4
        conn.execute(
118
4
            r#"CREATE INDEX product_categories_by_category ON product_categories(category_id)"#,
119
4
        )
120
8
        .await
121
4
        .unwrap();
122
4
        conn.execute(
123
4
            r#"CREATE TABLE orders (
124
4
                    id SERIAL PRIMARY KEY,
125
4
                    customer_id INTEGER -- REFERENCES customers(id)
126
4
                )"#,
127
4
        )
128
8
        .await
129
4
        .unwrap();
130
4
        conn.execute(
131
4
            r#"CREATE TABLE order_products (
132
4
                    order_id INTEGER NOT NULL,-- REFERENCES orders(id),
133
4
                    product_id INTEGER NOT NULL -- REFERENCES products(id)
134
4
                )"#,
135
4
        )
136
8
        .await
137
4
        .unwrap();
138
4
        conn.execute(
139
4
            r#"CREATE TABLE carts (
140
4
            id SERIAL PRIMARY KEY,
141
4
            customer_id INTEGER
142
4
        )"#,
143
4
        )
144
8
        .await
145
4
        .unwrap();
146
4
        conn.execute(
147
4
            r#"CREATE TABLE cart_products (
148
4
                cart_id INTEGER,
149
4
                product_id INTEGER
150
4
            )"#,
151
4
        )
152
8
        .await
153
4
        .unwrap();
154
4

            
155
4
        Self { pool }
156
12
    }
157

            
158
18
    async fn new_operator_async(&self) -> Self::Operator {
159
18
        PostgresOperator {
160
18
            sqlite: self.pool.clone(),
161
18
        }
162
36
    }
163
}
164

            
165
pub struct PostgresOperator {
166
    sqlite: PgPool,
167
}
168

            
169
impl BackendOperator for PostgresOperator {
170
    type Id = u32;
171
}
172

            
173
#[async_trait]
174
impl Operator<Load, u32> for PostgresOperator {
175
4
    async fn operate(
176
4
        &mut self,
177
4
        operation: &Load,
178
4
        _results: &[OperationResult<u32>],
179
4
        measurements: &Measurements,
180
4
    ) -> OperationResult<u32> {
181
4
        let measurement = measurements.begin("postgresql", Metric::Load);
182
28
        let mut conn = self.sqlite.acquire().await.unwrap();
183
4
        let mut tx = conn.begin().await.unwrap();
184
4
        let insert_category = tx
185
4
            .prepare("INSERT INTO commerce_bench.categories (id, name) VALUES ($1, $2)")
186
8
            .await
187
4
            .unwrap();
188
59
        for (id, category) in &operation.initial_data.categories {
189
59
            let mut args = PgArguments::default();
190
59
            args.reserve(2, 0);
191
59
            args.add(*id as i32);
192
59
            args.add(&category.name);
193
114
            tx.execute(insert_category.query_with(args)).await.unwrap();
194
        }
195

            
196
4
        let insert_product = tx
197
4
            .prepare("INSERT INTO commerce_bench.products (id, name) VALUES ($1, $2)")
198
8
            .await
199
4
            .unwrap();
200
4
        let insert_product_category = tx
201
4
            .prepare("INSERT INTO commerce_bench.product_categories (product_id, category_id) VALUES ($1, $2)")
202
4
            .await
203
4
            .unwrap();
204
437
        for (&id, product) in &operation.initial_data.products {
205
437
            let mut args = PgArguments::default();
206
437
            args.reserve(2, 0);
207
437
            args.add(id as i32);
208
437
            args.add(&product.name);
209
870
            tx.execute(insert_product.query_with(args)).await.unwrap();
210
1982
            for &category_id in &product.category_ids {
211
1545
                let mut args = PgArguments::default();
212
1545
                args.reserve(2, 0);
213
1545
                args.add(id as i32);
214
1545
                args.add(category_id as i32);
215
1545
                tx.execute(insert_product_category.query_with(args))
216
3090
                    .await
217
1545
                    .unwrap();
218
            }
219
        }
220

            
221
4
        let insert_customer = tx
222
4
            .prepare("INSERT INTO commerce_bench.customers (id, name, email, address, city, region, country, postal_code, phone) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)")
223
8
            .await
224
4
            .unwrap();
225
427
        for (id, customer) in &operation.initial_data.customers {
226
427
            let mut args = PgArguments::default();
227
427
            args.reserve(9, 0);
228
427
            args.add(*id as i32);
229
427
            args.add(&customer.name);
230
427
            args.add(&customer.email);
231
427
            args.add(&customer.address);
232
427
            args.add(&customer.city);
233
427
            args.add(&customer.region);
234
427
            args.add(&customer.country);
235
427
            args.add(&customer.postal_code);
236
427
            args.add(&customer.phone);
237
850
            tx.execute(insert_customer.query_with(args)).await.unwrap();
238
        }
239

            
240
4
        let insert_order = tx
241
4
            .prepare("INSERT INTO commerce_bench.orders (id, customer_id) VALUES ($1, $2)")
242
8
            .await
243
4
            .unwrap();
244
4
        let insert_order_product = tx
245
4
            .prepare(
246
4
                "INSERT INTO commerce_bench.order_products (order_id, product_id) VALUES ($1, $2)",
247
4
            )
248
4
            .await
249
4
            .unwrap();
250
1008
        for (&id, order) in &operation.initial_data.orders {
251
1008
            let mut args = PgArguments::default();
252
1008
            args.reserve(2, 0);
253
1008
            args.add(id as i32);
254
1008
            args.add(order.customer_id as i32);
255
2012
            tx.execute(insert_order.query_with(args)).await.unwrap();
256
10120
            for &product_id in &order.product_ids {
257
9112
                let mut args = PgArguments::default();
258
9112
                args.reserve(2, 0);
259
9112
                args.add(id as i32);
260
9112
                args.add(product_id as i32);
261
9112
                tx.execute(insert_order_product.query_with(args))
262
18224
                    .await
263
9112
                    .unwrap();
264
            }
265
        }
266

            
267
4
        let insert_review = tx
268
4
            .prepare("INSERT INTO commerce_bench.product_reviews (product_id, customer_id, rating, review) VALUES ($1, $2, $3, $4)")
269
8
            .await
270
4
            .unwrap();
271
487
        for review in &operation.initial_data.reviews {
272
487
            let mut args = PgArguments::default();
273
487
            args.reserve(4, 0);
274
487
            args.add(review.product_id as i32);
275
487
            args.add(review.customer_id as i32);
276
487
            args.add(review.rating as i32);
277
487
            args.add(&review.review);
278
970
            tx.execute(insert_review.query_with(args)).await.unwrap();
279
        }
280
4
        tx.execute(
281
4
            "SELECT setval('commerce_bench.orders_id_seq', COALESCE((SELECT MAX(id)+1 FROM commerce_bench.orders), 1), false)",
282
4
        )
283
8
        .await
284
4
        .unwrap();
285
4

            
286
8
        tx.commit().await.unwrap();
287
4
        // Make sure all ratings show up in the view.
288
4
        conn.execute("REFRESH MATERIALIZED VIEW commerce_bench.product_ratings")
289
8
            .await
290
4
            .unwrap();
291
4
        // This makes a significant difference.
292
8
        conn.execute("ANALYZE").await.unwrap();
293
4
        measurement.finish();
294
4

            
295
4
        OperationResult::Ok
296
12
    }
297
}
298
#[async_trait]
299
impl Operator<CreateCart, u32> for PostgresOperator {
300
460
    async fn operate(
301
460
        &mut self,
302
460
        _operation: &CreateCart,
303
460
        _results: &[OperationResult<u32>],
304
460
        measurements: &Measurements,
305
460
    ) -> OperationResult<u32> {
306
460
        let measurement = measurements.begin("postgresql", Metric::CreateCart);
307
941
        let mut conn = self.sqlite.acquire().await.unwrap();
308
460
        let mut tx = conn.begin().await.unwrap();
309
460
        let statement = tx
310
460
            .prepare("insert into commerce_bench.carts (customer_id) values (null) returning id")
311
46
            .await
312
460
            .unwrap();
313

            
314
853
        let result = tx.fetch_one(statement.query()).await.unwrap();
315
904
        tx.commit().await.unwrap();
316
460
        let id: i32 = result.get(0);
317
460
        measurement.finish();
318
460

            
319
460
        OperationResult::Cart { id: id as u32 }
320
1380
    }
321
}
322
#[async_trait]
323
impl Operator<AddProductToCart, u32> for PostgresOperator {
324
1148
    async fn operate(
325
1148
        &mut self,
326
1148
        operation: &AddProductToCart,
327
1148
        results: &[OperationResult<u32>],
328
1148
        measurements: &Measurements,
329
1148
    ) -> OperationResult<u32> {
330
1148
        let cart = match &results[operation.cart.0] {
331
1148
            OperationResult::Cart { id } => *id,
332
            _ => unreachable!("Invalid operation result"),
333
        };
334
1148
        let product = match &results[operation.product.0] {
335
1148
            OperationResult::Product { id, .. } => *id,
336
            _ => unreachable!("Invalid operation result"),
337
        };
338

            
339
1148
        let measurement = measurements.begin("postgresql", Metric::AddProductToCart);
340
2273
        let mut conn = self.sqlite.acquire().await.unwrap();
341
1148
        let mut tx = conn.begin().await.unwrap();
342
1148
        let statement = tx
343
1148
            .prepare(
344
1148
                "insert into commerce_bench.cart_products (cart_id, product_id) values ($1, $2)",
345
1148
            )
346
42
            .await
347
1148
            .unwrap();
348
1148

            
349
1148
        let mut args = PgArguments::default();
350
1148
        args.reserve(2, 0);
351
1148
        args.add(cart as i32);
352
1148
        args.add(product as i32);
353
1148

            
354
2170
        tx.execute(statement.query_with(args)).await.unwrap();
355
2279
        tx.commit().await.unwrap();
356
1148
        measurement.finish();
357
1148

            
358
1148
        OperationResult::CartProduct { id: product }
359
3444
    }
360
}
361
#[async_trait]
362
impl Operator<FindProduct, u32> for PostgresOperator {
363
2302
    async fn operate(
364
2302
        &mut self,
365
2302
        operation: &FindProduct,
366
2302
        _results: &[OperationResult<u32>],
367
2302
        measurements: &Measurements,
368
2302
    ) -> OperationResult<u32> {
369
2302
        let measurement = measurements.begin("postgresql", Metric::FindProduct);
370
4590
        let mut conn = self.sqlite.acquire().await.unwrap();
371
2302
        let statement = conn
372
2302
            .prepare(
373
2302
                r#"
374
2302
                SELECT
375
2302
                    id,
376
2302
                    name,
377
2302
                    category_id,
378
2302
                    commerce_bench.product_ratings.total_rating as "total_rating: Option<i32>",
379
2302
                    commerce_bench.product_ratings.ratings as "ratings: Option<i32>"
380
2302
                FROM
381
2302
                    commerce_bench.products
382
2302
                LEFT OUTER JOIN commerce_bench.product_categories ON
383
2302
                    commerce_bench.product_categories.product_id = id
384
2302
                LEFT OUTER JOIN commerce_bench.product_ratings ON
385
2302
                    commerce_bench.product_ratings.product_id = id
386
2302
                WHERE name = $1
387
2302
                GROUP BY id, name, category_id, commerce_bench.product_ratings.total_rating, commerce_bench.product_ratings.ratings
388
2302
            "#,
389
2302
            )
390
23
            .await
391
2302
            .unwrap();
392
2302

            
393
2302
        let mut args = PgArguments::default();
394
2302
        args.reserve(1, 0);
395
2302
        args.add(&operation.name);
396
2302

            
397
2302
        let mut results = conn.fetch(statement.query_with(args));
398
2302
        let mut id: Option<i32> = None;
399
2302
        let mut name = None;
400
2302
        let mut category_ids = Vec::new();
401
2302
        let mut total_rating: Option<i32> = None;
402
2302
        let mut rating_count: Option<i32> = None;
403
14072
        while let Some(row) = results.next().await {
404
11770
            let row = row.unwrap();
405
11770
            id = Some(row.get(0));
406
11770
            name = Some(row.get(1));
407
11770
            total_rating = row.get(2);
408
11770
            rating_count = row.get(3);
409
11770
            if let Some(category_id) = row.get::<Option<i32>, _>(2) {
410
11478
                category_ids.push(category_id as u32);
411
11478
            }
412
        }
413
2302
        let rating_count = rating_count.unwrap_or_default();
414
2302
        let total_rating = total_rating.unwrap_or_default();
415
2302
        measurement.finish();
416
2302
        OperationResult::Product {
417
2302
            id: id.unwrap() as u32,
418
2302
            product: Product {
419
2302
                name: name.unwrap(),
420
2302
                category_ids,
421
2302
            },
422
2302
            rating: if rating_count > 0 {
423
1663
                Some(total_rating as f32 / rating_count as f32)
424
            } else {
425
639
                None
426
            },
427
        }
428
6906
    }
429
}
430
#[async_trait]
431
impl Operator<LookupProduct, u32> for PostgresOperator {
432
2200
    async fn operate(
433
2200
        &mut self,
434
2200
        operation: &LookupProduct,
435
2200
        _results: &[OperationResult<u32>],
436
2200
        measurements: &Measurements,
437
2200
    ) -> OperationResult<u32> {
438
2200
        let measurement = measurements.begin("postgresql", Metric::LookupProduct);
439
4348
        let mut conn = self.sqlite.acquire().await.unwrap();
440
2200
        let statement = conn
441
2200
            .prepare(
442
2200
                r#"
443
2200
                    SELECT
444
2200
                        id,
445
2200
                        name,
446
2200
                        category_id,
447
2200
                        commerce_bench.product_ratings.total_rating as "total_rating: Option<i32>",
448
2200
                        commerce_bench.product_ratings.ratings as "ratings: Option<i32>"
449
2200
                    FROM
450
2200
                        commerce_bench.products
451
2200
                    LEFT OUTER JOIN commerce_bench.product_categories ON
452
2200
                        commerce_bench.product_categories.product_id = id
453
2200
                    LEFT OUTER JOIN commerce_bench.product_ratings ON
454
2200
                        commerce_bench.product_ratings.product_id = id
455
2200
                    WHERE id = $1
456
2200
                    GROUP BY id, name, category_id, commerce_bench.product_ratings.total_rating, commerce_bench.product_ratings.ratings
457
2200
                "#,
458
2200
            )
459
20
            .await
460
2200
            .unwrap();
461
2200

            
462
2200
        let mut args = PgArguments::default();
463
2200
        args.reserve(1, 0);
464
2200
        args.add(operation.id as i32);
465
2200

            
466
2200
        let mut results = conn.fetch(statement.query_with(args));
467
2200
        let mut id: Option<i32> = None;
468
2200
        let mut name = None;
469
2200
        let mut category_ids = Vec::new();
470
2200
        let mut total_rating: Option<i32> = None;
471
2200
        let mut rating_count: Option<i32> = None;
472
13108
        while let Some(row) = results.next().await {
473
10908
            let row = row.unwrap();
474
10908
            id = Some(row.get(0));
475
10908
            name = Some(row.get(1));
476
10908
            total_rating = row.get(2);
477
10908
            rating_count = row.get(3);
478
10908
            if let Some(category_id) = row.get::<Option<i32>, _>(2) {
479
10618
                category_ids.push(category_id as u32);
480
10618
            }
481
        }
482
2200
        let rating_count = rating_count.unwrap_or_default();
483
2200
        let total_rating = total_rating.unwrap_or_default();
484
2200

            
485
2200
        measurement.finish();
486
2200
        OperationResult::Product {
487
2200
            id: id.unwrap() as u32,
488
2200
            product: Product {
489
2200
                name: name.unwrap(),
490
2200
                category_ids,
491
2200
            },
492
2200
            rating: if rating_count > 0 {
493
1604
                Some(total_rating as f32 / rating_count as f32)
494
            } else {
495
596
                None
496
            },
497
        }
498
6600
    }
499
}
500

            
501
#[async_trait]
502
impl Operator<Checkout, u32> for PostgresOperator {
503
129
    async fn operate(
504
129
        &mut self,
505
129
        operation: &Checkout,
506
129
        results: &[OperationResult<u32>],
507
129
        measurements: &Measurements,
508
129
    ) -> OperationResult<u32> {
509
129
        let cart = match &results[operation.cart.0] {
510
129
            OperationResult::Cart { id } => *id as i32,
511
            _ => unreachable!("Invalid operation result"),
512
        };
513

            
514
129
        let measurement = measurements.begin("postgresql", Metric::Checkout);
515
264
        let mut conn = self.sqlite.acquire().await.unwrap();
516
129
        let mut tx = conn.begin().await.unwrap();
517
        // Create a new order
518
129
        let statement = tx
519
129
            .prepare(r#"INSERT INTO commerce_bench.orders (customer_id) VALUES ($1) RETURNING ID"#)
520
44
            .await
521
129
            .unwrap();
522
129
        let mut args = PgArguments::default();
523
129
        args.reserve(1, 0);
524
129
        args.add(operation.customer_id as i32);
525
231
        let result = tx.fetch_one(statement.query_with(args)).await.unwrap();
526
129
        let order_id: i32 = result.get(0);
527

            
528
129
        let statement = tx
529
129
            .prepare(r#"
530
129
                WITH products_in_cart AS (
531
129
                    DELETE FROM commerce_bench.cart_products WHERE cart_id = $1 RETURNING $2::int as order_id, product_id
532
129
                )
533
129
                INSERT INTO commerce_bench.order_products (order_id, product_id) SELECT * from products_in_cart;"#)
534
44
            .await
535
129
            .unwrap();
536
129
        let mut args = PgArguments::default();
537
129
        args.reserve(2, 0);
538
129
        args.add(cart);
539
129
        args.add(order_id);
540
231
        tx.execute(statement.query_with(args)).await.unwrap();
541

            
542
129
        let statement = tx
543
129
            .prepare(r#"DELETE FROM commerce_bench.carts WHERE id = $1"#)
544
33
            .await
545
129
            .unwrap();
546
129
        let mut args = PgArguments::default();
547
129
        args.reserve(1, 0);
548
129
        args.add(cart);
549
225
        tx.execute(statement.query_with(args)).await.unwrap();
550
255
        tx.commit().await.unwrap();
551
129

            
552
129
        measurement.finish();
553
129

            
554
129
        OperationResult::Ok
555
387
    }
556
}
557

            
558
#[async_trait]
559
impl Operator<ReviewProduct, u32> for PostgresOperator {
560
93
    async fn operate(
561
93
        &mut self,
562
93
        operation: &ReviewProduct,
563
93
        results: &[OperationResult<u32>],
564
93
        measurements: &Measurements,
565
93
    ) -> OperationResult<u32> {
566
93
        let product = match &results[operation.product_id.0] {
567
            OperationResult::Product { id, .. } => *id,
568
93
            OperationResult::CartProduct { id, .. } => *id,
569
            _ => unreachable!("Invalid operation result"),
570
        };
571
93
        let measurement = measurements.begin("postgresql", Metric::RateProduct);
572
175
        let mut conn = self.sqlite.acquire().await.unwrap();
573
93
        let mut tx = conn.begin().await.unwrap();
574
93
        let statement = tx
575
93
            .prepare(
576
93
                r#"INSERT INTO commerce_bench.product_reviews (
577
93
                        product_id,
578
93
                        customer_id,
579
93
                        rating,
580
93
                        review)
581
93
                    VALUES ($1, $2, $3, $4)
582
93
                    ON CONFLICT (customer_id, product_id) DO UPDATE SET rating = $3, review = $4"#,
583
93
            )
584
36
            .await
585
93
            .unwrap();
586
93

            
587
93
        let mut args = PgArguments::default();
588
93
        args.reserve(4, 0);
589
93
        args.add(product as i32);
590
93
        args.add(operation.customer_id as i32);
591
93
        args.add(operation.rating as i32);
592
93
        args.add(&operation.review);
593
93

            
594
156
        tx.execute(statement.query_with(args)).await.unwrap();
595
183
        tx.commit().await.unwrap();
596
93
        // Make this rating show up
597
93
        conn.execute("REFRESH MATERIALIZED VIEW commerce_bench.product_ratings")
598
186
            .await
599
93
            .unwrap();
600
93
        measurement.finish();
601
93

            
602
93
        OperationResult::Ok
603
279
    }
604
}