summaryrefslogtreecommitdiff
path: root/src/broadcast.rs
blob: 5389b0c219eae8987defce6384628ec0684ae936 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use chan;
use chan::{Sender, Receiver};


/// Retain a list of all peers that should receive the incoming message.
pub struct Broadcast<A: Clone> {
    peers: Vec<Sender<A>>,
    rx:    Receiver<A>
}

impl<A: Clone> Broadcast<A> {
    /// Instantiate a new broadcaster for the given `Receiver`.
    pub fn new(rx: Receiver<A>) -> Broadcast<A> {
        Broadcast { peers: vec![], rx: rx }
    }

    /// Start receiving broadcasting messages and forwarding each to the list
    /// of peers.
    pub fn start(&self) {
        loop {
            self.rx.recv().map(|a| {
                for subscriber in &self.peers {
                    subscriber.send(a.clone());
                }
            });
        }
    }

    /// Add a new subscriber to the list of peers that will receive the broadcast
    /// messages.
    pub fn subscribe(&mut self) -> Receiver<A> {
        let (tx, rx) = chan::sync::<A>(0);
        self.peers.push(tx);
        rx
    }
}


#[cfg(test)]
mod tests {
    use chan;
    use std::thread;

    use super::*;


    #[test]
    fn test_broadcasts_events() {
        let (tx, rx)      = chan::sync(0);
        let mut broadcast = Broadcast::new(rx);

        let a = broadcast.subscribe();
        let b = broadcast.subscribe();
        thread::spawn(move || broadcast.start());

        tx.send(123);
        assert_eq!(123, a.recv().unwrap());
        assert_eq!(123, b.recv().unwrap());
    }
}