diff options
Diffstat (limited to 'python3/test_futures.py')
-rw-r--r-- | python3/test_futures.py | 994 |
1 files changed, 491 insertions, 503 deletions
diff --git a/python3/test_futures.py b/python3/test_futures.py index b7dee65..98eea27 100644 --- a/python3/test_futures.py +++ b/python3/test_futures.py @@ -1,17 +1,24 @@ -import test.support - -import unittest +import io +import logging +import multiprocessing +import sys import threading +import test.support import time -import multiprocessing +import unittest + +if sys.platform.startswith('win'): + import ctypes + import ctypes.wintypes import futures -import futures._base from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, + LOGGER, STDERR_HANDLER, wait) +import futures.process def create_future(state=PENDING, exception=None, result=None): - f = Future(0) + f = Future() f._state = state f._exception = exception f._result = result @@ -24,68 +31,104 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) +def mul(x, y): + return x * y + class Call(object): + """A call that can be submitted to a future.Executor for testing. + + The call signals when it is called and waits for an event before finishing. + """ CALL_LOCKS = {} + def _create_event(self): + if sys.platform.startswith('win'): + class SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), + ("bInheritHandle", ctypes.wintypes.BOOL)] + + s = SECURITY_ATTRIBUTES() + s.nLength = ctypes.sizeof(s) + s.lpSecurityDescriptor = None + s.bInheritHandle = True + + handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), + True, + False, + None) + assert handle is not None + return handle + else: + event = multiprocessing.Event() + self.CALL_LOCKS[id(event)] = event + return id(event) + + def _wait_on_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) + assert r == 0 + else: + self.CALL_LOCKS[handle].wait() + + def _signal_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.SetEvent(handle) + assert r != 0 + else: + self.CALL_LOCKS[handle].set() + def __init__(self, manual_finish=False, result=42): - called_event = multiprocessing.Event() - can_finish = multiprocessing.Event() + self._called_event = self._create_event() + self._can_finish = self._create_event() self._result = result - self._called_event_id = id(called_event) - self._can_finish_event_id = id(can_finish) - - self.CALL_LOCKS[self._called_event_id] = called_event - self.CALL_LOCKS[self._can_finish_event_id] = can_finish if not manual_finish: - self._can_finish.set() - - @property - def _can_finish(self): - return self.CALL_LOCKS[self._can_finish_event_id] - - @property - def _called_event(self): - return self.CALL_LOCKS[self._called_event_id] + self._signal_event(self._can_finish) def wait_on_called(self): - self._called_event.wait() + self._wait_on_event(self._called_event) def set_can(self): - self._can_finish.set() - - def called(self): - return self._called_event.is_set() + self._signal_event(self._can_finish) def __call__(self): - if self._called_event.is_set(): print('called twice') + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) - self._called_event.set() - self._can_finish.wait() return self._result def close(self): - del self.CALL_LOCKS[self._called_event_id] - del self.CALL_LOCKS[self._can_finish_event_id] + self.set_can() + if sys.platform.startswith('win'): + ctypes.windll.kernel32.CloseHandle(self._called_event) + ctypes.windll.kernel32.CloseHandle(self._can_finish) + else: + del self.CALL_LOCKS[self._called_event] + del self.CALL_LOCKS[self._can_finish] class ExceptionCall(Call): def __call__(self): - assert not self._called_event.is_set(), 'already called' - - self._called_event.set() - self._can_finish.wait() + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) raise ZeroDivisionError() +class MapCall(Call): + def __init__(self, result=42): + super().__init__(manual_finish=True, result=result) + + def __call__(self, manual_finish): + if manual_finish: + super().__call__() + return self._result + class ExecutorShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - call1 = Call() - try: - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.run_to_futures, - [call1]) - finally: - call1.close() + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + def _start_some_futures(self): call1 = Call(manual_finish=True) @@ -93,13 +136,14 @@ class ExecutorShutdownTest(unittest.TestCase): call3 = Call(manual_finish=True) try: - self.executor.run_to_futures([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) - + self.executor.submit(call1) + self.executor.submit(call2) + self.executor.submit(call3) + call1.wait_on_called() call2.wait_on_called() call3.wait_on_called() - + call1.set_can() call2.set_can() call3.set_can() @@ -110,10 +154,10 @@ class ExecutorShutdownTest(unittest.TestCase): class ThreadPoolShutdownTest(ExecutorShutdownTest): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=5) + self.executor = futures.ThreadPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_threads_terminate(self): self._start_some_futures() @@ -123,7 +167,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): t.join() def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_threads=5) as e: + with futures.ThreadPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) @@ -132,7 +176,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): t.join() def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_threads=5) + executor = futures.ThreadPoolExecutor(max_workers=5) executor.map(abs, range(-5, 5)) threads = executor._threads del executor @@ -142,32 +186,31 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): class ProcessPoolShutdownTest(ExecutorShutdownTest): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=5) + self.executor = futures.ProcessPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_processes_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._processes), 5) + processes = self.executor._processes self.executor.shutdown() - self.executor._queue_management_thread.join() - for p in self.executor._processes: + for p in processes: p.join() def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_processes=5) as e: + with futures.ProcessPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - executor._queue_management_thread.join() for p in self.executor._processes: p.join() def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_processes=5) + executor = futures.ProcessPoolExecutor(max_workers=5) list(executor.map(abs, range(-5, 5))) queue_management_thread = executor._queue_management_thread processes = executor._processes @@ -177,313 +220,317 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): for p in processes: p.join() -class WaitsTest(unittest.TestCase): - def test_concurrent_waits(self): - def wait_for_ALL_COMPLETED(): - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertTrue(f3.done()) - self.assertTrue(f4.done()) - all_completed.release() - - def wait_for_FIRST_COMPLETED(): - fs.wait(return_when=futures.FIRST_COMPLETED) - self.assertTrue(f1.done()) - self.assertFalse(f2.done()) # XXX - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_completed.release() - - def wait_for_FIRST_EXCEPTION(): - fs.wait(return_when=futures.FIRST_EXCEPTION) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertFalse(f3.done()) # XXX - self.assertFalse(f4.done()) - first_exception.release() - - all_completed = threading.Semaphore(0) - first_completed = threading.Semaphore(0) - first_exception = threading.Semaphore(0) +class WaitTests(unittest.TestCase): + def test_first_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call() - + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - threads = [] - for wait_test in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=wait_test) - t.start() - threads.append(t) - - time.sleep(1) # give threads enough time to execute wait - - call1.set_can() - first_completed.acquire() - call2.set_can() - first_exception.acquire() - call3.set_can() - all_completed.acquire() - - self.executor.shutdown() + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([future1]), done) + self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) finally: call1.close() call2.close() - call3.close() - call4.close() -class ThreadPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + def test_first_completed_one_already_completed(self): + call1 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) - def tearDown(self): - self.executor.shutdown() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) -class ProcessPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() - def tearDown(self): - self.executor.shutdown() + def test_first_exception(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() -class CancelTests(unittest.TestCase): - def test_cancel_states(self): call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertEqual(f1.cancel(), False) - self.assertEqual(f2.cancel(), True) - self.assertEqual(f4.cancel(), True) - self.assertEqual(f1.cancelled(), False) - self.assertEqual(f2.cancelled(), True) - self.assertEqual(f3.cancelled(), False) - self.assertEqual(f4.cancelled(), True) - self.assertEqual(f1.done(), False) - self.assertEqual(f2.done(), True) - self.assertEqual(f3.done(), False) - self.assertEqual(f4.done(), True) - - call1.set_can() - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledError, f4.result) - self.assertRaises(futures.CancelledError, f4.exception) - - self.assertEqual(call2.called(), False) - self.assertEqual(call4.called(), False) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set([future3]), pending) finally: call1.close() call2.close() call3.close() - call4.close() - def test_wait_for_individual_cancel_while_waiting(self): - def end_call(): - # Wait until the main thread is waiting on the results of the - # future. - time.sleep(1) - f2.cancel() + def test_first_exception_some_already_complete(self): + def wait_test(): + while not future1._waiters: + pass call1.set_can() - call1 = Call(manual_finish=True) - call2 = Call() - + call1 = ExceptionCall(manual_finish=True) + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs - - call1.wait_on_called() - t = threading.Thread(target=end_call) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) t.start() - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - t.join() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) + + finally: call1.close() call2.close() - def test_wait_with_already_cancelled_futures(self): + def test_first_exception_one_already_failed(self): call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() + try: + future1 = self.executor.submit(call1) + + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) + self.assertEquals(set([EXCEPTION_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() + + def test_all_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertTrue(f2.cancel()) - self.assertTrue(f3.cancel()) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set(), pending) + + + finally: + call1.close() + call2.close() + + def test_all_completed_some_already_completed(self): + def wait_test(): + while not future1._waiters: + pass + + future4.cancel() call1.set_can() - - fs.wait(return_when=futures.ALL_COMPLETED) + call2.set_can() + call3.set_can() + + self.assertLessEqual( + futures.process.EXTRA_QUEUED_CALLS, + 1, + 'this test assumes that future4 will be cancelled before it is ' + 'queued to run - which might not be the case if ' + 'ProcessPoolExecutor is too aggresive in scheduling futures') + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + future4 = self.executor.submit(call4) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4]), + finished) + self.assertEquals(set(), pending) finally: call1.close() call2.close() call3.close() call4.close() - def test_cancel_all(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() + def test_timeout(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) - call1.set_can() - fs.cancel() - - self.assertFalse(f1.cancelled()) - self.assertTrue(f2.cancelled()) - self.assertTrue(f3.cancelled()) - self.assertTrue(f4.cancelled()) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=1, + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEquals(set([future2]), pending) + + finally: call1.close() call2.close() - call3.close() - call4.close() -class ThreadPoolCancelTests(CancelTests): + +class ThreadPoolWaitTests(WaitTests): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + self.executor = futures.ThreadPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) -class ProcessPoolCancelTests(WaitsTest): +class ProcessPoolWaitTests(WaitTests): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.executor = futures.ProcessPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_run_to_futures(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) - call4 = Call() - call5 = Call() + self.executor.shutdown(wait=True) + +class AsCompletedTests(unittest.TestCase): + # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. + def test_no_timeout(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) try: - f1, f2, f3, f4, f5 = self.executor.run_to_futures( - [call1, call2, call3, call4, call5], - return_when=futures.RETURN_IMMEDIATELY) - - call3.wait_on_called() + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) - # ProcessPoolExecutor uses a thread to propogate results into the - # future. Calling result() ensures that the thread has done its work - # before doing the next set of checks. - f1.result() - f2.result() - - self.assertTrue(f1.done()) - self.assertFalse(f1.running()) - self.assertEqual(f1.index, 0) - - self.assertTrue(f2.done()) - self.assertFalse(f2.running()) - self.assertEqual(f2.index, 1) - - self.assertFalse(f3.done()) - self.assertTrue(f3.running()) - self.assertEqual(f3.index, 2) - - # ProcessPoolExecutor may mark some futures as running before they - # actually are so don't check these ones. - self.assertFalse(f4.done()) - self.assertEqual(f4.index, 3) - - self.assertFalse(f5.done()) - self.assertEqual(f5.index, 4) + t = threading.Thread(target=wait_test) + t.start() + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEquals(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) finally: - call3.set_can() # Let the call finish executing. call1.close() call2.close() - call3.close() - call4.close() - call5.close() - def test_run_to_results(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(result=3) + def test_zero_timeout(self): + call1 = Call(manual_finish=True) try: - self.assertEqual( - list(self.executor.run_to_results([call1, call2, call3])), - [1, 2, 3]) + future1 = self.executor.submit(call1) + completed_futures = set() + try: + for future in futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1], + timeout=0): + completed_futures.add(future) + except futures.TimeoutError: + pass + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + completed_futures) finally: call1.close() - call2.close() - call3.close() - def test_run_to_results_exception(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = ExceptionCall() - try: - i = self.executor.run_to_results([call1, call2, call3]) - - self.assertEqual(i.__next__(), 1) - self.assertEqual(i.__next__(), 2) - self.assertRaises(ZeroDivisionError, i.__next__) - finally: - call1.close() - call2.close() - call3.close() +class ThreadPoolAsCompletedTests(AsCompletedTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_workers=1) - def test_run_to_results_timeout(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) + def tearDown(self): + self.executor.shutdown(wait=True) - try: - i = self.executor.run_to_results([call1, call2, call3], timeout=1) - self.assertEqual(i.__next__(), 1) - self.assertEqual(i.__next__(), 2) - self.assertRaises(futures.TimeoutError, i.__next__) - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() +class ProcessPoolAsCompletedTests(AsCompletedTests): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_workers=1) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_submit(self): + future = self.executor.submit(pow, 2, 8) + self.assertEquals(256, future.result()) + + def test_submit_keyword(self): + future = self.executor.submit(mul, 2, y=8) + self.assertEquals(16, future.result()) def test_map(self): self.assertEqual( @@ -496,36 +543,150 @@ class ExecutorTest(unittest.TestCase): self.assertEqual(i.__next__(), (0, 1)) self.assertRaises(ZeroDivisionError, i.__next__) + def test_map_timeout(self): + results = [] + timeout_call = MapCall() + try: + try: + for i in self.executor.map(timeout_call, + [False, False, True], + timeout=1): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + finally: + timeout_call.close() + + self.assertEquals([42, 42], results) + class ThreadPoolExecutorTest(ExecutorTest): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + self.executor = futures.ThreadPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) class ProcessPoolExecutorTest(ExecutorTest): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.executor = futures.ProcessPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) class FutureTests(unittest.TestCase): - # Future.index() is tested by ExecutorTest - # Future.cancel() is further tested by CancelTests. + def test_done_callback_with_result(self): + callback_result = None + def fn(callback_future): + nonlocal callback_result + callback_result = callback_future.result() + + f = Future() + f.add_done_callback(fn) + f.set_result(5) + self.assertEquals(5, callback_result) + + def test_done_callback_with_exception(self): + callback_exception = None + def fn(callback_future): + nonlocal callback_exception + callback_exception = callback_future.exception() + + f = Future() + f.add_done_callback(fn) + f.set_exception(Exception('test')) + self.assertEquals(('test',), callback_exception.args) + + def test_done_callback_with_cancel(self): + was_cancelled = None + def fn(callback_future): + nonlocal was_cancelled + was_cancelled = callback_future.cancelled() + + f = Future() + f.add_done_callback(fn) + self.assertTrue(f.cancel()) + self.assertTrue(was_cancelled) + + def test_done_callback_raises(self): + LOGGER.removeHandler(STDERR_HANDLER) + logging_stream = io.StringIO() + handler = logging.StreamHandler(logging_stream) + LOGGER.addHandler(handler) + try: + raising_was_called = False + fn_was_called = False + + def raising_fn(callback_future): + nonlocal raising_was_called + raising_was_called = True + raise Exception('doh!') + + def fn(callback_future): + nonlocal fn_was_called + fn_was_called = True + + f = Future() + f.add_done_callback(raising_fn) + f.add_done_callback(fn) + f.set_result(5) + self.assertTrue(raising_was_called) + self.assertTrue(fn_was_called) + self.assertIn('Exception: doh!', logging_stream.getvalue()) + finally: + LOGGER.removeHandler(handler) + LOGGER.addHandler(STDERR_HANDLER) + + def test_done_callback_already_successful(self): + callback_result = None + def fn(callback_future): + nonlocal callback_result + callback_result = callback_future.result() + + f = Future() + f.set_result(5) + f.add_done_callback(fn) + self.assertEquals(5, callback_result) + + def test_done_callback_already_failed(self): + callback_exception = None + def fn(callback_future): + nonlocal callback_exception + callback_exception = callback_future.exception() + + f = Future() + f.set_exception(Exception('test')) + f.add_done_callback(fn) + self.assertEquals(('test',), callback_exception.args) + + def test_done_callback_already_cancelled(self): + was_cancelled = None + def fn(callback_future): + nonlocal was_cancelled + was_cancelled = callback_future.cancelled() + + f = Future() + self.assertTrue(f.cancel()) + f.add_done_callback(fn) + self.assertTrue(was_cancelled) def test_repr(self): - self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>') - self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>') - self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>') - self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '<Future state=cancelled>') - self.assertEqual(repr(EXCEPTION_FUTURE), - '<Future state=finished raised IOError>') - self.assertEqual(repr(SUCCESSFUL_FUTURE), - '<Future state=finished returned int>') - - create_future + self.assertRegexpMatches(repr(PENDING_FUTURE), + '<Future at 0x[0-9a-f]+ state=pending>') + self.assertRegexpMatches(repr(RUNNING_FUTURE), + '<Future at 0x[0-9a-f]+ state=running>') + self.assertRegexpMatches(repr(CANCELLED_FUTURE), + '<Future at 0x[0-9a-f]+ state=cancelled>') + self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '<Future at 0x[0-9a-f]+ state=cancelled>') + self.assertRegexpMatches( + repr(EXCEPTION_FUTURE), + '<Future at 0x[0-9a-f]+ state=finished raised IOError>') + self.assertRegexpMatches( + repr(SUCCESSFUL_FUTURE), + '<Future at 0x[0-9a-f]+ state=finished returned int>') + def test_cancel(self): f1 = create_future(state=PENDING) @@ -590,13 +751,11 @@ class FutureTests(unittest.TestCase): self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) def test_result_with_success(self): + # TODO(brian@sweetapp.com): This test is timing dependant. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._result = 42 - f1._condition.notify_all() + f1.set_result(42) f1 = create_future(state=PENDING) t = threading.Thread(target=notification) @@ -605,12 +764,11 @@ class FutureTests(unittest.TestCase): self.assertEquals(f1.result(timeout=5), 42) def test_result_with_cancel(self): + # TODO(brian@sweetapp.com): This test is timing dependant. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - with f1._condition: - f1._state = CANCELLED - f1._condition.notify_all() + f1.cancel() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) @@ -646,186 +804,16 @@ class FutureTests(unittest.TestCase): self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) -class FutureListTests(unittest.TestCase): - # FutureList.wait() is further tested by WaitsTest. - # FutureList.cancel() is tested by CancelTests. - def test_wait_RETURN_IMMEDIATELY(self): - f = futures.FutureList(futures=None, event_sink=None) - f.wait(return_when=futures.RETURN_IMMEDIATELY) - - def test_wait_timeout(self): - f = futures.FutureList([PENDING_FUTURE], - futures._base.ThreadEventSink()) - - for t in [futures.FIRST_COMPLETED, - futures.FIRST_EXCEPTION, - futures.ALL_COMPLETED]: - f.wait(timeout=0.1, return_when=t) - self.assertFalse(PENDING_FUTURE.done()) - - def test_wait_all_done(self): - f = futures.FutureList([CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - futures._base.ThreadEventSink()) - - f.wait(return_when=futures.ALL_COMPLETED) - - def test_filters(self): - fs = [PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE] - f = futures.FutureList(fs, None) - - self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) - self.assertEqual(list(f.cancelled_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE]) - self.assertEqual(list(f.done_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.successful_futures()), - [SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.exception_futures()), - [EXCEPTION_FUTURE]) - - def test_has_running_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_running_futures()) - self.assertTrue( - futures.FutureList([RUNNING_FUTURE], - None).has_running_futures()) - - def test_has_cancelled_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_cancelled_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_cancelled_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_cancelled_futures()) - - def test_has_done_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None).has_done_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_done_futures()) - - def test_has_successful_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE], - None).has_successful_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_successful_futures()) - - def test_has_exception_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE], - None).has_exception_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_exception_futures()) - - def test_get_item(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(f[0], PENDING_FUTURE) - self.assertEqual(f[1], RUNNING_FUTURE) - self.assertEqual(f[2], CANCELLED_FUTURE) - self.assertRaises(IndexError, f.__getitem__, 3) - - def test_len(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE], - None) - self.assertEqual(len(f), 3) - - def test_iter(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(list(iter(f)), fs) - - def test_contains(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None) - self.assertTrue(PENDING_FUTURE in f) - self.assertTrue(RUNNING_FUTURE in f) - self.assertFalse(CANCELLED_FUTURE in f) - - def test_repr(self): - pending = create_future(state=PENDING) - cancelled = create_future(state=CANCELLED) - cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) - running = create_future(state=RUNNING) - finished = create_future(state=FINISHED) - - f = futures.FutureList( - [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + - [CANCELLED_AND_NOTIFIED_FUTURE] + - [RUNNING_FUTURE] * 2 + - [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, - None) - - self.assertEqual(repr(f), - '<FutureList #futures=15 ' - '[#pending=4 #cancelled=3 #running=2 #finished=6]>') - def test_main(): - test.support.run_unittest(ProcessPoolCancelTests, - ThreadPoolCancelTests, - ProcessPoolExecutorTest, + test.support.run_unittest(ProcessPoolExecutorTest, ThreadPoolExecutorTest, ProcessPoolWaitTests, ThreadPoolWaitTests, + ProcessPoolAsCompletedTests, + ThreadPoolAsCompletedTests, FutureTests, - FutureListTests, ProcessPoolShutdownTest, ThreadPoolShutdownTest) if __name__ == "__main__": - test_main()
\ No newline at end of file + test_main() |