diff options
author | Denis Bilenko <denis@ag-projects.com> | 2009-01-12 15:51:22 +0600 |
---|---|---|
committer | Denis Bilenko <denis@ag-projects.com> | 2009-01-12 15:51:22 +0600 |
commit | aaf2a9fda3b947d1df08b85fa9951598e721a928 (patch) | |
tree | 9b08403033c533bc32b35ef04095ead72b7d1c81 /greentest | |
parent | 19f21172b9fa09f90387d15e88111e87c2c58b80 (diff) | |
download | eventlet-aaf2a9fda3b947d1df08b85fa9951598e721a928.tar.gz |
simplifed and rewrote proc module; the interface is mostly the same except
* notification happens in the main loop greenlet - no blocking stuff here is allowed
* callbacks now called with different args; events and greenlets are notified the same way as before
* GreenletExit is not converted to return value anymore; it's returned like any other exception
* weak links aren't supported ("weak" argument is gone)
* link_raise renamed to link_exception
* link_return renamed to link_value
* calling link_value does not remove "exception" link anymore and the other way around is also true
* link() will to try to perform notification immediatelly even if the result is available
* obscure function forward is gone
* ProcKilled renamed to ProcExit
compatible changes:
* Source class is added that maintans the links and delivers the result
Diffstat (limited to 'greentest')
-rw-r--r-- | greentest/test__proc.py | 225 |
1 files changed, 150 insertions, 75 deletions
diff --git a/greentest/test__proc.py b/greentest/test__proc.py index 46244a4..2c34398 100644 --- a/greentest/test__proc.py +++ b/greentest/test__proc.py @@ -20,19 +20,92 @@ from __future__ import with_statement import sys -from twisted.internet import reactor import unittest from eventlet.api import sleep, timeout -from eventlet import proc, coros +from eventlet import api, proc, coros -DELAY= 0.001 +DELAY = 0.01 -class TestCase(unittest.TestCase): +class LimitedTestCase(unittest.TestCase): + + def setUp(self): + self.timer = api.exc_after(1, api.TimeoutError('test is taking too long')) + + def tearDown(self): + self.timer.cancel() + +class TestEventSource(LimitedTestCase): + + def test_send(self): + s = proc.Source() + q1, q2, q3 = coros.queue(), coros.queue(), coros.queue() + s.link_value(q1) + assert s.wait(0) is None + assert s.wait(0.001, None) is None + s.send(1) + assert not q1.ready() + assert s.wait()==1 + api.sleep(0) + assert q1.ready() + s.link_exception(q2) + s.link(q3) + assert not q2.ready() + api.sleep(0) + assert q3.ready() + assert s.wait()==1 + + def test_send_exception(self): + s = proc.Source() + q1, q2, q3 = coros.queue(), coros.queue(), coros.queue() + s.link_exception(q1) + s.send_exception(OSError('hello')) + api.sleep(0) + assert q1.ready() + s.link_value(q2) + s.link(q3) + assert not q2.ready() + api.sleep(0) + assert q3.ready() + self.assertRaises(OSError, q1.wait) + self.assertRaises(OSError, q3.wait) + self.assertRaises(OSError, s.wait) + + +class SimpleTestProc(LimitedTestCase): + + def test_proc(self): + p = proc.spawn(lambda : 100) + receiver = proc.spawn(api.sleep, 1) + p.link(receiver) + self.assertRaises(proc.LinkedCompleted, receiver.wait) + receiver2 = proc.spawn(api.sleep, 1) + p.link(receiver2) + self.assertRaises(proc.LinkedCompleted, receiver2.wait) + + def test_event(self): + p = proc.spawn(lambda : 100) + event = coros.event() + p.link(event) + self.assertEqual(event.wait(), 100) + + for i in xrange(3): + event2 = coros.event() + p.link(event2) + self.assertEqual(event2.wait(), 100) + + def test_current(self): + p = proc.spawn(lambda : 100) + p.link() + self.assertRaises(proc.LinkedCompleted, sleep, 0.1) + + +class TestCase(LimitedTestCase): def link(self, p, listener=None): getattr(p, self.link_method)(listener) def tearDown(self): + LimitedTestCase.tearDown(self) self.p.unlink() def set_links(self, p, first_time, kill_exc_type): @@ -51,6 +124,7 @@ class TestCase(unittest.TestCase): try: self.link(p) + api.sleep(0) except kill_exc_type: if first_time: raise @@ -100,30 +174,23 @@ class TestCase(unittest.TestCase): class TestReturn_link(TestCase): link_method = 'link' - def test_kill(self): - p = self.p = proc.spawn(sleep, DELAY) - self._test_return(p, True, proc.ProcKilled, proc.LinkedKilled, p.kill) - # repeating the same with dead process - for _ in xrange(3): - self._test_return(p, False, proc.ProcKilled, proc.LinkedKilled, p.kill) - def test_return(self): - p = self.p = proc.spawn(lambda : 25) - self._test_return(p, True, int, proc.LinkedCompleted, lambda : sleep(0)) + def return25(): + return 25 + p = self.p = proc.spawn(return25) + self._test_return(p, True, 25, proc.LinkedCompleted, lambda : sleep(0)) # repeating the same with dead process for _ in xrange(3): - self._test_return(p, False, int, proc.LinkedCompleted, lambda : sleep(0)) + self._test_return(p, False, 25, proc.LinkedCompleted, lambda : sleep(0)) - def _test_return(self, p, first_time, result_type, kill_exc_type, action): + def _test_return(self, p, first_time, result, kill_exc_type, action): event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) # stuff that will time out because there's no unhandled exception: - #link_raise_event, link_raise_receiver, link_raise_flag, link_raise_queue = self.set_links_timeout(p.link_raise) - xxxxx = self.set_links_timeout(p.link_raise) + xxxxx = self.set_links_timeout(p.link_exception) - action() try: - sleep(DELAY) + sleep(DELAY*2) except kill_exc_type: assert first_time, 'raising here only first time' else: @@ -131,30 +198,28 @@ class TestReturn_link(TestCase): assert not p, p - with timeout(DELAY): - event_result = event.wait() - queue_result = queue.wait() - proc_result = proc.wait(receiver) + self.assertEqual(event.wait(), result) + self.assertEqual(queue.wait(), result) + self.assertRaises(kill_exc_type, receiver.wait) + self.assertRaises(kill_exc_type, proc.wait, receiver) - assert isinstance(event_result, result_type), repr(event_result) - assert isinstance(proc_result, kill_exc_type), repr(proc_result) sleep(DELAY) assert not proc_flag, proc_flag assert not callback_flag, callback_flag self.check_timed_out(*xxxxx) - -class TestReturn_link_return(TestReturn_link): + +class TestReturn_link_value(TestReturn_link): sync = False - link_method = 'link_return' + link_method = 'link_value' class TestRaise_link(TestCase): link_method = 'link' - def _test_raise(self, p, first_time, kill_exc_type=proc.LinkedFailed): + def _test_raise(self, p, first_time, kill_exc_type): event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) - xxxxx = self.set_links_timeout(p.link_return) + xxxxx = self.set_links_timeout(p.link_value) try: sleep(DELAY) @@ -168,9 +233,7 @@ class TestRaise_link(TestCase): with timeout(DELAY): self.assertRaises(ValueError, event.wait) self.assertRaises(ValueError, queue.wait) - proc_result = proc.wait(receiver) - - assert isinstance(proc_result, kill_exc_type), repr(proc_result) + self.assertRaises(kill_exc_type, proc.wait, receiver) sleep(DELAY) assert not proc_flag, proc_flag assert not callback_flag, callback_flag @@ -179,13 +242,45 @@ class TestRaise_link(TestCase): def test_raise(self): p = self.p = proc.spawn(int, 'badint') - self._test_raise(p, True) + self._test_raise(p, True, proc.LinkedFailed) + # repeating the same with dead process + for _ in xrange(3): + self._test_raise(p, False, proc.LinkedFailed) + + def _test_kill(self, p, first_time, kill_exc_type): + event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type) + xxxxx = self.set_links_timeout(p.link_value) + + p.kill() + try: + sleep(DELAY) + except kill_exc_type: + assert first_time, 'raising here only first time' + else: + assert not first_time, 'Should not raise LinkedKilled here after first time' + + assert not p, p + + with timeout(DELAY): + self.assertRaises(proc.ProcExit, event.wait) + self.assertRaises(proc.ProcExit, queue.wait) + self.assertRaises(kill_exc_type, proc.wait, receiver) + + sleep(DELAY) + assert not proc_flag, proc_flag + assert not callback_flag, callback_flag + + self.check_timed_out(*xxxxx) + + def test_kill(self): + p = self.p = proc.spawn(sleep, DELAY) + self._test_kill(p, True, proc.LinkedKilled) # repeating the same with dead process for _ in xrange(3): - self._test_raise(p, False) + self._test_kill(p, False, proc.LinkedKilled) -class TestRaise_link_raise(TestCase): - link_method = 'link_raise' +class TestRaise_link_exception(TestCase): + link_method = 'link_exception' class TestStuff(unittest.TestCase): @@ -195,6 +290,13 @@ class TestStuff(unittest.TestCase): y = proc.spawn(lambda : 2) z = proc.spawn(lambda : 3) self.assertEqual(proc.wait([x, y, z]), [1, 2, 3]) + e = coros.event() + x.link(e) + self.assertEqual(e.wait(), 1) + x.unlink(e) + e = coros.event() + x.link(e) + self.assertEqual(e.wait(), 1) self.assertEqual([proc.wait(X) for X in [x, y, z]], [1, 2, 3]) def test_wait_error(self): @@ -209,7 +311,7 @@ class TestStuff(unittest.TestCase): y.link(z) z.link(y) self.assertRaises(ValueError, proc.wait, [x, y, z]) - assert isinstance(proc.wait(x), proc.LinkedFailed), repr(proc.wait(x)) + self.assertRaises(proc.LinkedFailed, proc.wait, x) self.assertEqual(proc.wait(z), 3) self.assertRaises(ValueError, proc.wait, y) @@ -228,7 +330,8 @@ class TestStuff(unittest.TestCase): def test_multiple_listeners_error(self): # if there was an error while calling a callback # it should not prevent the other listeners from being called - # (but all of them should be logged, check the output that they are) + # also, all of the errors should be logged, check the output + # manually that they are p = proc.spawn(lambda : 5) results = [] def listener1(*args): @@ -242,7 +345,7 @@ class TestStuff(unittest.TestCase): p.link(listener1) p.link(listener2) p.link(listener3) - sleep(DELAY*3) + sleep(DELAY*10) assert results in [[10, 20], [20, 10]], results p = proc.spawn(int, 'hello') @@ -250,16 +353,20 @@ class TestStuff(unittest.TestCase): p.link(listener1) p.link(listener2) p.link(listener3) - sleep(DELAY*3) + sleep(DELAY*10) assert results in [[10, 20], [20, 10]], results def test_multiple_listeners_error_unlink(self): + # notification must not happen after unlink even + # though notification process has been already started p = proc.spawn(lambda : 5) results = [] def listener1(*args): + p.unlink(listener2) results.append(5) 1/0 def listener2(*args): + p.unlink(listener1) results.append(5) 2/0 def listener3(*args): @@ -267,16 +374,10 @@ class TestStuff(unittest.TestCase): p.link(listener1) p.link(listener2) p.link(listener3) - sleep(0) - # unlink one that is not fired yet - if listener1 in p._receivers: - p.unlink(listener1) - elif listener2 in p._receivers: - p.unlink(listener2) - sleep(DELAY*3) + sleep(DELAY*10) assert results == [5], results - def FAILING_test_killing_unlinked(self): + def test_killing_unlinked(self): e = coros.event() def func(): try: @@ -293,31 +394,5 @@ class TestStuff(unittest.TestCase): sleep(DELAY) -funcs_only_1arg = [lambda x: None, - lambda x=1: None] - -funcs_only_3args = [lambda x, y, z: None, - lambda x, y, z=1: None] - -funcs_any_arg = [lambda a, b=1, c=1: None, - lambda *args: None] - -class TestCallbackTypeErrors(unittest.TestCase): - - def test(self): - p = proc.spawn(lambda : None) - for func in funcs_only_1arg: - p.link_return(func) - self.assertRaises(TypeError, p.link_raise, func) - self.assertRaises(TypeError, p.link, func) - for func in funcs_only_3args: - p.link_raise(func) - self.assertRaises(TypeError, p.link_return, func) - self.assertRaises(TypeError, p.link, func) - for func in funcs_any_arg: - p.link_raise(func) - p.link_return(func) - p.link(func) - if __name__=='__main__': unittest.main() |