summaryrefslogtreecommitdiff
path: root/greentest
diff options
context:
space:
mode:
authorDenis Bilenko <denis@ag-projects.com>2009-01-12 15:51:22 +0600
committerDenis Bilenko <denis@ag-projects.com>2009-01-12 15:51:22 +0600
commitaaf2a9fda3b947d1df08b85fa9951598e721a928 (patch)
tree9b08403033c533bc32b35ef04095ead72b7d1c81 /greentest
parent19f21172b9fa09f90387d15e88111e87c2c58b80 (diff)
downloadeventlet-aaf2a9fda3b947d1df08b85fa9951598e721a928.tar.gz
simplifed and rewrote proc module; the interface is mostly the same except
* notification happens in the main loop greenlet - no blocking stuff here is allowed * callbacks now called with different args; events and greenlets are notified the same way as before * GreenletExit is not converted to return value anymore; it's returned like any other exception * weak links aren't supported ("weak" argument is gone) * link_raise renamed to link_exception * link_return renamed to link_value * calling link_value does not remove "exception" link anymore and the other way around is also true * link() will to try to perform notification immediatelly even if the result is available * obscure function forward is gone * ProcKilled renamed to ProcExit compatible changes: * Source class is added that maintans the links and delivers the result
Diffstat (limited to 'greentest')
-rw-r--r--greentest/test__proc.py225
1 files changed, 150 insertions, 75 deletions
diff --git a/greentest/test__proc.py b/greentest/test__proc.py
index 46244a4..2c34398 100644
--- a/greentest/test__proc.py
+++ b/greentest/test__proc.py
@@ -20,19 +20,92 @@
from __future__ import with_statement
import sys
-from twisted.internet import reactor
import unittest
from eventlet.api import sleep, timeout
-from eventlet import proc, coros
+from eventlet import api, proc, coros
-DELAY= 0.001
+DELAY = 0.01
-class TestCase(unittest.TestCase):
+class LimitedTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.timer = api.exc_after(1, api.TimeoutError('test is taking too long'))
+
+ def tearDown(self):
+ self.timer.cancel()
+
+class TestEventSource(LimitedTestCase):
+
+ def test_send(self):
+ s = proc.Source()
+ q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
+ s.link_value(q1)
+ assert s.wait(0) is None
+ assert s.wait(0.001, None) is None
+ s.send(1)
+ assert not q1.ready()
+ assert s.wait()==1
+ api.sleep(0)
+ assert q1.ready()
+ s.link_exception(q2)
+ s.link(q3)
+ assert not q2.ready()
+ api.sleep(0)
+ assert q3.ready()
+ assert s.wait()==1
+
+ def test_send_exception(self):
+ s = proc.Source()
+ q1, q2, q3 = coros.queue(), coros.queue(), coros.queue()
+ s.link_exception(q1)
+ s.send_exception(OSError('hello'))
+ api.sleep(0)
+ assert q1.ready()
+ s.link_value(q2)
+ s.link(q3)
+ assert not q2.ready()
+ api.sleep(0)
+ assert q3.ready()
+ self.assertRaises(OSError, q1.wait)
+ self.assertRaises(OSError, q3.wait)
+ self.assertRaises(OSError, s.wait)
+
+
+class SimpleTestProc(LimitedTestCase):
+
+ def test_proc(self):
+ p = proc.spawn(lambda : 100)
+ receiver = proc.spawn(api.sleep, 1)
+ p.link(receiver)
+ self.assertRaises(proc.LinkedCompleted, receiver.wait)
+ receiver2 = proc.spawn(api.sleep, 1)
+ p.link(receiver2)
+ self.assertRaises(proc.LinkedCompleted, receiver2.wait)
+
+ def test_event(self):
+ p = proc.spawn(lambda : 100)
+ event = coros.event()
+ p.link(event)
+ self.assertEqual(event.wait(), 100)
+
+ for i in xrange(3):
+ event2 = coros.event()
+ p.link(event2)
+ self.assertEqual(event2.wait(), 100)
+
+ def test_current(self):
+ p = proc.spawn(lambda : 100)
+ p.link()
+ self.assertRaises(proc.LinkedCompleted, sleep, 0.1)
+
+
+class TestCase(LimitedTestCase):
def link(self, p, listener=None):
getattr(p, self.link_method)(listener)
def tearDown(self):
+ LimitedTestCase.tearDown(self)
self.p.unlink()
def set_links(self, p, first_time, kill_exc_type):
@@ -51,6 +124,7 @@ class TestCase(unittest.TestCase):
try:
self.link(p)
+ api.sleep(0)
except kill_exc_type:
if first_time:
raise
@@ -100,30 +174,23 @@ class TestCase(unittest.TestCase):
class TestReturn_link(TestCase):
link_method = 'link'
- def test_kill(self):
- p = self.p = proc.spawn(sleep, DELAY)
- self._test_return(p, True, proc.ProcKilled, proc.LinkedKilled, p.kill)
- # repeating the same with dead process
- for _ in xrange(3):
- self._test_return(p, False, proc.ProcKilled, proc.LinkedKilled, p.kill)
-
def test_return(self):
- p = self.p = proc.spawn(lambda : 25)
- self._test_return(p, True, int, proc.LinkedCompleted, lambda : sleep(0))
+ def return25():
+ return 25
+ p = self.p = proc.spawn(return25)
+ self._test_return(p, True, 25, proc.LinkedCompleted, lambda : sleep(0))
# repeating the same with dead process
for _ in xrange(3):
- self._test_return(p, False, int, proc.LinkedCompleted, lambda : sleep(0))
+ self._test_return(p, False, 25, proc.LinkedCompleted, lambda : sleep(0))
- def _test_return(self, p, first_time, result_type, kill_exc_type, action):
+ def _test_return(self, p, first_time, result, kill_exc_type, action):
event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
# stuff that will time out because there's no unhandled exception:
- #link_raise_event, link_raise_receiver, link_raise_flag, link_raise_queue = self.set_links_timeout(p.link_raise)
- xxxxx = self.set_links_timeout(p.link_raise)
+ xxxxx = self.set_links_timeout(p.link_exception)
- action()
try:
- sleep(DELAY)
+ sleep(DELAY*2)
except kill_exc_type:
assert first_time, 'raising here only first time'
else:
@@ -131,30 +198,28 @@ class TestReturn_link(TestCase):
assert not p, p
- with timeout(DELAY):
- event_result = event.wait()
- queue_result = queue.wait()
- proc_result = proc.wait(receiver)
+ self.assertEqual(event.wait(), result)
+ self.assertEqual(queue.wait(), result)
+ self.assertRaises(kill_exc_type, receiver.wait)
+ self.assertRaises(kill_exc_type, proc.wait, receiver)
- assert isinstance(event_result, result_type), repr(event_result)
- assert isinstance(proc_result, kill_exc_type), repr(proc_result)
sleep(DELAY)
assert not proc_flag, proc_flag
assert not callback_flag, callback_flag
self.check_timed_out(*xxxxx)
-
-class TestReturn_link_return(TestReturn_link):
+
+class TestReturn_link_value(TestReturn_link):
sync = False
- link_method = 'link_return'
+ link_method = 'link_value'
class TestRaise_link(TestCase):
link_method = 'link'
- def _test_raise(self, p, first_time, kill_exc_type=proc.LinkedFailed):
+ def _test_raise(self, p, first_time, kill_exc_type):
event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
- xxxxx = self.set_links_timeout(p.link_return)
+ xxxxx = self.set_links_timeout(p.link_value)
try:
sleep(DELAY)
@@ -168,9 +233,7 @@ class TestRaise_link(TestCase):
with timeout(DELAY):
self.assertRaises(ValueError, event.wait)
self.assertRaises(ValueError, queue.wait)
- proc_result = proc.wait(receiver)
-
- assert isinstance(proc_result, kill_exc_type), repr(proc_result)
+ self.assertRaises(kill_exc_type, proc.wait, receiver)
sleep(DELAY)
assert not proc_flag, proc_flag
assert not callback_flag, callback_flag
@@ -179,13 +242,45 @@ class TestRaise_link(TestCase):
def test_raise(self):
p = self.p = proc.spawn(int, 'badint')
- self._test_raise(p, True)
+ self._test_raise(p, True, proc.LinkedFailed)
+ # repeating the same with dead process
+ for _ in xrange(3):
+ self._test_raise(p, False, proc.LinkedFailed)
+
+ def _test_kill(self, p, first_time, kill_exc_type):
+ event, receiver, proc_flag, queue, callback_flag = self.set_links(p, first_time, kill_exc_type)
+ xxxxx = self.set_links_timeout(p.link_value)
+
+ p.kill()
+ try:
+ sleep(DELAY)
+ except kill_exc_type:
+ assert first_time, 'raising here only first time'
+ else:
+ assert not first_time, 'Should not raise LinkedKilled here after first time'
+
+ assert not p, p
+
+ with timeout(DELAY):
+ self.assertRaises(proc.ProcExit, event.wait)
+ self.assertRaises(proc.ProcExit, queue.wait)
+ self.assertRaises(kill_exc_type, proc.wait, receiver)
+
+ sleep(DELAY)
+ assert not proc_flag, proc_flag
+ assert not callback_flag, callback_flag
+
+ self.check_timed_out(*xxxxx)
+
+ def test_kill(self):
+ p = self.p = proc.spawn(sleep, DELAY)
+ self._test_kill(p, True, proc.LinkedKilled)
# repeating the same with dead process
for _ in xrange(3):
- self._test_raise(p, False)
+ self._test_kill(p, False, proc.LinkedKilled)
-class TestRaise_link_raise(TestCase):
- link_method = 'link_raise'
+class TestRaise_link_exception(TestCase):
+ link_method = 'link_exception'
class TestStuff(unittest.TestCase):
@@ -195,6 +290,13 @@ class TestStuff(unittest.TestCase):
y = proc.spawn(lambda : 2)
z = proc.spawn(lambda : 3)
self.assertEqual(proc.wait([x, y, z]), [1, 2, 3])
+ e = coros.event()
+ x.link(e)
+ self.assertEqual(e.wait(), 1)
+ x.unlink(e)
+ e = coros.event()
+ x.link(e)
+ self.assertEqual(e.wait(), 1)
self.assertEqual([proc.wait(X) for X in [x, y, z]], [1, 2, 3])
def test_wait_error(self):
@@ -209,7 +311,7 @@ class TestStuff(unittest.TestCase):
y.link(z)
z.link(y)
self.assertRaises(ValueError, proc.wait, [x, y, z])
- assert isinstance(proc.wait(x), proc.LinkedFailed), repr(proc.wait(x))
+ self.assertRaises(proc.LinkedFailed, proc.wait, x)
self.assertEqual(proc.wait(z), 3)
self.assertRaises(ValueError, proc.wait, y)
@@ -228,7 +330,8 @@ class TestStuff(unittest.TestCase):
def test_multiple_listeners_error(self):
# if there was an error while calling a callback
# it should not prevent the other listeners from being called
- # (but all of them should be logged, check the output that they are)
+ # also, all of the errors should be logged, check the output
+ # manually that they are
p = proc.spawn(lambda : 5)
results = []
def listener1(*args):
@@ -242,7 +345,7 @@ class TestStuff(unittest.TestCase):
p.link(listener1)
p.link(listener2)
p.link(listener3)
- sleep(DELAY*3)
+ sleep(DELAY*10)
assert results in [[10, 20], [20, 10]], results
p = proc.spawn(int, 'hello')
@@ -250,16 +353,20 @@ class TestStuff(unittest.TestCase):
p.link(listener1)
p.link(listener2)
p.link(listener3)
- sleep(DELAY*3)
+ sleep(DELAY*10)
assert results in [[10, 20], [20, 10]], results
def test_multiple_listeners_error_unlink(self):
+ # notification must not happen after unlink even
+ # though notification process has been already started
p = proc.spawn(lambda : 5)
results = []
def listener1(*args):
+ p.unlink(listener2)
results.append(5)
1/0
def listener2(*args):
+ p.unlink(listener1)
results.append(5)
2/0
def listener3(*args):
@@ -267,16 +374,10 @@ class TestStuff(unittest.TestCase):
p.link(listener1)
p.link(listener2)
p.link(listener3)
- sleep(0)
- # unlink one that is not fired yet
- if listener1 in p._receivers:
- p.unlink(listener1)
- elif listener2 in p._receivers:
- p.unlink(listener2)
- sleep(DELAY*3)
+ sleep(DELAY*10)
assert results == [5], results
- def FAILING_test_killing_unlinked(self):
+ def test_killing_unlinked(self):
e = coros.event()
def func():
try:
@@ -293,31 +394,5 @@ class TestStuff(unittest.TestCase):
sleep(DELAY)
-funcs_only_1arg = [lambda x: None,
- lambda x=1: None]
-
-funcs_only_3args = [lambda x, y, z: None,
- lambda x, y, z=1: None]
-
-funcs_any_arg = [lambda a, b=1, c=1: None,
- lambda *args: None]
-
-class TestCallbackTypeErrors(unittest.TestCase):
-
- def test(self):
- p = proc.spawn(lambda : None)
- for func in funcs_only_1arg:
- p.link_return(func)
- self.assertRaises(TypeError, p.link_raise, func)
- self.assertRaises(TypeError, p.link, func)
- for func in funcs_only_3args:
- p.link_raise(func)
- self.assertRaises(TypeError, p.link_return, func)
- self.assertRaises(TypeError, p.link, func)
- for func in funcs_any_arg:
- p.link_raise(func)
- p.link_return(func)
- p.link(func)
-
if __name__=='__main__':
unittest.main()