summaryrefslogtreecommitdiff
path: root/test_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'test_futures.py')
-rw-r--r--test_futures.py159
1 files changed, 99 insertions, 60 deletions
diff --git a/test_futures.py b/test_futures.py
index 0093b0e..64e541b 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -25,20 +25,34 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
-
class Call(object):
+ CALL_LOCKS = {}
def __init__(self, manual_finish=False):
- self._called_event = threading.Event()
+ called_event = multiprocessing.Event()
+ can_finish = multiprocessing.Event()
+
+ self._called_event_id = id(called_event)
+ self._can_finish_event_id = id(can_finish)
+
+ self.CALL_LOCKS[self._called_event_id] = called_event
+ self.CALL_LOCKS[self._can_finish_event_id] = can_finish
- self._can_finished = threading.Event()
if not manual_finish:
- self._can_finished.set()
+ self._can_finish.set()
+
+ @property
+ def _can_finish(self):
+ return self.CALL_LOCKS[self._can_finish_event_id]
+
+ @property
+ def _called_event(self):
+ return self.CALL_LOCKS[self._called_event_id]
def wait_on_called(self):
self._called_event.wait()
def set_can(self):
- self._can_finished.set()
+ self._can_finish.set()
def called(self):
return self._called_event.is_set()
@@ -47,36 +61,36 @@ class Call(object):
if self._called_event.is_set(): print('called twice')
self._called_event.set()
- self._can_finished.wait()
+ self._can_finish.wait()
return 42
+ def __del__(self):
+ del self.CALL_LOCKS[self._called_event_id]
+ del self.CALL_LOCKS[self._can_finish_event_id]
+
class ExceptionCall(Call):
def __call__(self):
assert not self._called_event.is_set(), 'already called'
self._called_event.set()
- self._can_finished.wait()
+ self._can_finish.wait()
raise ZeroDivisionError()
class ShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call()
self.executor.shutdown()
self.assertRaises(RuntimeError,
- self.executor.run,
+ self.executor.run_to_futures,
[call1])
- def test_threads_terminate(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=5)
-
+ def _start_some_futures(self):
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
- self.executor.run([call1, call2, call3],
- return_when=futures.RETURN_IMMEDIATELY)
+ self.executor.run_to_futures([call1, call2, call3],
+ return_when=futures.RETURN_IMMEDIATELY)
call1.wait_on_called()
call2.wait_on_called()
@@ -86,11 +100,33 @@ class ShutdownTest(unittest.TestCase):
call2.set_can()
call3.set_can()
+class ThreadPoolShutdownTest(ShutdownTest):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=5)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+ def test_threads_terminate(self):
+ self._start_some_futures()
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
t.join()
-
+
+class ProcessPoolShutdownTest(ShutdownTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=5)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+ def test_processes_terminate(self):
+ self._start_some_futures()
+ self.assertEqual(len(self.executor._processes), 5)
+ self.executor.shutdown()
+ for p in self.executor._processes:
+ p.join()
class WaitsTest(unittest.TestCase):
def test(self):
@@ -105,7 +141,7 @@ class WaitsTest(unittest.TestCase):
def wait_for_FIRST_COMPLETED():
fs.wait(return_when=futures.FIRST_COMPLETED)
self.assertTrue(f1.done())
- self.assertFalse(f2.done())
+ self.assertFalse(f2.done()) # XXX
self.assertFalse(f3.done())
self.assertFalse(f4.done())
first_completed.release()
@@ -114,7 +150,7 @@ class WaitsTest(unittest.TestCase):
fs.wait(return_when=futures.FIRST_EXCEPTION)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
- self.assertFalse(f3.done())
+ self.assertFalse(f3.done()) # XXX
self.assertFalse(f4.done())
first_exception.release()
@@ -127,46 +163,54 @@ class WaitsTest(unittest.TestCase):
call3 = Call(manual_finish=True)
call4 = Call()
- fs = self.executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
threads = []
- for call in [wait_for_ALL_COMPLETED,
- wait_for_FIRST_COMPLETED,
- wait_for_FIRST_EXCEPTION]:
- t = threading.Thread(target=call)
+ for wait_test in [wait_for_ALL_COMPLETED,
+ wait_for_FIRST_COMPLETED,
+ wait_for_FIRST_EXCEPTION]:
+ t = threading.Thread(target=wait_test)
t.start()
threads.append(t)
time.sleep(1) # give threads enough time to execute wait
+
call1.set_can()
first_completed.acquire()
call2.set_can()
first_exception.acquire()
call3.set_can()
- call4.set_can()
all_completed.acquire()
self.executor.shutdown()
class ThreadPoolWaitTests(WaitsTest):
- executor = futures.ThreadPoolExecutor(max_threads=1)
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
class ProcessPoolWaitTests(WaitsTest):
- executor = futures.ProcessPoolExecutor(max_processes=1)
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
class CancelTests(unittest.TestCase):
def test_cancel_states(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
call3 = Call()
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -193,7 +237,6 @@ class CancelTests(unittest.TestCase):
self.assertEqual(call2.called(), False)
self.assertEqual(call4.called(), False)
- executor.shutdown()
def test_wait_for_individual_cancel(self):
def end_call():
@@ -201,12 +244,12 @@ class CancelTests(unittest.TestCase):
f2.cancel()
call1.set_can()
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
- fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2 = fs
call1.wait_on_called()
@@ -215,18 +258,16 @@ class CancelTests(unittest.TestCase):
self.assertRaises(futures.CancelledError, f2.result)
self.assertRaises(futures.CancelledError, f2.exception)
t.join()
- executor.shutdown()
def test_wait_with_already_cancelled_futures(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
call3 = Call()
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -236,18 +277,16 @@ class CancelTests(unittest.TestCase):
time.sleep(0.1)
fs.wait(return_when=futures.ALL_COMPLETED)
- executor.shutdown()
def test_cancel_all(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
call3 = Call()
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -259,22 +298,20 @@ class CancelTests(unittest.TestCase):
self.assertTrue(f2.cancelled())
self.assertTrue(f3.cancelled())
self.assertTrue(f4.cancelled())
- executor.shutdown()
- def test_cancel_repr(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
+class ThreadPoolCancelTests(CancelTests):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
- call1 = Call(manual_finish=True)
- call2 = Call()
+ def tearDown(self):
+ self.executor.shutdown()
- fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY)
- f1, f2 = fs
+class ProcessPoolCancelTests(WaitsTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
- call1.wait_on_called()
- call1.set_can()
- f2.cancel()
- self.assertEqual(repr(f2), '<Future state=cancelled>')
- executor.shutdown()
+ def tearDown(self):
+ self.executor.shutdown()
class FutureTests(unittest.TestCase):
def test_repr(self):
@@ -552,12 +589,14 @@ class FutureListTests(unittest.TestCase):
'[#pending=4 #cancelled=3 #running=2 #finished=6]>')
def test_main():
- test.support.run_unittest(CancelTests,
-# ProcessPoolWaitTests,
+ test.support.run_unittest(ProcessPoolCancelTests,
+ ThreadPoolCancelTests,
+ ProcessPoolWaitTests,
ThreadPoolWaitTests,
FutureTests,
FutureListTests,
- ShutdownTest)
+ ProcessPoolShutdownTest,
+ ThreadPoolShutdownTest)
if __name__ == "__main__":
test_main() \ No newline at end of file