1
use async_trait::async_trait;
2
use circulate::{flume, Message};
3
use serde::Serialize;
4

            
5
use crate::Error;
6

            
7
/// Publishes and Subscribes to messages on topics.
8
pub trait PubSub {
9
    /// The Subscriber type for this `PubSub` connection.
10
    type Subscriber: Subscriber;
11

            
12
    /// Create a new [`Subscriber`] for this relay.
13
    fn create_subscriber(&self) -> Result<Self::Subscriber, Error>;
14

            
15
    /// Publishes a `payload` to all subscribers of `topic`.
16
41
    fn publish<Topic: Serialize, Payload: Serialize>(
17
41
        &self,
18
41
        topic: &Topic,
19
41
        payload: &Payload,
20
41
    ) -> Result<(), Error> {
21
41
        self.publish_bytes(pot::to_vec(topic)?, pot::to_vec(payload)?)
22
41
    }
23

            
24
    /// Publishes a `payload` to all subscribers of `topic`.
25
    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), Error>;
26

            
27
    /// Publishes a `payload` to all subscribers of all `topics`.
28
3
    fn publish_to_all<
29
3
        'topics,
30
3
        Topics: IntoIterator<Item = &'topics Topic> + 'topics,
31
3
        Topic: Serialize + 'topics,
32
3
        Payload: Serialize,
33
3
    >(
34
3
        &self,
35
3
        topics: Topics,
36
3
        payload: &Payload,
37
3
    ) -> Result<(), Error> {
38
3
        let topics = topics
39
3
            .into_iter()
40
3
            .map(pot::to_vec)
41
3
            .collect::<Result<Vec<_>, _>>()?;
42
3
        self.publish_bytes_to_all(topics, pot::to_vec(payload)?)
43
3
    }
44

            
45
    /// Publishes a `payload` to all subscribers of all `topics`.
46
    fn publish_bytes_to_all(
47
        &self,
48
        topics: impl IntoIterator<Item = Vec<u8>> + Send,
49
        payload: Vec<u8>,
50
    ) -> Result<(), Error>;
51
}
52

            
53
/// A subscriber to one or more topics.
54
pub trait Subscriber {
55
    /// Subscribe to [`Message`]s published to `topic`.
56
41
    fn subscribe_to<Topic: Serialize>(&self, topic: &Topic) -> Result<(), Error> {
57
41
        self.subscribe_to_bytes(pot::to_vec(topic)?)
58
41
    }
59

            
60
    /// Subscribe to [`Message`]s published to `topic`.
61
    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error>;
62

            
63
    /// Unsubscribe from [`Message`]s published to `topic`.
64
3
    fn unsubscribe_from<Topic: Serialize>(&self, topic: &Topic) -> Result<(), Error> {
65
3
        self.unsubscribe_from_bytes(&pot::to_vec(topic)?)
66
3
    }
67

            
68
    /// Unsubscribe from [`Message`]s published to `topic`.
69
    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error>;
70

            
71
    /// Returns the receiver to receive [`Message`]s.
72
    fn receiver(&self) -> &Receiver;
73
}
74

            
75
/// Publishes and Subscribes to messages on topics.
76
#[async_trait]
77
pub trait AsyncPubSub: Send + Sync {
78
    /// The Subscriber type for this `PubSub` connection.
79
    type Subscriber: AsyncSubscriber;
80

            
81
    /// Create a new [`Subscriber`] for this relay.
82
    async fn create_subscriber(&self) -> Result<Self::Subscriber, Error>;
83

            
84
    /// Publishes a `payload` to all subscribers of `topic`.
85
60
    async fn publish<Topic: Serialize + Send + Sync, Payload: Serialize + Send + Sync>(
86
60
        &self,
87
60
        topic: &Topic,
88
60
        payload: &Payload,
89
60
    ) -> Result<(), Error> {
90
60
        let topic = pot::to_vec(topic)?;
91
60
        let payload = pot::to_vec(payload)?;
92
60
        self.publish_bytes(topic, payload).await
93
180
    }
94

            
95
    /// Publishes a `payload` to all subscribers of `topic`.
96
    async fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), Error>;
97

            
98
    /// Publishes a `payload` to all subscribers of all `topics`.
99
5
    async fn publish_to_all<
100
5
        'topics,
101
5
        Topics: IntoIterator<Item = &'topics Topic> + Send + 'topics,
102
5
        Topic: Serialize + Send + 'topics,
103
5
        Payload: Serialize + Send + Sync,
104
5
    >(
105
5
        &self,
106
5
        topics: Topics,
107
5
        payload: &Payload,
108
5
    ) -> Result<(), Error> {
109
5
        let topics = topics
110
5
            .into_iter()
111
15
            .map(|topic| pot::to_vec(topic))
112
5
            .collect::<Result<Vec<_>, _>>()?;
113
5
        self.publish_bytes_to_all(topics, pot::to_vec(payload)?)
114
2
            .await
115
15
    }
116

            
117
    /// Publishes a `payload` to all subscribers of all `topics`.
118
    async fn publish_bytes_to_all(
119
        &self,
120
        topics: impl IntoIterator<Item = Vec<u8>> + Send + 'async_trait,
121
        payload: Vec<u8>,
122
    ) -> Result<(), Error>;
123
}
124

            
125
/// A subscriber to one or more topics.
126
#[async_trait]
127
pub trait AsyncSubscriber: Send + Sync {
128
    /// Subscribe to [`Message`]s published to `topic`.
129
67
    async fn subscribe_to<Topic: Serialize + Send + Sync>(
130
67
        &self,
131
67
        topic: &Topic,
132
67
    ) -> Result<(), Error> {
133
67
        self.subscribe_to_bytes(pot::to_vec(topic)?).await
134
201
    }
135

            
136
    /// Subscribe to [`Message`]s published to `topic`.
137
    async fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), Error>;
138

            
139
    /// Unsubscribe from [`Message`]s published to `topic`.
140
5
    async fn unsubscribe_from<Topic: Serialize + Send + Sync>(
141
5
        &self,
142
5
        topic: &Topic,
143
5
    ) -> Result<(), Error> {
144
5
        self.unsubscribe_from_bytes(&pot::to_vec(topic)?).await
145
15
    }
146

            
147
    /// Unsubscribe from [`Message`]s published to `topic`.
148
    async fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), Error>;
149

            
150
    /// Returns the receiver to receive [`Message`]s.
151
    fn receiver(&self) -> &Receiver;
152
}
153

            
154
/// Receiver of PubSub [`Message`]s.
///
/// Wraps a `flume` channel receiver. Depending on how it was constructed, the
/// database prefix of each received message's topic is stripped before the
/// message is handed to the caller.
#[derive(Clone, Debug)]
#[must_use]
pub struct Receiver {
    /// The channel end that messages are read from.
    receiver: flume::Receiver<Message>,
    /// When `true`, everything up to and including the first NUL byte of a
    /// received message's topic is removed before the message is returned.
    strip_database: bool,
}
161

            
162
impl Receiver {
163
    #[doc(hidden)]
164
2720
    pub fn new_stripping_prefixes(receiver: flume::Receiver<Message>) -> Self {
165
2720
        Self {
166
2720
            receiver,
167
2720
            strip_database: true,
168
2720
        }
169
2720
    }
170

            
171
    #[doc(hidden)]
172
960
    pub fn new(receiver: flume::Receiver<Message>) -> Self {
173
960
        Self {
174
960
            receiver,
175
960
            strip_database: false,
176
960
        }
177
960
    }
178

            
179
    /// Receive the next [`Message`]. Blocks the current thread until a message
180
    /// is available. If the receiver becomes disconnected, an error will be
181
    /// returned.
182
2320
    pub fn receive(&self) -> Result<Message, Disconnected> {
183
2320
        self.receiver
184
2320
            .recv()
185
2320
            .map(|message| self.remove_database_prefix(message))
186
2320
            .map_err(|_| Disconnected)
187
2320
    }
188

            
189
    /// Receive the next [`Message`]. Blocks the current task until a new
190
    /// message is available. If the receiver becomes disconnected, an error
191
    /// will be returned.
192
6360
    pub async fn receive_async(&self) -> Result<Message, Disconnected> {
193
159
        self.receiver
194
159
            .recv_async()
195
61
            .await
196
145
            .map(|message| self.remove_database_prefix(message))
197
145
            .map_err(|_| Disconnected)
198
145
    }
199

            
200
    /// Try to receive the next [`Message`]. This function will not block, and
201
    /// only returns a message if one is already available.
202
1280
    pub fn try_receive(&self) -> Result<Message, TryReceiveError> {
203
1280
        self.receiver
204
1280
            .try_recv()
205
1280
            .map(|message| self.remove_database_prefix(message))
206
1280
            .map_err(TryReceiveError::from)
207
1280
    }
208

            
209
7400
    fn remove_database_prefix(&self, mut message: Message) -> Message {
210
7400
        if self.strip_database {
211
83080
            if let Some(database_length) = message.topic.iter().position(|b| b == 0) {
212
5600
                message.topic.0.read_bytes(database_length + 1).unwrap();
213
5600
            }
214
1800
        }
215

            
216
7400
        message
217
7400
    }
218
}
219

            
220
impl Iterator for Receiver {
221
    type Item = Message;
222

            
223
    fn next(&mut self) -> Option<Self::Item> {
224
        self.receive().ok()
225
    }
226
}
227

            
228
/// The [`Receiver`] was disconnected.
///
/// Returned by [`Receiver::receive`] and [`Receiver::receive_async`] when the
/// underlying channel reports an error instead of a message.
#[derive(thiserror::Error, Debug, Clone, Eq, PartialEq)]
#[error("the receiver is disconnected")]
pub struct Disconnected;
232

            
233
/// An error occurred trying to receive a message.
///
/// Returned by [`Receiver::try_receive`].
#[derive(thiserror::Error, Debug, Clone, Eq, PartialEq)]
pub enum TryReceiveError {
    /// The receiver was disconnected.
    #[error("the receiver is disconnected")]
    Disconnected,
    /// No message was available.
    #[error("the receiver was empty")]
    Empty,
}
243

            
244
impl From<flume::TryRecvError> for TryReceiveError {
245
1280
    fn from(err: flume::TryRecvError) -> Self {
246
1280
        match err {
247
1280
            flume::TryRecvError::Empty => Self::Empty,
248
            flume::TryRecvError::Disconnected => Self::Disconnected,
249
        }
250
1280
    }
251
}
252

            
253
/// Builds the namespaced topic used by a server: the bytes of `database`,
/// a single NUL byte, then `topic`.
///
/// This is an internal API, which is why the documentation is hidden. The
/// layout is an implementation detail, but both Client and Server must agree
/// on it, which is why it lives in core.
#[doc(hidden)]
#[must_use]
pub fn database_topic(database: &str, topic: &[u8]) -> Vec<u8> {
    let database_bytes = database.as_bytes();
    // One extra byte for the NUL separator between database name and topic.
    let total_length = database_bytes.len() + 1 + topic.len();

    let mut namespaced = Vec::with_capacity(total_length);
    namespaced.extend_from_slice(database_bytes);
    namespaced.push(0_u8);
    namespaced.extend_from_slice(topic);
    namespaced
}
267

            
268
/// Expands into a suite of pubsub unit tests using the passed type as the test harness.
///
/// The generated tests live in a `#[cfg(test)]` module named `async_pubsub`
/// and drive the connection through [`AsyncPubSub`](crate::pubsub::AsyncPubSub)
/// and [`AsyncSubscriber`](crate::pubsub::AsyncSubscriber). The harness type
/// must be in scope at the invocation site and provide an awaitable
/// `new(HarnessTest)` constructor and an awaitable `connect()` method, both
/// returning results usable with `?` inside `anyhow::Result`.
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! define_async_pubsub_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod r#async_pubsub {
            use $crate::pubsub::{AsyncPubSub, AsyncSubscriber};

            use super::$harness;
            // A subscriber only receives messages for the topic it subscribed
            // to, and receives each of them exactly once.
            #[tokio::test]
            async fn simple_pubsub_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubSimple).await?;
                let pubsub = harness.connect().await?;
                let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"mytopic").await?;
                AsyncPubSub::publish(&pubsub, &"mytopic", &String::from("test")).await?;
                AsyncPubSub::publish(&pubsub, &"othertopic", &String::from("test")).await?;
                let receiver = subscriber.receiver().clone();
                let message = receiver.receive_async().await.expect("No message received");
                assert_eq!(message.topic::<String>()?, "mytopic");
                assert_eq!(message.payload::<String>()?, "test");
                // The message should only be received once.
                assert!(matches!(
                    receiver.try_receive(),
                    Err($crate::pubsub::TryReceiveError::Empty)
                ));
                Ok(())
            }

            // Two subscribers with overlapping subscriptions each receive the
            // messages for their own topics, in publication order.
            #[tokio::test]
            async fn multiple_subscribers_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubMultipleSubscribers)
                        .await?;
                let pubsub = harness.connect().await?;
                let subscriber_a = AsyncPubSub::create_subscriber(&pubsub).await?;
                let subscriber_ab = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber_a, &"a").await?;
                AsyncSubscriber::subscribe_to(&subscriber_ab, &"a").await?;
                AsyncSubscriber::subscribe_to(&subscriber_ab, &"b").await?;

                let mut messages_a = Vec::new();
                let mut messages_ab = Vec::new();
                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
                messages_a.push(
                    subscriber_a
                        .receiver()
                        .receive_async()
                        .await?
                        .payload::<String>()?,
                );
                messages_ab.push(
                    subscriber_ab
                        .receiver()
                        .receive_async()
                        .await?
                        .payload::<String>()?,
                );

                AsyncPubSub::publish(&pubsub, &"b", &String::from("b1")).await?;
                messages_ab.push(
                    subscriber_ab
                        .receiver()
                        .receive_async()
                        .await?
                        .payload::<String>()?,
                );

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a2")).await?;
                messages_a.push(
                    subscriber_a
                        .receiver()
                        .receive_async()
                        .await?
                        .payload::<String>()?,
                );
                messages_ab.push(
                    subscriber_ab
                        .receiver()
                        .receive_async()
                        .await?
                        .payload::<String>()?,
                );

                assert_eq!(&messages_a[0], "a1");
                assert_eq!(&messages_a[1], "a2");

                assert_eq!(&messages_ab[0], "a1");
                assert_eq!(&messages_ab[1], "b1");
                assert_eq!(&messages_ab[2], "a2");

                Ok(())
            }

            // Messages published while unsubscribed are not delivered, and
            // re-subscribing resumes delivery.
            #[tokio::test]
            async fn unsubscribe_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubUnsubscribe).await?;
                let pubsub = harness.connect().await?;
                let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
                AsyncSubscriber::unsubscribe_from(&subscriber, &"a").await?;
                AsyncPubSub::publish(&pubsub, &"a", &String::from("a2")).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;
                AsyncPubSub::publish(&pubsub, &"a", &String::from("a3")).await?;

                // The subscriber should receive a1 and a3; a2 was published
                // while it was unsubscribed.
                let message = subscriber.receiver().receive_async().await?;
                assert_eq!(message.payload::<String>()?, "a1");
                let message = subscriber.receiver().receive_async().await?;
                assert_eq!(message.payload::<String>()?, "a3");

                Ok(())
            }

            // Dropping a subscriber disconnects its receiver once the already
            // delivered message has been read.
            #[tokio::test]
            async fn pubsub_drop_cleanup_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubDropCleanup).await?;
                let pubsub = harness.connect().await?;
                let subscriber = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber, &"a").await?;

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;
                let receiver = subscriber.receiver().clone();
                drop(subscriber);

                // The receiver should now be disconnected, but after receiving the
                // first message. For when we're testing network connections, we
                // need to insert a little delay here to allow the server to process
                // the drop.
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;

                AsyncPubSub::publish(&pubsub, &"a", &String::from("a1")).await?;

                let message = receiver.receive_async().await?;
                assert_eq!(message.payload::<String>()?, "a1");
                let $crate::pubsub::Disconnected = receiver.receive_async().await.unwrap_err();

                Ok(())
            }

            // Publishing once to several topics delivers one message per
            // subscribed topic to each subscriber.
            #[tokio::test]
            async fn publish_to_all_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubPublishAll).await?;
                let pubsub = harness.connect().await?;
                let subscriber_a = AsyncPubSub::create_subscriber(&pubsub).await?;
                let subscriber_b = AsyncPubSub::create_subscriber(&pubsub).await?;
                let subscriber_c = AsyncPubSub::create_subscriber(&pubsub).await?;
                AsyncSubscriber::subscribe_to(&subscriber_a, &"1").await?;
                AsyncSubscriber::subscribe_to(&subscriber_b, &"1").await?;
                AsyncSubscriber::subscribe_to(&subscriber_b, &"2").await?;
                AsyncSubscriber::subscribe_to(&subscriber_c, &"2").await?;
                AsyncSubscriber::subscribe_to(&subscriber_a, &"3").await?;
                AsyncSubscriber::subscribe_to(&subscriber_c, &"3").await?;

                AsyncPubSub::publish_to_all(&pubsub, [&"1", &"2", &"3"], &String::from("1"))
                    .await?;

                // Each subscriber should get "1" twice on separate topics
                for subscriber in &[subscriber_a, subscriber_b, subscriber_c] {
                    let mut message_topics = Vec::new();
                    for _ in 0..2_u8 {
                        let message = subscriber.receiver().receive_async().await?;
                        assert_eq!(message.payload::<String>()?, "1");
                        message_topics.push(message.topic.clone());
                    }
                    assert!(matches!(
                        subscriber.receiver().try_receive(),
                        Err($crate::pubsub::TryReceiveError::Empty)
                    ));
                    assert!(message_topics[0] != message_topics[1]);
                }

                Ok(())
            }
        }
    };
}
451

            
452
/// Expands into a suite of pubsub unit tests using the passed type as the test harness.
///
/// The generated tests live in a `#[cfg(test)]` module named
/// `blocking_pubsub` and drive the connection through
/// [`PubSub`](crate::pubsub::PubSub) and
/// [`Subscriber`](crate::pubsub::Subscriber). The harness type must be in
/// scope at the invocation site and provide a `new(HarnessTest)` constructor
/// and a `connect()` method, both returning results usable with `?` inside
/// `anyhow::Result`.
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! define_blocking_pubsub_test_suite {
    ($harness:ident) => {
        #[cfg(test)]
        mod blocking_pubsub {
            use $crate::pubsub::{PubSub, Subscriber};

            use super::$harness;
            // A subscriber only receives messages for the topic it subscribed
            // to, and receives each of them exactly once.
            #[test]
            fn simple_pubsub_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubSimple)?;
                let pubsub = harness.connect()?;
                let subscriber = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber, &"mytopic")?;
                PubSub::publish(&pubsub, &"mytopic", &String::from("test"))?;
                PubSub::publish(&pubsub, &"othertopic", &String::from("test"))?;
                let receiver = subscriber.receiver().clone();
                let message = receiver.receive().expect("No message received");
                assert_eq!(message.topic::<String>()?, "mytopic");
                assert_eq!(message.payload::<String>()?, "test");
                // The message should only be received once.
                assert!(matches!(
                    receiver.try_receive(),
                    Err($crate::pubsub::TryReceiveError::Empty)
                ));
                Ok(())
            }

            // Two subscribers with overlapping subscriptions each receive the
            // messages for their own topics, in publication order.
            #[test]
            fn multiple_subscribers_test() -> anyhow::Result<()> {
                let harness =
                    $harness::new($crate::test_util::HarnessTest::PubSubMultipleSubscribers)?;
                let pubsub = harness.connect()?;
                let subscriber_a = PubSub::create_subscriber(&pubsub)?;
                let subscriber_ab = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber_a, &"a")?;
                Subscriber::subscribe_to(&subscriber_ab, &"a")?;
                Subscriber::subscribe_to(&subscriber_ab, &"b")?;

                let mut messages_a = Vec::new();
                let mut messages_ab = Vec::new();
                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
                messages_a.push(subscriber_a.receiver().receive()?.payload::<String>()?);
                messages_ab.push(subscriber_ab.receiver().receive()?.payload::<String>()?);

                PubSub::publish(&pubsub, &"b", &String::from("b1"))?;
                messages_ab.push(subscriber_ab.receiver().receive()?.payload::<String>()?);

                PubSub::publish(&pubsub, &"a", &String::from("a2"))?;
                messages_a.push(subscriber_a.receiver().receive()?.payload::<String>()?);
                messages_ab.push(subscriber_ab.receiver().receive()?.payload::<String>()?);

                assert_eq!(&messages_a[0], "a1");
                assert_eq!(&messages_a[1], "a2");

                assert_eq!(&messages_ab[0], "a1");
                assert_eq!(&messages_ab[1], "b1");
                assert_eq!(&messages_ab[2], "a2");

                Ok(())
            }

            // Messages published while unsubscribed are not delivered, and
            // re-subscribing resumes delivery.
            #[test]
            fn unsubscribe_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubUnsubscribe)?;
                let pubsub = harness.connect()?;
                let subscriber = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber, &"a")?;

                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
                Subscriber::unsubscribe_from(&subscriber, &"a")?;
                PubSub::publish(&pubsub, &"a", &String::from("a2"))?;
                Subscriber::subscribe_to(&subscriber, &"a")?;
                PubSub::publish(&pubsub, &"a", &String::from("a3"))?;

                // The subscriber should receive a1 and a3; a2 was published
                // while it was unsubscribed.
                let message = subscriber.receiver().receive()?;
                assert_eq!(message.payload::<String>()?, "a1");
                let message = subscriber.receiver().receive()?;
                assert_eq!(message.payload::<String>()?, "a3");

                Ok(())
            }

            // Dropping a subscriber disconnects its receiver once the already
            // delivered message has been read.
            #[test]
            fn pubsub_drop_cleanup_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubDropCleanup)?;
                let pubsub = harness.connect()?;
                let subscriber = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber, &"a")?;

                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;
                let receiver = subscriber.receiver().clone();
                drop(subscriber);

                // The receiver should now be disconnected, but after receiving the
                // first message. For when we're testing network connections, we
                // need to insert a little delay here to allow the server to process
                // the drop.
                std::thread::sleep(std::time::Duration::from_millis(100));

                PubSub::publish(&pubsub, &"a", &String::from("a1"))?;

                let message = receiver.receive()?;
                assert_eq!(message.payload::<String>()?, "a1");
                let $crate::pubsub::Disconnected = receiver.receive().unwrap_err();

                Ok(())
            }

            // Publishing once to several topics delivers one message per
            // subscribed topic to each subscriber.
            #[test]
            fn publish_to_all_test() -> anyhow::Result<()> {
                let harness = $harness::new($crate::test_util::HarnessTest::PubSubPublishAll)?;
                let pubsub = harness.connect()?;
                let subscriber_a = PubSub::create_subscriber(&pubsub)?;
                let subscriber_b = PubSub::create_subscriber(&pubsub)?;
                let subscriber_c = PubSub::create_subscriber(&pubsub)?;
                Subscriber::subscribe_to(&subscriber_a, &"1")?;
                Subscriber::subscribe_to(&subscriber_b, &"1")?;
                Subscriber::subscribe_to(&subscriber_b, &"2")?;
                Subscriber::subscribe_to(&subscriber_c, &"2")?;
                Subscriber::subscribe_to(&subscriber_a, &"3")?;
                Subscriber::subscribe_to(&subscriber_c, &"3")?;

                PubSub::publish_to_all(&pubsub, [&"1", &"2", &"3"], &String::from("1"))?;

                // Each subscriber should get "1" twice on separate topics
                for subscriber in &[subscriber_a, subscriber_b, subscriber_c] {
                    let mut message_topics = Vec::new();
                    for _ in 0..2_u8 {
                        let message = subscriber.receiver().receive()?;
                        assert_eq!(message.payload::<String>()?, "1");
                        message_topics.push(message.topic.clone());
                    }
                    assert!(matches!(
                        subscriber.receiver().try_receive(),
                        Err($crate::pubsub::TryReceiveError::Empty)
                    ));
                    assert!(message_topics[0] != message_topics[1]);
                }

                Ok(())
            }
        }
    };
}