diff options
Diffstat (limited to 'test_futures.py')
-rw-r--r-- | test_futures.py | 159 |
1 files changed, 99 insertions, 60 deletions
diff --git a/test_futures.py b/test_futures.py index 0093b0e..64e541b 100644 --- a/test_futures.py +++ b/test_futures.py @@ -25,20 +25,34 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - class Call(object): + CALL_LOCKS = {} def __init__(self, manual_finish=False): - self._called_event = threading.Event() + called_event = multiprocessing.Event() + can_finish = multiprocessing.Event() + + self._called_event_id = id(called_event) + self._can_finish_event_id = id(can_finish) + + self.CALL_LOCKS[self._called_event_id] = called_event + self.CALL_LOCKS[self._can_finish_event_id] = can_finish - self._can_finished = threading.Event() if not manual_finish: - self._can_finished.set() + self._can_finish.set() + + @property + def _can_finish(self): + return self.CALL_LOCKS[self._can_finish_event_id] + + @property + def _called_event(self): + return self.CALL_LOCKS[self._called_event_id] def wait_on_called(self): self._called_event.wait() def set_can(self): - self._can_finished.set() + self._can_finish.set() def called(self): return self._called_event.is_set() @@ -47,36 +61,36 @@ class Call(object): if self._called_event.is_set(): print('called twice') self._called_event.set() - self._can_finished.wait() + self._can_finish.wait() return 42 + def __del__(self): + del self.CALL_LOCKS[self._called_event_id] + del self.CALL_LOCKS[self._can_finish_event_id] + class ExceptionCall(Call): def __call__(self): assert not self._called_event.is_set(), 'already called' self._called_event.set() - self._can_finished.wait() + self._can_finish.wait() raise ZeroDivisionError() class ShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call() self.executor.shutdown() self.assertRaises(RuntimeError, - self.executor.run, + self.executor.run_to_futures, [call1]) - def test_threads_terminate(self): - self.executor = futures.ThreadPoolExecutor(max_threads=5) - + def _start_some_futures(self): call1 = Call(manual_finish=True) call2 = Call(manual_finish=True) call3 = Call(manual_finish=True) - self.executor.run([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) + self.executor.run_to_futures([call1, call2, call3], + return_when=futures.RETURN_IMMEDIATELY) call1.wait_on_called() call2.wait_on_called() @@ -86,11 +100,33 @@ class ShutdownTest(unittest.TestCase): call2.set_can() call3.set_can() +class ThreadPoolShutdownTest(ShutdownTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=5) + + def tearDown(self): + self.executor.shutdown() + + def test_threads_terminate(self): + self._start_some_futures() self.assertEqual(len(self.executor._threads), 3) self.executor.shutdown() for t in self.executor._threads: t.join() - + +class ProcessPoolShutdownTest(ShutdownTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=5) + + def tearDown(self): + self.executor.shutdown() + + def test_processes_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._processes), 5) + self.executor.shutdown() + for p in self.executor._processes: + p.join() class WaitsTest(unittest.TestCase): def test(self): @@ -105,7 +141,7 @@ class WaitsTest(unittest.TestCase): def wait_for_FIRST_COMPLETED(): fs.wait(return_when=futures.FIRST_COMPLETED) self.assertTrue(f1.done()) - self.assertFalse(f2.done()) + self.assertFalse(f2.done()) # XXX self.assertFalse(f3.done()) self.assertFalse(f4.done()) first_completed.release() @@ -114,7 +150,7 @@ class WaitsTest(unittest.TestCase): fs.wait(return_when=futures.FIRST_EXCEPTION) self.assertTrue(f1.done()) self.assertTrue(f2.done()) - self.assertFalse(f3.done()) + self.assertFalse(f3.done()) # XXX self.assertFalse(f4.done()) first_exception.release() @@ -127,46 +163,54 @@ class WaitsTest(unittest.TestCase): call3 = Call(manual_finish=True) call4 = Call() - fs = self.executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs threads = [] - for call in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=call) + for wait_test in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: + t = threading.Thread(target=wait_test) t.start() threads.append(t) time.sleep(1) # give threads enough time to execute wait + call1.set_can() first_completed.acquire() call2.set_can() first_exception.acquire() call3.set_can() - call4.set_can() all_completed.acquire() self.executor.shutdown() class ThreadPoolWaitTests(WaitsTest): - executor = futures.ThreadPoolExecutor(max_threads=1) + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() class ProcessPoolWaitTests(WaitsTest): - executor = futures.ProcessPoolExecutor(max_processes=1) + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() class CancelTests(unittest.TestCase): def test_cancel_states(self): - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() call3 = Call() call4 = Call() - fs = executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -193,7 +237,6 @@ class CancelTests(unittest.TestCase): self.assertEqual(call2.called(), False) self.assertEqual(call4.called(), False) - executor.shutdown() def test_wait_for_individual_cancel(self): def end_call(): @@ -201,12 +244,12 @@ class CancelTests(unittest.TestCase): f2.cancel() call1.set_can() - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() - fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2], + return_when=futures.RETURN_IMMEDIATELY) f1, f2 = fs call1.wait_on_called() @@ -215,18 +258,16 @@ class CancelTests(unittest.TestCase): self.assertRaises(futures.CancelledError, f2.result) self.assertRaises(futures.CancelledError, f2.exception) t.join() - executor.shutdown() def test_wait_with_already_cancelled_futures(self): - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() call3 = Call() call4 = Call() - fs = executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -236,18 +277,16 @@ class CancelTests(unittest.TestCase): time.sleep(0.1) fs.wait(return_when=futures.ALL_COMPLETED) - executor.shutdown() def test_cancel_all(self): - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() call3 = Call() call4 = Call() - fs = executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -259,22 +298,20 @@ class CancelTests(unittest.TestCase): self.assertTrue(f2.cancelled()) self.assertTrue(f3.cancelled()) self.assertTrue(f4.cancelled()) - executor.shutdown() - def test_cancel_repr(self): - executor = futures.ThreadPoolExecutor(max_threads=1) +class ThreadPoolCancelTests(CancelTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) - call2 = Call() + def tearDown(self): + self.executor.shutdown() - fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs +class ProcessPoolCancelTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) - call1.wait_on_called() - call1.set_can() - f2.cancel() - self.assertEqual(repr(f2), '<Future state=cancelled>') - executor.shutdown() + def tearDown(self): + self.executor.shutdown() class FutureTests(unittest.TestCase): def test_repr(self): @@ -552,12 +589,14 @@ class FutureListTests(unittest.TestCase): '[#pending=4 #cancelled=3 #running=2 #finished=6]>') def test_main(): - test.support.run_unittest(CancelTests, -# ProcessPoolWaitTests, + test.support.run_unittest(ProcessPoolCancelTests, + ThreadPoolCancelTests, + ProcessPoolWaitTests, ThreadPoolWaitTests, FutureTests, FutureListTests, - ShutdownTest) + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) if __name__ == "__main__": test_main()
\ No newline at end of file |