summaryrefslogtreecommitdiff
path: root/test_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'test_futures.py')
-rw-r--r--test_futures.py853
1 files changed, 372 insertions, 481 deletions
diff --git a/test_futures.py b/test_futures.py
index 725a60b..dd7fd3e 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -1,11 +1,22 @@
from __future__ import with_statement
-import logging
-import multiprocessing
-import re
+import os
+import subprocess
import sys
import threading
+import functools
+import contextlib
+import logging
+import re
import time
-import unittest
+
+from concurrent import futures
+from concurrent.futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
try:
from StringIO import StringIO
@@ -13,21 +24,95 @@ except ImportError:
from io import StringIO
try:
- from test.test_support import run_unittest
+ from test import test_support
except ImportError:
- from test.support import run_unittest
+ from test import support as test_support
-if sys.version_info < (3, 0):
+try:
+ next
+except NameError:
next = lambda x: x.next()
-if sys.platform.startswith('win'):
- import ctypes
- import ctypes.wintypes
-from concurrent import futures
-from concurrent.futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
- LOGGER, STDERR_HANDLER)
+def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ @functools.wraps(func)
+ def decorator(*args):
+ key = test_support.threading_setup()
+ try:
+ return func(*args)
+ finally:
+ test_support.threading_cleanup(*key)
+ return decorator
+
+
+# Executing the interpreter in a subprocess
+def _assert_python(expected_success, *args, **env_vars):
+ cmd_line = [sys.executable]
+ if not env_vars:
+ cmd_line.append('-E')
+ # Need to preserve the original environment, for in-place testing of
+ # shared library builds.
+ env = os.environ.copy()
+ # But a special flag that can be set to override -- in this case, the
+ # caller is responsible to pass the full environment.
+ if env_vars.pop('__cleanenv', None):
+ env = {}
+ env.update(env_vars)
+ cmd_line.extend(args)
+ p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ env=env)
+ try:
+ out, err = p.communicate()
+ finally:
+ subprocess._cleanup()
+ p.stdout.close()
+ p.stderr.close()
+ rc = p.returncode
+ err = strip_python_stderr(err)
+ if (rc and expected_success) or (not rc and not expected_success):
+ raise AssertionError(
+ "Process return code is %d, "
+ "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
+ return rc, out, err
+
+
+def assert_python_ok(*args, **env_vars):
+ """
+ Assert that running the interpreter with `args` and optional environment
+ variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
+ """
+ return _assert_python(True, *args, **env_vars)
+
+
+def strip_python_stderr(stderr):
+ """Strip the stderr of a Python process from potential debug output
+ emitted by the interpreter.
+
+ This will typically be run on the result of the communicate() method
+ of a subprocess.Popen object.
+ """
+ stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
+ return stderr
+
+
+@contextlib.contextmanager
+def captured_stderr():
+ """Return a context manager used by captured_stdout/stdin/stderr
+ that temporarily replaces the sys stream *stream_name* with a StringIO."""
+ logging_stream = StringIO()
+ handler = logging.StreamHandler(logging_stream)
+ logging.root.addHandler(handler)
+
+ try:
+ yield logging_stream
+ finally:
+ logging.root.removeHandler(handler)
+
def create_future(state=PENDING, exception=None, result=None):
f = Future()
@@ -36,6 +121,7 @@ def create_future(state=PENDING, exception=None, result=None):
f._result = result
return f
+
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
@@ -43,138 +129,94 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
def mul(x, y):
return x * y
-class Call(object):
- """A call that can be submitted to a future.Executor for testing.
- The call signals when it is called and waits for an event before finishing.
- """
- CALL_LOCKS = {}
- def _create_event(self):
- if sys.platform.startswith('win'):
- class SECURITY_ATTRIBUTES(ctypes.Structure):
- _fields_ = [("nLength", ctypes.wintypes.DWORD),
- ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
- ("bInheritHandle", ctypes.wintypes.BOOL)]
-
- s = SECURITY_ATTRIBUTES()
- s.nLength = ctypes.sizeof(s)
- s.lpSecurityDescriptor = None
- s.bInheritHandle = True
-
- handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
- True,
- False,
- None)
- assert handle is not None
- return handle
- else:
- event = multiprocessing.Event()
- self.CALL_LOCKS[id(event)] = event
- return id(event)
-
- def _wait_on_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
- assert r == 0
- else:
- self.CALL_LOCKS[handle].wait()
+def sleep_and_raise(t):
+ time.sleep(t)
+ raise Exception('this is an exception')
- def _signal_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.SetEvent(handle)
- assert r != 0
- else:
- self.CALL_LOCKS[handle].set()
+def sleep_and_print(t, msg):
+ time.sleep(t)
+ print(msg)
+ sys.stdout.flush()
- def __init__(self, manual_finish=False, result=42):
- self._called_event = self._create_event()
- self._can_finish = self._create_event()
- self._result = result
+class ExecutorMixin:
+ worker_count = 5
- if not manual_finish:
- self._signal_event(self._can_finish)
+ def setUp(self):
+ self.t1 = time.time()
+ try:
+ self.executor = self.executor_type(max_workers=self.worker_count)
+ except NotImplementedError:
+ e = sys.exc_info()[1]
+ self.skipTest(str(e))
+ self._prime_executor()
- def wait_on_called(self):
- self._wait_on_event(self._called_event)
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+ dt = time.time() - self.t1
+ if test_support.verbose:
+ print("%.2fs" % dt)
+ self.assertLess(dt, 60, "synchronization issue: test lasted too long")
- def set_can(self):
- self._signal_event(self._can_finish)
+ def _prime_executor(self):
+ # Make sure that the executor is ready to do work before running the
+ # tests. This should reduce the probability of timeouts in the tests.
+ futures = [self.executor.submit(time.sleep, 0.1)
+ for _ in range(self.worker_count)]
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
+ for f in futures:
+ f.result()
- return self._result
- def close(self):
- self.set_can()
- if sys.platform.startswith('win'):
- ctypes.windll.kernel32.CloseHandle(self._called_event)
- ctypes.windll.kernel32.CloseHandle(self._can_finish)
- else:
- del self.CALL_LOCKS[self._called_event]
- del self.CALL_LOCKS[self._can_finish]
+class ThreadPoolMixin(ExecutorMixin):
+ executor_type = futures.ThreadPoolExecutor
-class ExceptionCall(Call):
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
- raise ZeroDivisionError()
-class MapCall(Call):
- def __init__(self, result=42):
- super(MapCall, self).__init__(manual_finish=True, result=result)
+class ProcessPoolMixin(ExecutorMixin):
+ executor_type = futures.ProcessPoolExecutor
- def __call__(self, manual_finish):
- if manual_finish:
- super(MapCall, self).__call__()
- return self._result
class ExecutorShutdownTest(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
- def _start_some_futures(self):
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
-
- try:
- self.executor.submit(call1)
- self.executor.submit(call2)
- self.executor.submit(call3)
-
- call1.wait_on_called()
- call2.wait_on_called()
- call3.wait_on_called()
-
- call1.set_can()
- call2.set_can()
- call3.set_can()
- finally:
- call1.close()
- call2.close()
- call3.close()
+ def test_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ from concurrent.futures import %s
+ from time import sleep
+ from test_futures import sleep_and_print
+ t = %s(5)
+ t.submit(sleep_and_print, 1.0, "apple")
+ """ % (self.executor_type.__name__, self.executor_type.__name__))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), "apple".encode())
+
+ def test_hang_issue12364(self):
+ fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
+ self.executor.shutdown()
+ for f in fs:
+ f.result()
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=5)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
def test_threads_terminate(self):
- self._start_some_futures()
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
@@ -198,15 +240,15 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
for t in threads:
t.join()
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=5)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
def test_processes_terminate(self):
- self._start_some_futures()
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()
@@ -216,11 +258,11 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
- executor = e
+ processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in self.executor._processes:
+ for p in processes:
p.join()
def test_del_shutdown(self):
@@ -234,325 +276,186 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
for p in processes:
p.join()
+
class WaitTests(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
def test_first_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
+ future1 = self.executor.submit(mul, 21, 2)
+ future2 = self.executor.submit(time.sleep, 1.5)
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
- t = threading.Thread(target=wait_test)
- t.start()
- done, not_done = futures.wait(
- [CANCELLED_FUTURE, future1, future2],
- return_when=futures.FIRST_COMPLETED)
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
- self.assertEquals(set([future1]), done)
- self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
- finally:
- call1.close()
- call2.close()
-
- def test_first_completed_one_already_completed(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
+ def test_first_completed_some_already_completed(self):
+ future1 = self.executor.submit(time.sleep, 1.5)
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE, future1],
- return_when=futures.FIRST_COMPLETED)
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
- self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
- self.assertEquals(set([future1]), pending)
- finally:
- call1.close()
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
def test_first_exception(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = ExceptionCall(manual_finish=True)
- call3 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2, future3],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEquals(set([future1, future2]), finished)
- self.assertEquals(set([future3]), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(sleep_and_raise, 1.5)
+ future3 = self.executor.submit(time.sleep, 3)
- def test_first_exception_some_already_complete(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = ExceptionCall(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2],
- return_when=futures.FIRST_EXCEPTION)
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
- self.assertEquals(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1]), finished)
- self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
+ def test_first_exception_some_already_complete(self):
+ future1 = self.executor.submit(divmod, 21, 0)
+ future2 = self.executor.submit(time.sleep, 1.5)
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
- finally:
- call1.close()
- call2.close()
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
def test_first_exception_one_already_failed(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
+ future1 = self.executor.submit(time.sleep, 2)
- finished, pending = futures.wait(
- [EXCEPTION_FUTURE, future1],
- return_when=futures.FIRST_EXCEPTION)
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
- self.assertEquals(set([EXCEPTION_FUTURE]), finished)
- self.assertEquals(set([future1]), pending)
- finally:
- call1.close()
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
def test_all_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([future1, future2]), finished)
- self.assertEquals(set(), pending)
-
-
- finally:
- call1.close()
- call2.close()
-
- def test_all_completed_some_already_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
-
- future4.cancel()
- call1.set_can()
- call2.set_can()
- call3.set_can()
-
- self.assertTrue(
- futures.process.EXTRA_QUEUED_CALLS <= 1,
- 'this test assumes that future4 will be cancelled before it is '
- 'queued to run - which might not be the case if '
- 'ProcessPoolExecutor is too aggresive in scheduling futures')
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
- call4 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
- future4 = self.executor.submit(call4)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4]),
- finished)
- self.assertEquals(set(), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
- call4.close()
+ future1 = self.executor.submit(divmod, 2, 0)
+ future2 = self.executor.submit(mul, 2, 21)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2]), finished)
+ self.assertEqual(set(), pending)
def test_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ future1 = self.executor.submit(mul, 6, 7)
+ future2 = self.executor.submit(time.sleep, 3)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=1.5,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([future2]), pending)
+
+
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+
+ def test_pending_calls_race(self):
+ # Issue #14406: multi-threaded race condition when waiting on all
+ # futures.
+ event = threading.Event()
+ def future_func():
+ event.wait()
+ oldswitchinterval = sys.getcheckinterval()
+ sys.setcheckinterval(1)
try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2],
- timeout=1,
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1]), finished)
- self.assertEquals(set([future2]), pending)
-
-
+ fs = set(self.executor.submit(future_func) for i in range(100))
+ event.set()
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
- call1.close()
- call2.close()
-
+ sys.setcheckinterval(oldswitchinterval)
-class ThreadPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-class ProcessPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+ pass
- def tearDown(self):
- self.executor.shutdown(wait=True)
class AsCompletedTests(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(mul, 7, 6)
+
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEqual(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
- t = threading.Thread(target=wait_test)
- t.start()
- completed = set(futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]))
- self.assertEquals(set(
+ def test_zero_timeout(self):
+ future1 = self.executor.submit(time.sleep, 2)
+ completed_futures = set()
+ try:
+ for future in futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
- future1, future2]),
- completed)
- finally:
- call1.close()
- call2.close()
+ future1],
+ timeout=0):
+ completed_futures.add(future)
+ except futures.TimeoutError:
+ pass
- def test_zero_timeout(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- completed_futures = set()
- try:
- for future in futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1],
- timeout=0):
- completed_futures.add(future)
- except futures.TimeoutError:
- pass
-
- self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]),
- completed_futures)
- finally:
- call1.close()
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ completed_futures)
-class ThreadPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+ pass
-class ProcessPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+ pass
-class ExecutorTest(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
+class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
- self.assertEquals(256, future.result())
+ self.assertEqual(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
- self.assertEquals(16, future.result())
+ self.assertEqual(16, future.result())
def test_map(self):
self.assertEqual(
@@ -567,139 +470,123 @@ class ExecutorTest(unittest.TestCase):
def test_map_timeout(self):
results = []
- timeout_call = MapCall()
try:
- try:
- for i in self.executor.map(timeout_call,
- [False, False, True],
- timeout=1):
- results.append(i)
- except futures.TimeoutError:
- pass
- else:
- self.fail('expected TimeoutError')
- finally:
- timeout_call.close()
+ for i in self.executor.map(time.sleep,
+ [0, 0, 3],
+ timeout=1.5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
- self.assertEquals([42, 42], results)
+ self.assertEqual([None, None], results)
-class ThreadPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+ pass
-class ProcessPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+ pass
+
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
- self.callback_result = None
+ callback_result = [None]
def fn(callback_future):
- self.callback_result = callback_future.result()
+ callback_result[0] = callback_future.result()
f = Future()
f.add_done_callback(fn)
f.set_result(5)
- self.assertEquals(5, self.callback_result)
+ self.assertEqual(5, callback_result[0])
def test_done_callback_with_exception(self):
- self.callback_exception = None
+ callback_exception = [None]
def fn(callback_future):
- self.callback_exception = callback_future.exception()
+ callback_exception[0] = callback_future.exception()
f = Future()
f.add_done_callback(fn)
f.set_exception(Exception('test'))
- self.assertEquals(('test',), self.callback_exception.args)
+ self.assertEqual(('test',), callback_exception[0].args)
def test_done_callback_with_cancel(self):
- self.was_cancelled = None
+ was_cancelled = [None]
def fn(callback_future):
- self.was_cancelled = callback_future.cancelled()
+ was_cancelled[0] = callback_future.cancelled()
f = Future()
f.add_done_callback(fn)
self.assertTrue(f.cancel())
- self.assertTrue(self.was_cancelled)
+ self.assertTrue(was_cancelled[0])
def test_done_callback_raises(self):
- LOGGER.removeHandler(STDERR_HANDLER)
- logging_stream = StringIO()
- handler = logging.StreamHandler(logging_stream)
- LOGGER.addHandler(handler)
- try:
- self.raising_was_called = False
- self.fn_was_called = False
+ with captured_stderr() as stderr:
+ raising_was_called = [False]
+ fn_was_called = [False]
def raising_fn(callback_future):
- self.raising_was_called = True
+ raising_was_called[0] = True
raise Exception('doh!')
def fn(callback_future):
- self.fn_was_called = True
+ fn_was_called[0] = True
f = Future()
f.add_done_callback(raising_fn)
f.add_done_callback(fn)
f.set_result(5)
- self.assertTrue(self.raising_was_called)
- self.assertTrue(self.fn_was_called)
- self.assertTrue('Exception: doh!' in logging_stream.getvalue())
- finally:
- LOGGER.removeHandler(handler)
- LOGGER.addHandler(STDERR_HANDLER)
+ self.assertTrue(raising_was_called)
+ self.assertTrue(fn_was_called)
+ self.assertIn('Exception: doh!', stderr.getvalue())
def test_done_callback_already_successful(self):
- self.callback_result = None
+ callback_result = [None]
def fn(callback_future):
- self.callback_result = callback_future.result()
+ callback_result[0] = callback_future.result()
f = Future()
f.set_result(5)
f.add_done_callback(fn)
- self.assertEquals(5, self.callback_result)
+ self.assertEqual(5, callback_result[0])
def test_done_callback_already_failed(self):
- self.callback_exception = None
+ callback_exception = [None]
def fn(callback_future):
- self.callback_exception = callback_future.exception()
+ callback_exception[0] = callback_future.exception()
f = Future()
f.set_exception(Exception('test'))
f.add_done_callback(fn)
- self.assertEquals(('test',), self.callback_exception.args)
+ self.assertEqual(('test',), callback_exception[0].args)
def test_done_callback_already_cancelled(self):
- self.was_cancelled = None
+ was_cancelled = [None]
def fn(callback_future):
- self.was_cancelled = callback_future.cancelled()
+ was_cancelled[0] = callback_future.cancelled()
f = Future()
self.assertTrue(f.cancel())
f.add_done_callback(fn)
- self.assertTrue(self.was_cancelled)
+ self.assertTrue(was_cancelled[0])
def test_repr(self):
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>',
- repr(PENDING_FUTURE)))
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>',
- repr(RUNNING_FUTURE)))
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
- repr(CANCELLED_FUTURE)))
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
- repr(CANCELLED_AND_NOTIFIED_FUTURE)))
- self.assertTrue(re.match(
- '<Future at 0x[0-9a-f]+L? state=finished raised IOError>',
- repr(EXCEPTION_FUTURE)))
- self.assertTrue(re.match(
- '<Future at 0x[0-9a-f]+L? state=finished returned int>',
- repr(SUCCESSFUL_FUTURE)))
+ self.assertRegexpMatches(repr(PENDING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=pending>')
+ self.assertRegexpMatches(repr(RUNNING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=running>')
+ self.assertRegexpMatches(repr(CANCELLED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegexpMatches(
+ repr(EXCEPTION_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
+ self.assertRegexpMatches(
+ repr(SUCCESSFUL_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished returned int>')
def test_cancel(self):
f1 = create_future(state=PENDING)
@@ -710,22 +597,22 @@ class FutureTests(unittest.TestCase):
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
- self.assertEquals(f1._state, CANCELLED)
+ self.assertEqual(f1._state, CANCELLED)
self.assertFalse(f2.cancel())
- self.assertEquals(f2._state, RUNNING)
+ self.assertEqual(f2._state, RUNNING)
self.assertTrue(f3.cancel())
- self.assertEquals(f3._state, CANCELLED)
+ self.assertEqual(f3._state, CANCELLED)
self.assertTrue(f4.cancel())
- self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
+ self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
self.assertFalse(f5.cancel())
- self.assertEquals(f5._state, FINISHED)
+ self.assertEqual(f5._state, FINISHED)
self.assertFalse(f6.cancel())
- self.assertEquals(f6._state, FINISHED)
+ self.assertEqual(f6._state, FINISHED)
def test_cancelled(self):
self.assertFalse(PENDING_FUTURE.cancelled())
@@ -774,7 +661,7 @@ class FutureTests(unittest.TestCase):
t = threading.Thread(target=notification)
t.start()
- self.assertEquals(f1.result(timeout=5), 42)
+ self.assertEqual(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
@@ -817,16 +704,20 @@ class FutureTests(unittest.TestCase):
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
+@reap_threads
def test_main():
- run_unittest(ProcessPoolExecutorTest,
- ThreadPoolExecutorTest,
- ProcessPoolWaitTests,
- ThreadPoolWaitTests,
- ProcessPoolAsCompletedTests,
- ThreadPoolAsCompletedTests,
- FutureTests,
- ProcessPoolShutdownTest,
- ThreadPoolShutdownTest)
+ try:
+ test_support.run_unittest(ProcessPoolExecutorTest,
+ ThreadPoolExecutorTest,
+ ProcessPoolWaitTests,
+ ThreadPoolWaitTests,
+ ProcessPoolAsCompletedTests,
+ ThreadPoolAsCompletedTests,
+ FutureTests,
+ ProcessPoolShutdownTest,
+ ThreadPoolShutdownTest)
+ finally:
+ test_support.reap_children()
if __name__ == "__main__":
test_main()