1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import unittest
import sys
from eventlet.coros import event, spawn_link
from eventlet.api import spawn, sleep, GreenletExit
class TestEvent(unittest.TestCase):
    """Tests for the ``event`` object imported from ``eventlet.coros``."""

    def test_send_exc(self):
        """An exception given to ``send(exc=...)`` is raised from ``wait()``.

        A greenlet is parked in ``e.wait()``; the test then sends an
        exception through the event and checks that the waiter observed it
        as a raised exception rather than as a returned value.
        """
        log = []
        e = event()

        def waiter():
            # Record how wait() finished: with a value or with an exception.
            try:
                result = e.wait()
                log.append(('received', result))
            except Exception as ex:
                log.append(('catched', type(ex).__name__))

        spawn(waiter)
        sleep(0)  # let the waiter run until it blocks in e.wait()
        e.send(exc=Exception())
        sleep(0)  # give the waiter a chance to resume and record the outcome
        # assertEqual (not a bare assert) so the check survives ``python -O``
        # and reports the actual log contents on failure.
        self.assertEqual(log, [('catched', 'Exception')])
class TestSpawnLink(unittest.TestCase):
    """Tests for ``spawn_link`` imported from ``eventlet.coros``."""

    def test_simple_return(self):
        """``wait()`` on the spawn_link result yields the function's return value."""
        res = spawn_link(lambda: 25).wait()
        self.assertEqual(res, 25)

    def test_exception(self):
        """An exception raised by the spawned function is re-raised by ``wait()``."""
        try:
            spawn_link(sys.exit, 'bye').wait()
        except SystemExit as ex:
            self.assertEqual(ex.args, ('bye',))
        else:
            # wait() returned normally, so the SystemExit was lost.
            self.fail("Shouldn't get there")

    def _test_kill(self, sync):
        """Kill a spawned greenlet and check what ``wait()`` returns.

        sync -- when true, ``kill()`` is called directly from the test;
                otherwise ``kill`` is scheduled via ``spawn``.

        In both cases ``wait()`` is expected to *return* a ``GreenletExit``
        instance instead of the function's normal result (101).
        """
        def func():
            sleep(0.1)
            return 101

        res = spawn_link(func)
        if sync:
            res.kill()
        else:
            spawn(res.kill)
        wait_result = res.wait()
        # repr() replaces the Python 2-only backtick syntax.
        self.assertTrue(isinstance(wait_result, GreenletExit),
                        repr(wait_result))

    def test_kill_sync(self):
        """Kill issued directly from the test's own greenlet."""
        self._test_kill(True)

    def test_kill_async(self):
        """Kill issued from a separately spawned greenlet."""
        self._test_kill(False)
# Run every TestCase defined in this module when the file is executed
# directly as a script (does nothing on import).
if __name__=='__main__':
    unittest.main()
|