diff options
Diffstat (limited to 'python3/test_futures.py')
-rw-r--r-- | python3/test_futures.py | 819 |
1 files changed, 0 insertions, 819 deletions
diff --git a/python3/test_futures.py b/python3/test_futures.py deleted file mode 100644 index 98eea27..0000000 --- a/python3/test_futures.py +++ /dev/null @@ -1,819 +0,0 @@ -import io -import logging -import multiprocessing -import sys -import threading -import test.support -import time -import unittest - -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - -import futures -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process - -def create_future(state=PENDING, exception=None, result=None): - f = Future() - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -def mul(x, y): - return x * y - -class Call(object): - """A call that can be submitted to a future.Executor for testing. - - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - - def wait_on_called(self): - self._wait_on_event(self._called_event) - - def set_can(self): - self._signal_event(self._can_finish) - - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - - return self._result - - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, result=42): - super().__init__(manual_finish=True, result=result) - - def __call__(self, manual_finish): - if manual_finish: - super().__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - - def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) - threads = executor._threads - del executor - - for t in threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - processes = self.executor._processes - self.executor.shutdown() - - for p in processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - - def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) - queue_management_thread = executor._queue_management_thread - processes = executor._processes - del executor - - queue_management_thread.join() - for p in processes: - p.join() - -class WaitTests(unittest.TestCase): - def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() - - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) - - - finally: - call1.close() - call2.close() - - def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertLessEqual( - futures.process.EXTRA_QUEUED_CALLS, - 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - - finally: - call1.close() - call2.close() - - -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class AsCompletedTests(unittest.TestCase): - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. - def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() - - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() - -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_submit(self): - future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) - - def test_submit_keyword(self): - future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.__next__(), (0, 1)) - self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) - - def test_map_timeout(self): - results = [] - timeout_call = MapCall() - try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEquals([42, 42], results) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class FutureTests(unittest.TestCase): - def test_done_callback_with_result(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.add_done_callback(fn) - f.set_result(5) - self.assertEquals(5, callback_result) - - def test_done_callback_with_exception(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.add_done_callback(fn) - f.set_exception(Exception('test')) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_with_cancel(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - f.add_done_callback(fn) - self.assertTrue(f.cancel()) - self.assertTrue(was_cancelled) - - def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = io.StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - raising_was_called = False - fn_was_called = False - - def raising_fn(callback_future): - nonlocal raising_was_called - raising_was_called = True - raise Exception('doh!') - - def fn(callback_future): - nonlocal fn_was_called - fn_was_called = True - - f = Future() - f.add_done_callback(raising_fn) - f.add_done_callback(fn) - f.set_result(5) - self.assertTrue(raising_was_called) - self.assertTrue(fn_was_called) - self.assertIn('Exception: doh!', logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) - - def test_done_callback_already_successful(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.set_result(5) - f.add_done_callback(fn) - self.assertEquals(5, callback_result) - - def test_done_callback_already_failed(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.set_exception(Exception('test')) - f.add_done_callback(fn) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_already_cancelled(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - self.assertTrue(f.cancel()) - f.add_done_callback(fn) - self.assertTrue(was_cancelled) - - def test_repr(self): - self.assertRegexpMatches(repr(PENDING_FUTURE), - '<Future at 0x[0-9a-f]+ state=pending>') - self.assertRegexpMatches(repr(RUNNING_FUTURE), - '<Future at 0x[0-9a-f]+ state=running>') - self.assertRegexpMatches(repr(CANCELLED_FUTURE), - '<Future at 0x[0-9a-f]+ state=cancelled>') - self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '<Future at 0x[0-9a-f]+ state=cancelled>') - self.assertRegexpMatches( - repr(EXCEPTION_FUTURE), - '<Future at 0x[0-9a-f]+ state=finished raised IOError>') - self.assertRegexpMatches( - repr(SUCCESSFUL_FUTURE), - '<Future at 0x[0-9a-f]+ state=finished returned int>') - - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.set_result(42) - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=5), 42) - - def test_result_with_cancel(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.cancel() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=5) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - # Wait until the main thread is waiting for the exception. - time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) - -def test_main(): - test.support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() |