blob: 35f3cedc6877f7b61c54a1e7104773da1d24985b (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import asyncio
import blinker
def test_send_async():
    """Check that ``Signal.send_async`` drives coroutine and plain receivers.

    Two coroutine receivers and one ordinary function are connected to a
    fresh signal.  The signal is sent from inside a coroutine running on the
    event loop, every future handed back by ``send_async`` is awaited, and
    the test then verifies that each receiver ran exactly once and that each
    receiver is paired with a future holding that receiver's return value.

    NOTE: ``asyncio.coroutine`` is the generator-based (pre-``async def``)
    spelling used throughout this test; it was removed in Python 3.11, so
    this test only runs on interpreters that still provide it.
    """
    calls = []

    @asyncio.coroutine
    def receiver_a(sender):
        calls.append(receiver_a)
        return 'value a'

    @asyncio.coroutine
    def receiver_b(sender):
        calls.append(receiver_b)
        return 'value b'

    # A plain (non-coroutine) receiver must be supported alongside coroutines.
    def receiver_c(sender):
        calls.append(receiver_c)
        return 'value c'

    sig = blinker.Signal()
    sig.connect(receiver_a)
    sig.connect(receiver_b)
    sig.connect(receiver_c)

    @asyncio.coroutine
    def collect():
        # ``send_async`` is consumed below as an iterable of
        # (receiver, future) pairs.
        pairs = sig.send_async()
        # Wait for every future before leaving the loop.  Without this the
        # ``.result()`` calls below only succeed if the event loop happens to
        # run the scheduled receivers before it stops; otherwise they raise
        # ``InvalidStateError``.
        yield from asyncio.gather(*[future for _, future in pairs])
        return pairs

    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(collect())

    expected = {
        receiver_a: 'value a',
        receiver_b: 'value b',
        receiver_c: 'value c',
    }

    # Every receiver was invoked, and none of them more than once.
    assert set(calls) == set(expected.keys())
    assert len(calls) == len(expected)

    # Each receiver is paired with the value it returned, not merely with
    # some value from the expected set.
    assert len(results) == len(expected)
    collected_results = {
        receiver: future.result() for receiver, future in results
    }
    assert collected_results == expected
|