summaryrefslogtreecommitdiff
path: root/python3/test_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'python3/test_futures.py')
-rw-r--r--python3/test_futures.py819
1 files changed, 0 insertions, 819 deletions
diff --git a/python3/test_futures.py b/python3/test_futures.py
deleted file mode 100644
index 98eea27..0000000
--- a/python3/test_futures.py
+++ /dev/null
@@ -1,819 +0,0 @@
-import io
-import logging
-import multiprocessing
-import sys
-import threading
-import test.support
-import time
-import unittest
-
-if sys.platform.startswith('win'):
- import ctypes
- import ctypes.wintypes
-
-import futures
-from futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
- LOGGER, STDERR_HANDLER, wait)
-import futures.process
-
-def create_future(state=PENDING, exception=None, result=None):
- f = Future()
- f._state = state
- f._exception = exception
- f._result = result
- return f
-
-PENDING_FUTURE = create_future(state=PENDING)
-RUNNING_FUTURE = create_future(state=RUNNING)
-CANCELLED_FUTURE = create_future(state=CANCELLED)
-CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
-EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
-SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
-
-def mul(x, y):
- return x * y
-
-class Call(object):
- """A call that can be submitted to a future.Executor for testing.
-
- The call signals when it is called and waits for an event before finishing.
- """
- CALL_LOCKS = {}
- def _create_event(self):
- if sys.platform.startswith('win'):
- class SECURITY_ATTRIBUTES(ctypes.Structure):
- _fields_ = [("nLength", ctypes.wintypes.DWORD),
- ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
- ("bInheritHandle", ctypes.wintypes.BOOL)]
-
- s = SECURITY_ATTRIBUTES()
- s.nLength = ctypes.sizeof(s)
- s.lpSecurityDescriptor = None
- s.bInheritHandle = True
-
- handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
- True,
- False,
- None)
- assert handle is not None
- return handle
- else:
- event = multiprocessing.Event()
- self.CALL_LOCKS[id(event)] = event
- return id(event)
-
- def _wait_on_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
- assert r == 0
- else:
- self.CALL_LOCKS[handle].wait()
-
- def _signal_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.SetEvent(handle)
- assert r != 0
- else:
- self.CALL_LOCKS[handle].set()
-
- def __init__(self, manual_finish=False, result=42):
- self._called_event = self._create_event()
- self._can_finish = self._create_event()
-
- self._result = result
-
- if not manual_finish:
- self._signal_event(self._can_finish)
-
- def wait_on_called(self):
- self._wait_on_event(self._called_event)
-
- def set_can(self):
- self._signal_event(self._can_finish)
-
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
-
- return self._result
-
- def close(self):
- self.set_can()
- if sys.platform.startswith('win'):
- ctypes.windll.kernel32.CloseHandle(self._called_event)
- ctypes.windll.kernel32.CloseHandle(self._can_finish)
- else:
- del self.CALL_LOCKS[self._called_event]
- del self.CALL_LOCKS[self._can_finish]
-
-class ExceptionCall(Call):
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
- raise ZeroDivisionError()
-
-class MapCall(Call):
- def __init__(self, result=42):
- super().__init__(manual_finish=True, result=result)
-
- def __call__(self, manual_finish):
- if manual_finish:
- super().__call__()
- return self._result
-
-class ExecutorShutdownTest(unittest.TestCase):
- def test_run_after_shutdown(self):
- self.executor.shutdown()
- self.assertRaises(RuntimeError,
- self.executor.submit,
- pow, 2, 5)
-
-
- def _start_some_futures(self):
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
-
- try:
- self.executor.submit(call1)
- self.executor.submit(call2)
- self.executor.submit(call3)
-
- call1.wait_on_called()
- call2.wait_on_called()
- call3.wait_on_called()
-
- call1.set_can()
- call2.set_can()
- call3.set_can()
- finally:
- call1.close()
- call2.close()
- call3.close()
-
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=5)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
- def test_threads_terminate(self):
- self._start_some_futures()
- self.assertEqual(len(self.executor._threads), 3)
- self.executor.shutdown()
- for t in self.executor._threads:
- t.join()
-
- def test_context_manager_shutdown(self):
- with futures.ThreadPoolExecutor(max_workers=5) as e:
- executor = e
- self.assertEqual(list(e.map(abs, range(-5, 5))),
- [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
-
- for t in executor._threads:
- t.join()
-
- def test_del_shutdown(self):
- executor = futures.ThreadPoolExecutor(max_workers=5)
- executor.map(abs, range(-5, 5))
- threads = executor._threads
- del executor
-
- for t in threads:
- t.join()
-
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=5)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
- def test_processes_terminate(self):
- self._start_some_futures()
- self.assertEqual(len(self.executor._processes), 5)
- processes = self.executor._processes
- self.executor.shutdown()
-
- for p in processes:
- p.join()
-
- def test_context_manager_shutdown(self):
- with futures.ProcessPoolExecutor(max_workers=5) as e:
- executor = e
- self.assertEqual(list(e.map(abs, range(-5, 5))),
- [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
-
- for p in self.executor._processes:
- p.join()
-
- def test_del_shutdown(self):
- executor = futures.ProcessPoolExecutor(max_workers=5)
- list(executor.map(abs, range(-5, 5)))
- queue_management_thread = executor._queue_management_thread
- processes = executor._processes
- del executor
-
- queue_management_thread.join()
- for p in processes:
- p.join()
-
-class WaitTests(unittest.TestCase):
- def test_first_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- done, not_done = futures.wait(
- [CANCELLED_FUTURE, future1, future2],
- return_when=futures.FIRST_COMPLETED)
-
- self.assertEquals(set([future1]), done)
- self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
- finally:
- call1.close()
- call2.close()
-
- def test_first_completed_one_already_completed(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
-
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE, future1],
- return_when=futures.FIRST_COMPLETED)
-
- self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
- self.assertEquals(set([future1]), pending)
- finally:
- call1.close()
-
- def test_first_exception(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = ExceptionCall(manual_finish=True)
- call3 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2, future3],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEquals(set([future1, future2]), finished)
- self.assertEquals(set([future3]), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
-
- def test_first_exception_some_already_complete(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = ExceptionCall(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEquals(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1]), finished)
- self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
-
-
- finally:
- call1.close()
- call2.close()
-
- def test_first_exception_one_already_failed(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
-
- finished, pending = futures.wait(
- [EXCEPTION_FUTURE, future1],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEquals(set([EXCEPTION_FUTURE]), finished)
- self.assertEquals(set([future1]), pending)
- finally:
- call1.close()
-
- def test_all_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([future1, future2]), finished)
- self.assertEquals(set(), pending)
-
-
- finally:
- call1.close()
- call2.close()
-
- def test_all_completed_some_already_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
-
- future4.cancel()
- call1.set_can()
- call2.set_can()
- call3.set_can()
-
- self.assertLessEqual(
- futures.process.EXTRA_QUEUED_CALLS,
- 1,
- 'this test assumes that future4 will be cancelled before it is '
- 'queued to run - which might not be the case if '
- 'ProcessPoolExecutor is too aggresive in scheduling futures')
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
- call4 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
- future4 = self.executor.submit(call4)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4]),
- finished)
- self.assertEquals(set(), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
- call4.close()
-
- def test_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2],
- timeout=1,
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1]), finished)
- self.assertEquals(set([future2]), pending)
-
-
- finally:
- call1.close()
- call2.close()
-
-
-class ThreadPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class ProcessPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class AsCompletedTests(unittest.TestCase):
- # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
- def test_no_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- completed = set(futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]))
- self.assertEquals(set(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]),
- completed)
- finally:
- call1.close()
- call2.close()
-
- def test_zero_timeout(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- completed_futures = set()
- try:
- for future in futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1],
- timeout=0):
- completed_futures.add(future)
- except futures.TimeoutError:
- pass
-
- self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]),
- completed_futures)
- finally:
- call1.close()
-
-class ThreadPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class ProcessPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class ExecutorTest(unittest.TestCase):
- # Executor.shutdown() and context manager usage is tested by
- # ExecutorShutdownTest.
- def test_submit(self):
- future = self.executor.submit(pow, 2, 8)
- self.assertEquals(256, future.result())
-
- def test_submit_keyword(self):
- future = self.executor.submit(mul, 2, y=8)
- self.assertEquals(16, future.result())
-
- def test_map(self):
- self.assertEqual(
- list(self.executor.map(pow, range(10), range(10))),
- list(map(pow, range(10), range(10))))
-
- def test_map_exception(self):
- i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
- self.assertEqual(i.__next__(), (0, 1))
- self.assertEqual(i.__next__(), (0, 1))
- self.assertRaises(ZeroDivisionError, i.__next__)
-
- def test_map_timeout(self):
- results = []
- timeout_call = MapCall()
- try:
- try:
- for i in self.executor.map(timeout_call,
- [False, False, True],
- timeout=1):
- results.append(i)
- except futures.TimeoutError:
- pass
- else:
- self.fail('expected TimeoutError')
- finally:
- timeout_call.close()
-
- self.assertEquals([42, 42], results)
-
-class ThreadPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class ProcessPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class FutureTests(unittest.TestCase):
- def test_done_callback_with_result(self):
- callback_result = None
- def fn(callback_future):
- nonlocal callback_result
- callback_result = callback_future.result()
-
- f = Future()
- f.add_done_callback(fn)
- f.set_result(5)
- self.assertEquals(5, callback_result)
-
- def test_done_callback_with_exception(self):
- callback_exception = None
- def fn(callback_future):
- nonlocal callback_exception
- callback_exception = callback_future.exception()
-
- f = Future()
- f.add_done_callback(fn)
- f.set_exception(Exception('test'))
- self.assertEquals(('test',), callback_exception.args)
-
- def test_done_callback_with_cancel(self):
- was_cancelled = None
- def fn(callback_future):
- nonlocal was_cancelled
- was_cancelled = callback_future.cancelled()
-
- f = Future()
- f.add_done_callback(fn)
- self.assertTrue(f.cancel())
- self.assertTrue(was_cancelled)
-
- def test_done_callback_raises(self):
- LOGGER.removeHandler(STDERR_HANDLER)
- logging_stream = io.StringIO()
- handler = logging.StreamHandler(logging_stream)
- LOGGER.addHandler(handler)
- try:
- raising_was_called = False
- fn_was_called = False
-
- def raising_fn(callback_future):
- nonlocal raising_was_called
- raising_was_called = True
- raise Exception('doh!')
-
- def fn(callback_future):
- nonlocal fn_was_called
- fn_was_called = True
-
- f = Future()
- f.add_done_callback(raising_fn)
- f.add_done_callback(fn)
- f.set_result(5)
- self.assertTrue(raising_was_called)
- self.assertTrue(fn_was_called)
- self.assertIn('Exception: doh!', logging_stream.getvalue())
- finally:
- LOGGER.removeHandler(handler)
- LOGGER.addHandler(STDERR_HANDLER)
-
- def test_done_callback_already_successful(self):
- callback_result = None
- def fn(callback_future):
- nonlocal callback_result
- callback_result = callback_future.result()
-
- f = Future()
- f.set_result(5)
- f.add_done_callback(fn)
- self.assertEquals(5, callback_result)
-
- def test_done_callback_already_failed(self):
- callback_exception = None
- def fn(callback_future):
- nonlocal callback_exception
- callback_exception = callback_future.exception()
-
- f = Future()
- f.set_exception(Exception('test'))
- f.add_done_callback(fn)
- self.assertEquals(('test',), callback_exception.args)
-
- def test_done_callback_already_cancelled(self):
- was_cancelled = None
- def fn(callback_future):
- nonlocal was_cancelled
- was_cancelled = callback_future.cancelled()
-
- f = Future()
- self.assertTrue(f.cancel())
- f.add_done_callback(fn)
- self.assertTrue(was_cancelled)
-
- def test_repr(self):
- self.assertRegexpMatches(repr(PENDING_FUTURE),
- '<Future at 0x[0-9a-f]+ state=pending>')
- self.assertRegexpMatches(repr(RUNNING_FUTURE),
- '<Future at 0x[0-9a-f]+ state=running>')
- self.assertRegexpMatches(repr(CANCELLED_FUTURE),
- '<Future at 0x[0-9a-f]+ state=cancelled>')
- self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
- '<Future at 0x[0-9a-f]+ state=cancelled>')
- self.assertRegexpMatches(
- repr(EXCEPTION_FUTURE),
- '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
- self.assertRegexpMatches(
- repr(SUCCESSFUL_FUTURE),
- '<Future at 0x[0-9a-f]+ state=finished returned int>')
-
-
- def test_cancel(self):
- f1 = create_future(state=PENDING)
- f2 = create_future(state=RUNNING)
- f3 = create_future(state=CANCELLED)
- f4 = create_future(state=CANCELLED_AND_NOTIFIED)
- f5 = create_future(state=FINISHED, exception=IOError())
- f6 = create_future(state=FINISHED, result=5)
-
- self.assertTrue(f1.cancel())
- self.assertEquals(f1._state, CANCELLED)
-
- self.assertFalse(f2.cancel())
- self.assertEquals(f2._state, RUNNING)
-
- self.assertTrue(f3.cancel())
- self.assertEquals(f3._state, CANCELLED)
-
- self.assertTrue(f4.cancel())
- self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
-
- self.assertFalse(f5.cancel())
- self.assertEquals(f5._state, FINISHED)
-
- self.assertFalse(f6.cancel())
- self.assertEquals(f6._state, FINISHED)
-
- def test_cancelled(self):
- self.assertFalse(PENDING_FUTURE.cancelled())
- self.assertFalse(RUNNING_FUTURE.cancelled())
- self.assertTrue(CANCELLED_FUTURE.cancelled())
- self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
- self.assertFalse(EXCEPTION_FUTURE.cancelled())
- self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
-
- def test_done(self):
- self.assertFalse(PENDING_FUTURE.done())
- self.assertFalse(RUNNING_FUTURE.done())
- self.assertTrue(CANCELLED_FUTURE.done())
- self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
- self.assertTrue(EXCEPTION_FUTURE.done())
- self.assertTrue(SUCCESSFUL_FUTURE.done())
-
- def test_running(self):
- self.assertFalse(PENDING_FUTURE.running())
- self.assertTrue(RUNNING_FUTURE.running())
- self.assertFalse(CANCELLED_FUTURE.running())
- self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
- self.assertFalse(EXCEPTION_FUTURE.running())
- self.assertFalse(SUCCESSFUL_FUTURE.running())
-
- def test_result_with_timeout(self):
- self.assertRaises(futures.TimeoutError,
- PENDING_FUTURE.result, timeout=0)
- self.assertRaises(futures.TimeoutError,
- RUNNING_FUTURE.result, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_FUTURE.result, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
- self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
- self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
-
- def test_result_with_success(self):
- # TODO(brian@sweetapp.com): This test is timing dependant.
- def notification():
- # Wait until the main thread is waiting for the result.
- time.sleep(1)
- f1.set_result(42)
-
- f1 = create_future(state=PENDING)
- t = threading.Thread(target=notification)
- t.start()
-
- self.assertEquals(f1.result(timeout=5), 42)
-
- def test_result_with_cancel(self):
- # TODO(brian@sweetapp.com): This test is timing dependant.
- def notification():
- # Wait until the main thread is waiting for the result.
- time.sleep(1)
- f1.cancel()
-
- f1 = create_future(state=PENDING)
- t = threading.Thread(target=notification)
- t.start()
-
- self.assertRaises(futures.CancelledError, f1.result, timeout=5)
-
- def test_exception_with_timeout(self):
- self.assertRaises(futures.TimeoutError,
- PENDING_FUTURE.exception, timeout=0)
- self.assertRaises(futures.TimeoutError,
- RUNNING_FUTURE.exception, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_FUTURE.exception, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
- self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
- IOError))
- self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
-
- def test_exception_with_success(self):
- def notification():
- # Wait until the main thread is waiting for the exception.
- time.sleep(1)
- with f1._condition:
- f1._state = FINISHED
- f1._exception = IOError()
- f1._condition.notify_all()
-
- f1 = create_future(state=PENDING)
- t = threading.Thread(target=notification)
- t.start()
-
- self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
-
-def test_main():
- test.support.run_unittest(ProcessPoolExecutorTest,
- ThreadPoolExecutorTest,
- ProcessPoolWaitTests,
- ThreadPoolWaitTests,
- ProcessPoolAsCompletedTests,
- ThreadPoolAsCompletedTests,
- FutureTests,
- ProcessPoolShutdownTest,
- ThreadPoolShutdownTest)
-
-if __name__ == "__main__":
- test_main()