1
use std::time::Duration;
2

            
3
use bonsaidb::core::pubsub::{AsyncPubSub, AsyncSubscriber};
4
use bonsaidb::local::config::{Builder, StorageConfiguration};
5
use bonsaidb::local::AsyncDatabase;
6
use tokio::time::sleep;
7

            
8
#[tokio::main]
9
1
async fn main() -> Result<(), bonsaidb::local::Error> {
10
    // This example is using a database with no collections, because PubSub is a
11
    // system independent of the data stored in the database.
12
1
    let db = AsyncDatabase::open::<()>(StorageConfiguration::new("pubsub.bonsaidb")).await?;
13

            
14
1
    let subscriber = db.create_subscriber().await?;
15
    // Subscribe for messages sent to the topic "pong"
16
1
    subscriber.subscribe_to(&"pong").await?;
17

            
18
    // Launch a task that sends out "ping" messages.
19
1
    tokio::spawn(pinger(db.clone()));
20
1
    // Launch a task that receives "ping" messages and sends "pong" responses.
21
1
    tokio::spawn(ponger(db.clone()));
22

            
23
    // Loop until a we receive a message letting us know when the ponger() has
24
    // no pings remaining.
25
    loop {
26
5
        let message = subscriber.receiver().receive_async().await?;
27
5
        let pings_remaining = message.payload::<usize>()?;
28
5
        println!(
29
5
            "<-- Received {}, pings remaining: {}",
30
5
            message.topic::<String>()?,
31
            pings_remaining
32
        );
33
5
        if pings_remaining == 0 {
34
1
            break;
35
4
        }
36
    }
37

            
38
1
    println!("Received all pongs.");
39
1

            
40
1
    Ok(())
41
}
42

            
43
1
/// Publishes an ever-increasing counter to the "ping" topic.
///
/// The counter starts at 1 and a message is published every 250 milliseconds.
/// The loop has no exit condition of its own; the function only returns when
/// publishing fails, in which case the error is propagated.
async fn pinger<P: AsyncPubSub>(pubsub: P) -> Result<(), bonsaidb::local::Error> {
    // Pause between two consecutive pings.
    let interval = Duration::from_millis(250);
    let mut ping_count: u32 = 0;
    loop {
        ping_count += 1;
        println!("-> Sending ping {ping_count}");
        pubsub.publish(&"ping", &ping_count).await?;
        sleep(interval).await;
    }
}
52

            
53
1
/// Replies to "ping" messages with "pong" messages, then stops.
///
/// Creates its own subscriber on `pubsub`, subscribes to the "ping" topic and,
/// for each of the first `NUMBER_OF_PONGS` pings received, publishes the number
/// of pings still to be answered on the "pong" topic. The final pong therefore
/// carries `0`.
///
/// # Errors
///
/// Returns an error if creating the subscriber, subscribing, receiving a
/// message, decoding a message's topic or payload, or publishing fails.
async fn ponger<P: AsyncPubSub<Subscriber = S>, S: AsyncSubscriber + std::fmt::Debug>(
    pubsub: P,
) -> Result<(), bonsaidb::local::Error> {
    const NUMBER_OF_PONGS: usize = 5;
    let subscriber = pubsub.create_subscriber().await?;
    subscriber.subscribe_to(&"ping").await?;
    let mut pings_remaining = NUMBER_OF_PONGS;

    println!("Ponger started, waiting to respond to {pings_remaining} pings");

    while pings_remaining > 0 {
        let message = subscriber.receiver().receive_async().await?;
        // Decoding failures are propagated with `?` (as `main` does for the
        // same calls) instead of panicking inside the task.
        println!(
            "<- Received {}, id {}",
            message.topic::<&str>()?,
            message.payload::<u32>()?
        );
        // Report how many pings are still expected after this one.
        pings_remaining -= 1;
        pubsub.publish(&"pong", &pings_remaining).await?;
    }

    println!("Ponger finished.");

    Ok(())
}
78

            
79
1
/// Smoke test: runs the whole example and panics if `main` returns an error.
#[test]
fn runs() {
    let outcome = main();
    outcome.unwrap()
}