1
1
use std::time::Duration;
2

            
3
use bonsaidb::{
4
    core::pubsub::{AsyncPubSub, AsyncSubscriber},
5
    local::{
6
        config::{Builder, StorageConfiguration},
7
        AsyncDatabase,
8
    },
9
};
10
use tokio::time::sleep;
11

            
12
#[tokio::main]
13
1
async fn main() -> Result<(), bonsaidb::local::Error> {
14
    // This example is using a database with no collections, because PubSub is a
15
    // system independent of the data stored in the database.
16
1
    let db = AsyncDatabase::open::<()>(StorageConfiguration::new("pubsub.bonsaidb")).await?;
17

            
18
1
    let subscriber = db.create_subscriber().await?;
19
    // Subscribe for messages sent to the topic "pong"
20
1
    subscriber.subscribe_to(&"pong").await?;
21

            
22
    // Launch a task that sends out "ping" messages.
23
1
    tokio::spawn(pinger(db.clone()));
24
1
    // Launch a task that receives "ping" messages and sends "pong" responses.
25
1
    tokio::spawn(ponger(db.clone()));
26

            
27
    // Loop until a we receive a message letting us know when the ponger() has
28
    // no pings remaining.
29
    loop {
30
5
        let message = subscriber.receiver().receive_async().await?;
31
5
        let pings_remaining = message.payload::<usize>()?;
32
5
        println!(
33
5
            "<-- Received {}, pings remaining: {}",
34
5
            message.topic::<String>()?,
35
            pings_remaining
36
        );
37
5
        if pings_remaining == 0 {
38
1
            break;
39
4
        }
40
    }
41

            
42
1
    println!("Received all pongs.");
43
1

            
44
1
    Ok(())
45
1
}
46

            
47
1
/// Publishes an ever-increasing counter to the `"ping"` topic.
///
/// The counter starts at 1 and is sent as a `u32` payload. After each
/// successful publish the task sleeps for 250 milliseconds before sending the
/// next one. The loop has no exit condition, so this function only returns
/// when publishing fails.
///
/// # Errors
///
/// Returns the error produced by `publish` if a ping cannot be sent.
async fn pinger<P: AsyncPubSub>(pubsub: P) -> Result<(), bonsaidb::local::Error> {
    // Pause between consecutive pings.
    let interval = Duration::from_millis(250);
    let mut sent = 0u32;

    loop {
        sent += 1;
        println!("-> Sending ping {}", sent);
        pubsub.publish(&"ping", &sent).await?;
        sleep(interval).await;
    }
}
56

            
57
1
/// Answers a fixed number of `"ping"` messages with `"pong"` messages.
///
/// Creates its own subscriber on `pubsub`, subscribes to the `"ping"` topic,
/// and for each of the first `NUMBER_OF_PONGS` pings received, prints the
/// ping and publishes a `"pong"` whose `usize` payload is the number of pings
/// still to be answered (counting down to 0).
///
/// # Errors
///
/// Returns an error if the subscriber cannot be created or subscribed, if
/// receiving a message fails, if a message's topic or payload cannot be
/// decoded, or if publishing a pong fails.
async fn ponger<P: AsyncPubSub<Subscriber = S>, S: AsyncSubscriber + std::fmt::Debug>(
    pubsub: P,
) -> Result<(), bonsaidb::local::Error> {
    const NUMBER_OF_PONGS: usize = 5;
    let subscriber = pubsub.create_subscriber().await?;
    subscriber.subscribe_to(&"ping").await?;

    println!(
        "Ponger started, waiting to respond to {} pings",
        NUMBER_OF_PONGS
    );

    // Count down NUMBER_OF_PONGS - 1, ..., 1, 0: the value published with each
    // pong is how many pings remain after the one just handled.
    for pings_remaining in (0..NUMBER_OF_PONGS).rev() {
        let message = subscriber.receiver().receive_async().await?;
        // Decode failures are propagated to the caller rather than panicking,
        // matching how `main` handles the same calls.
        println!(
            "<- Received {}, id {}",
            message.topic::<&str>()?,
            message.payload::<u32>()?
        );
        pubsub.publish(&"pong", &pings_remaining).await?;
    }

    println!("Ponger finished.");

    Ok(())
}
85

            
86
1
/// Smoke test: runs the example's `main` and panics if it returns an error.
#[test]
fn runs() {
    let outcome = main();
    outcome.unwrap()
}