1
1
use std::{thread::sleep, time::Duration};
2

            
3
use bonsaidb::{
4
    core::pubsub::{PubSub, Subscriber},
5
    local::{
6
        config::{Builder, StorageConfiguration},
7
        Database,
8
    },
9
};
10

            
11
1
fn main() -> Result<(), bonsaidb::local::Error> {
12
    // This example is using a database with no collections, because PubSub is a
13
    // system independent of the data stored in the database.
14
1
    let db = Database::open::<()>(StorageConfiguration::new("pubsub.bonsaidb"))?;
15

            
16
1
    let subscriber = db.create_subscriber()?;
17
    // Subscribe for messages sent to the topic "pong"
18
1
    subscriber.subscribe_to(&"pong")?;
19

            
20
    // Launch a thread that sends out "ping" messages.
21
1
    let thread_db = db.clone();
22
1
    std::thread::spawn(move || pinger(thread_db));
23
1
    // Launch a thread that receives "ping" messages and sends "pong" responses.
24
1
    std::thread::spawn(move || ponger(db));
25

            
26
    // Loop until a we receive a message letting us know when the ponger() has
27
    // no pings remaining.
28
    loop {
29
5
        let message = subscriber.receiver().receive()?;
30
5
        let pings_remaining = message.payload::<usize>()?;
31
5
        println!(
32
5
            "<-- Received {}, pings remaining: {}",
33
5
            message.topic::<String>()?,
34
            pings_remaining
35
        );
36
5
        if pings_remaining == 0 {
37
1
            break;
38
4
        }
39
    }
40

            
41
1
    println!("Received all pongs.");
42
1

            
43
1
    Ok(())
44
1
}
45

            
46
1
fn pinger<P: PubSub>(pubsub: P) -> Result<(), bonsaidb::local::Error> {
47
1
    let mut ping_count = 0u32;
48
    loop {
49
6
        ping_count += 1;
50
6
        println!("-> Sending ping {}", ping_count);
51
6
        pubsub.publish(&"ping", &ping_count)?;
52
5
        sleep(Duration::from_millis(250));
53
    }
54
1
}
55

            
56
1
fn ponger<P: PubSub>(pubsub: P) -> Result<(), bonsaidb::local::Error> {
57
    const NUMBER_OF_PONGS: usize = 5;
58
1
    let subscriber = pubsub.create_subscriber()?;
59
1
    subscriber.subscribe_to(&"ping")?;
60
1
    let mut pings_remaining = NUMBER_OF_PONGS;
61
1

            
62
1
    println!(
63
1
        "Ponger started, waiting to respond to {} pings",
64
1
        pings_remaining
65
1
    );
66

            
67
6
    while pings_remaining > 0 {
68
5
        let message = subscriber.receiver().receive()?;
69
5
        println!(
70
5
            "<- Received {}, id {}",
71
5
            message.topic::<&str>().unwrap(),
72
5
            message.payload::<u32>().unwrap()
73
5
        );
74
5
        pings_remaining -= 1;
75
5
        pubsub.publish(&"pong", &pings_remaining)?;
76
    }
77

            
78
1
    println!("Ponger finished.");
79
1

            
80
1
    Ok(())
81
1
}
82

            
83
1
#[test]
84
1
fn runs() {
85
1
    main().unwrap()
86
1
}