summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Gr?nholm <alex.gronholm@nextday.fi>2013-06-24 01:20:47 +0300
committerAlex Gr?nholm <alex.gronholm@nextday.fi>2013-06-24 01:20:47 +0300
commitacb8961109b61390bac0a0cd795d53924db20f4c (patch)
tree810b408eff45e7507e131e6f4f7eed6c798c7206
parent24ffe396c4da66cfa65b5d8641f9c39aa3ada5b1 (diff)
downloadfutures-acb8961109b61390bac0a0cd795d53924db20f4c.tar.gz
Redid the port from scratch using Python 3.2.5 as base2.1.4
-rw-r--r--.hgignore11
-rw-r--r--CHANGES6
-rw-r--r--concurrent/futures/_base.py12
-rw-r--r--concurrent/futures/process.py170
-rw-r--r--concurrent/futures/thread.py60
-rwxr-xr-xsetup.py2
-rw-r--r--test_futures.py853
-rw-r--r--tox.ini9
8 files changed, 520 insertions, 603 deletions
diff --git a/.hgignore b/.hgignore
new file mode 100644
index 0000000..78f3a52
--- /dev/null
+++ b/.hgignore
@@ -0,0 +1,11 @@
+
+syntax: glob
+*.egg-info
+syntax: regexp
+^\.tox$
+syntax: regexp
+^\.project$
+syntax: regexp
+^\.pydevproject$
+syntax: regexp
+^dist$ \ No newline at end of file
diff --git a/CHANGES b/CHANGES
index 1a84b63..81df636 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,9 @@
+2.1.4
+=====
+
+- Ported the library again from Python 3.2.5 to get the latest bug fixes
+
+
2.1.3
=====
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index aaefa2b..8ed69b7 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -2,7 +2,6 @@
# Licensed to PSF under a Contributor Agreement.
from __future__ import with_statement
-import functools
import logging
import threading
import time
@@ -46,8 +45,6 @@ _STATE_TO_DESCRIPTION_MAP = {
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
-STDERR_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(STDERR_HANDLER)
class Error(Exception):
"""Base class for all future-related exceptions."""
@@ -119,11 +116,14 @@ class _AllCompletedWaiter(_Waiter):
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
+ self.lock = threading.Lock()
super(_AllCompletedWaiter, self).__init__()
def _decrement_pending_calls(self):
- if self.num_pending_calls == len(self.finished_futures):
- self.event.set()
+ with self.lock:
+ self.num_pending_calls -= 1
+ if not self.num_pending_calls:
+ self.event.set()
def add_result(self, future):
super(_AllCompletedWaiter, self).add_result(future)
@@ -523,7 +523,7 @@ class Executor(object):
"""Returns a iterator equivalent to map(fn, iter).
Args:
- fn: A callable that will take take as many arguments as there are
+ fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py
index 87dc789..98684f8 100644
--- a/concurrent/futures/process.py
+++ b/concurrent/futures/process.py
@@ -73,28 +73,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- >>> ... t = ThreadPoolExecutor(max_workers=5)
- >>> ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
@@ -122,10 +111,10 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue.
- This worker is run in a seperate process.
+ This worker is run in a separate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
@@ -136,21 +125,20 @@ def _process_worker(call_queue, result_queue, shutdown):
worker that it should exit when call_queue is empty.
"""
while True:
+ call_item = call_queue.get(block=True)
+ if call_item is None:
+ # Wake up queue management thread
+ result_queue.put(None)
+ return
try:
- call_item = call_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- if shutdown.is_set():
- return
+ r = call_item.fn(*call_item.args, **call_item.kwargs)
+ except BaseException:
+ e = sys.exc_info()[1]
+ result_queue.put(_ResultItem(call_item.work_id,
+ exception=e))
else:
- try:
- r = call_item.fn(*call_item.args, **call_item.kwargs)
- except BaseException:
- e = sys.exc_info()[1]
- result_queue.put(_ResultItem(call_item.work_id,
- exception=e))
- else:
- result_queue.put(_ResultItem(call_item.work_id,
- result=r))
+ result_queue.put(_ResultItem(call_item.work_id,
+ result=r))
def _add_call_item_to_queue(pending_work_items,
work_ids,
@@ -189,13 +177,12 @@ def _add_call_item_to_queue(pending_work_items,
del pending_work_items[work_id]
continue
-def _queue_manangement_worker(executor_reference,
- processes,
- pending_work_items,
- work_ids_queue,
- call_queue,
- result_queue,
- shutdown_process_event):
+def _queue_management_worker(executor_reference,
+ processes,
+ pending_work_items,
+ work_ids_queue,
+ call_queue,
+ result_queue):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
@@ -213,37 +200,19 @@ def _queue_manangement_worker(executor_reference,
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
- shutdown_process_event: A multiprocessing.Event used to signal the
- process workers that they should exit when their work queue is
- empty.
"""
+ nb_shutdown_processes = [0]
+ def shutdown_one_process():
+ """Tell a worker to terminate, which will in turn wake us again"""
+ call_queue.put(None)
+ nb_shutdown_processes[0] += 1
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
- try:
- result_item = result_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- executor = executor_reference()
- # No more work items can be added if:
- # - The interpreter is shutting down OR
- # - The executor that owns this worker has been collected OR
- # - The executor that owns this worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown_thread:
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not pending_work_items:
- shutdown_process_event.set()
-
- # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
- for p in processes:
- p.join()
- return
- del executor
- else:
+ result_item = result_queue.get(block=True)
+ if result_item is not None:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
@@ -251,6 +220,51 @@ def _queue_manangement_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
+ # Check whether we should start shutting down.
+ executor = executor_reference()
+ # No more work items can be added if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns this worker has been collected OR
+ # - The executor that owns this worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown_thread:
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ while nb_shutdown_processes[0] < len(processes):
+ shutdown_one_process()
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes:
+ p.join()
+ call_queue.close()
+ return
+ del executor
+
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+ global _system_limits_checked, _system_limited
+ if _system_limits_checked:
+ if _system_limited:
+ raise NotImplementedError(_system_limited)
+ _system_limits_checked = True
+ try:
+ import os
+ nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+ except (AttributeError, ValueError):
+ # sysconf not available or setting not available
+ return
+ if nsems_max == -1:
+ # indetermine limit, assume that limit is determined
+ # by available memory only
+ return
+ if nsems_max >= 256:
+ # minimum number of semaphores available
+ # according to POSIX
+ return
+ _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+ raise NotImplementedError(_system_limited)
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
@@ -261,7 +275,7 @@ class ProcessPoolExecutor(_base.Executor):
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
"""
- _remove_dead_thread_references()
+ _check_system_limits()
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
@@ -280,33 +294,34 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
- self._shutdown_process_event = multiprocessing.Event()
self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
def _start_queue_management_thread(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the queue management thread.
+ def weakref_cb(_, q=self._result_queue):
+ q.put(None)
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
- target=_queue_manangement_worker,
- args=(weakref.ref(self),
+ target=_queue_management_worker,
+ args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
- self._result_queue,
- self._shutdown_process_event))
+ self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
- _thread_references.add(weakref.ref(self._queue_management_thread))
+ _threads_queues[self._queue_management_thread] = self._result_queue
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
- self._result_queue,
- self._shutdown_process_event))
+ self._result_queue))
p.start()
self._processes.add(p)
@@ -321,6 +336,8 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
+ # Wake up queue management thread
+ self._result_queue.put(None)
self._start_queue_management_thread()
self._adjust_process_count()
@@ -330,15 +347,16 @@ class ProcessPoolExecutor(_base.Executor):
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
- if wait:
- if self._queue_management_thread:
+ if self._queue_management_thread:
+ # Wake up queue management thread
+ self._result_queue.put(None)
+ if wait:
self._queue_management_thread.join()
# To reduce the risk of openning too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
- self._shutdown_process_event = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
index ce0dda0..a45959d 100644
--- a/concurrent/futures/thread.py
+++ b/concurrent/futures/thread.py
@@ -32,28 +32,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
# workers to exit when their work queues are empty and then waits until the
# threads finish.
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- ... t = ThreadPoolExecutor(max_workers=5)
- ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
atexit.register(_python_exit)
@@ -79,19 +68,20 @@ class _WorkItem(object):
def _worker(executor_reference, work_queue):
try:
while True:
- try:
- work_item = work_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- executor = executor_reference()
- # Exit if:
- # - The interpreter is shutting down OR
- # - The executor that owns the worker has been collected OR
- # - The executor that owns the worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown:
- return
- del executor
- else:
+ work_item = work_queue.get(block=True)
+ if work_item is not None:
work_item.run()
+ continue
+ executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns the worker has been collected OR
+ # - The executor that owns the worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown:
+ # Notice other workers
+ work_queue.put(None)
+ return
+ del executor
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
@@ -103,8 +93,6 @@ class ThreadPoolExecutor(_base.Executor):
max_workers: The maximum number of threads that can be used to
execute the given calls.
"""
- _remove_dead_thread_references()
-
self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
@@ -125,19 +113,25 @@ class ThreadPoolExecutor(_base.Executor):
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the worker threads.
+ def weakref_cb(_, q=self._work_queue):
+ q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
- args=(weakref.ref(self), self._work_queue))
+ args=(weakref.ref(self, weakref_cb),
+ self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
- _thread_references.add(weakref.ref(t))
+ _threads_queues[t] = self._work_queue
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
+ self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()
diff --git a/setup.py b/setup.py
index f0dcd89..c08461e 100755
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@ except ImportError:
from distutils.core import setup
setup(name='futures',
- version='2.1.3',
+ version='2.1.4',
description='Backport of the concurrent.futures package from Python 3.2',
author='Brian Quinlan',
author_email='brian@sweetapp.com',
diff --git a/test_futures.py b/test_futures.py
index 725a60b..dd7fd3e 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -1,11 +1,22 @@
from __future__ import with_statement
-import logging
-import multiprocessing
-import re
+import os
+import subprocess
import sys
import threading
+import functools
+import contextlib
+import logging
+import re
import time
-import unittest
+
+from concurrent import futures
+from concurrent.futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
try:
from StringIO import StringIO
@@ -13,21 +24,95 @@ except ImportError:
from io import StringIO
try:
- from test.test_support import run_unittest
+ from test import test_support
except ImportError:
- from test.support import run_unittest
+ from test import support as test_support
-if sys.version_info < (3, 0):
+try:
+ next
+except NameError:
next = lambda x: x.next()
-if sys.platform.startswith('win'):
- import ctypes
- import ctypes.wintypes
-from concurrent import futures
-from concurrent.futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
- LOGGER, STDERR_HANDLER)
+def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ @functools.wraps(func)
+ def decorator(*args):
+ key = test_support.threading_setup()
+ try:
+ return func(*args)
+ finally:
+ test_support.threading_cleanup(*key)
+ return decorator
+
+
+# Executing the interpreter in a subprocess
+def _assert_python(expected_success, *args, **env_vars):
+ cmd_line = [sys.executable]
+ if not env_vars:
+ cmd_line.append('-E')
+ # Need to preserve the original environment, for in-place testing of
+ # shared library builds.
+ env = os.environ.copy()
+ # But a special flag that can be set to override -- in this case, the
+ # caller is responsible to pass the full environment.
+ if env_vars.pop('__cleanenv', None):
+ env = {}
+ env.update(env_vars)
+ cmd_line.extend(args)
+ p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ env=env)
+ try:
+ out, err = p.communicate()
+ finally:
+ subprocess._cleanup()
+ p.stdout.close()
+ p.stderr.close()
+ rc = p.returncode
+ err = strip_python_stderr(err)
+ if (rc and expected_success) or (not rc and not expected_success):
+ raise AssertionError(
+ "Process return code is %d, "
+ "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
+ return rc, out, err
+
+
+def assert_python_ok(*args, **env_vars):
+ """
+ Assert that running the interpreter with `args` and optional environment
+ variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
+ """
+ return _assert_python(True, *args, **env_vars)
+
+
+def strip_python_stderr(stderr):
+ """Strip the stderr of a Python process from potential debug output
+ emitted by the interpreter.
+
+ This will typically be run on the result of the communicate() method
+ of a subprocess.Popen object.
+ """
+ stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
+ return stderr
+
+
+@contextlib.contextmanager
+def captured_stderr():
+ """Return a context manager used by captured_stdout/stdin/stderr
+ that temporarily replaces the sys stream *stream_name* with a StringIO."""
+ logging_stream = StringIO()
+ handler = logging.StreamHandler(logging_stream)
+ logging.root.addHandler(handler)
+
+ try:
+ yield logging_stream
+ finally:
+ logging.root.removeHandler(handler)
+
def create_future(state=PENDING, exception=None, result=None):
f = Future()
@@ -36,6 +121,7 @@ def create_future(state=PENDING, exception=None, result=None):
f._result = result
return f
+
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
@@ -43,138 +129,94 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
def mul(x, y):
return x * y
-class Call(object):
- """A call that can be submitted to a future.Executor for testing.
- The call signals when it is called and waits for an event before finishing.
- """
- CALL_LOCKS = {}
- def _create_event(self):
- if sys.platform.startswith('win'):
- class SECURITY_ATTRIBUTES(ctypes.Structure):
- _fields_ = [("nLength", ctypes.wintypes.DWORD),
- ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
- ("bInheritHandle", ctypes.wintypes.BOOL)]
-
- s = SECURITY_ATTRIBUTES()
- s.nLength = ctypes.sizeof(s)
- s.lpSecurityDescriptor = None
- s.bInheritHandle = True
-
- handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
- True,
- False,
- None)
- assert handle is not None
- return handle
- else:
- event = multiprocessing.Event()
- self.CALL_LOCKS[id(event)] = event
- return id(event)
-
- def _wait_on_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
- assert r == 0
- else:
- self.CALL_LOCKS[handle].wait()
+def sleep_and_raise(t):
+ time.sleep(t)
+ raise Exception('this is an exception')
- def _signal_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.SetEvent(handle)
- assert r != 0
- else:
- self.CALL_LOCKS[handle].set()
+def sleep_and_print(t, msg):
+ time.sleep(t)
+ print(msg)
+ sys.stdout.flush()
- def __init__(self, manual_finish=False, result=42):
- self._called_event = self._create_event()
- self._can_finish = self._create_event()
- self._result = result
+class ExecutorMixin:
+ worker_count = 5
- if not manual_finish:
- self._signal_event(self._can_finish)
+ def setUp(self):
+ self.t1 = time.time()
+ try:
+ self.executor = self.executor_type(max_workers=self.worker_count)
+ except NotImplementedError:
+ e = sys.exc_info()[1]
+ self.skipTest(str(e))
+ self._prime_executor()
- def wait_on_called(self):
- self._wait_on_event(self._called_event)
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+ dt = time.time() - self.t1
+ if test_support.verbose:
+ print("%.2fs" % dt)
+ self.assertLess(dt, 60, "synchronization issue: test lasted too long")
- def set_can(self):
- self._signal_event(self._can_finish)
+ def _prime_executor(self):
+ # Make sure that the executor is ready to do work before running the
+ # tests. This should reduce the probability of timeouts in the tests.
+ futures = [self.executor.submit(time.sleep, 0.1)
+ for _ in range(self.worker_count)]
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
+ for f in futures:
+ f.result()
- return self._result
- def close(self):
- self.set_can()
- if sys.platform.startswith('win'):
- ctypes.windll.kernel32.CloseHandle(self._called_event)
- ctypes.windll.kernel32.CloseHandle(self._can_finish)
- else:
- del self.CALL_LOCKS[self._called_event]
- del self.CALL_LOCKS[self._can_finish]
+class ThreadPoolMixin(ExecutorMixin):
+ executor_type = futures.ThreadPoolExecutor
-class ExceptionCall(Call):
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
- raise ZeroDivisionError()
-class MapCall(Call):
- def __init__(self, result=42):
- super(MapCall, self).__init__(manual_finish=True, result=result)
+class ProcessPoolMixin(ExecutorMixin):
+ executor_type = futures.ProcessPoolExecutor
- def __call__(self, manual_finish):
- if manual_finish:
- super(MapCall, self).__call__()
- return self._result
class ExecutorShutdownTest(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
- def _start_some_futures(self):
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
-
- try:
- self.executor.submit(call1)
- self.executor.submit(call2)
- self.executor.submit(call3)
-
- call1.wait_on_called()
- call2.wait_on_called()
- call3.wait_on_called()
-
- call1.set_can()
- call2.set_can()
- call3.set_can()
- finally:
- call1.close()
- call2.close()
- call3.close()
+ def test_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ from concurrent.futures import %s
+ from time import sleep
+ from test_futures import sleep_and_print
+ t = %s(5)
+ t.submit(sleep_and_print, 1.0, "apple")
+ """ % (self.executor_type.__name__, self.executor_type.__name__))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), "apple".encode())
+
+ def test_hang_issue12364(self):
+ fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
+ self.executor.shutdown()
+ for f in fs:
+ f.result()
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=5)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
def test_threads_terminate(self):
- self._start_some_futures()
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
@@ -198,15 +240,15 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
for t in threads:
t.join()
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=5)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
def test_processes_terminate(self):
- self._start_some_futures()
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()
@@ -216,11 +258,11 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
- executor = e
+ processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in self.executor._processes:
+ for p in processes:
p.join()
def test_del_shutdown(self):
@@ -234,325 +276,186 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
for p in processes:
p.join()
+
class WaitTests(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
def test_first_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
+ future1 = self.executor.submit(mul, 21, 2)
+ future2 = self.executor.submit(time.sleep, 1.5)
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
- t = threading.Thread(target=wait_test)
- t.start()
- done, not_done = futures.wait(
- [CANCELLED_FUTURE, future1, future2],
- return_when=futures.FIRST_COMPLETED)
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
- self.assertEquals(set([future1]), done)
- self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
- finally:
- call1.close()
- call2.close()
-
- def test_first_completed_one_already_completed(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
+ def test_first_completed_some_already_completed(self):
+ future1 = self.executor.submit(time.sleep, 1.5)
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE, future1],
- return_when=futures.FIRST_COMPLETED)
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
- self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
- self.assertEquals(set([future1]), pending)
- finally:
- call1.close()
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
def test_first_exception(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = ExceptionCall(manual_finish=True)
- call3 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2, future3],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEquals(set([future1, future2]), finished)
- self.assertEquals(set([future3]), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(sleep_and_raise, 1.5)
+ future3 = self.executor.submit(time.sleep, 3)
- def test_first_exception_some_already_complete(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = ExceptionCall(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2],
- return_when=futures.FIRST_EXCEPTION)
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
- self.assertEquals(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1]), finished)
- self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
+ def test_first_exception_some_already_complete(self):
+ future1 = self.executor.submit(divmod, 21, 0)
+ future2 = self.executor.submit(time.sleep, 1.5)
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
- finally:
- call1.close()
- call2.close()
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
def test_first_exception_one_already_failed(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
+ future1 = self.executor.submit(time.sleep, 2)
- finished, pending = futures.wait(
- [EXCEPTION_FUTURE, future1],
- return_when=futures.FIRST_EXCEPTION)
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
- self.assertEquals(set([EXCEPTION_FUTURE]), finished)
- self.assertEquals(set([future1]), pending)
- finally:
- call1.close()
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
def test_all_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([future1, future2]), finished)
- self.assertEquals(set(), pending)
-
-
- finally:
- call1.close()
- call2.close()
-
- def test_all_completed_some_already_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
-
- future4.cancel()
- call1.set_can()
- call2.set_can()
- call3.set_can()
-
- self.assertTrue(
- futures.process.EXTRA_QUEUED_CALLS <= 1,
- 'this test assumes that future4 will be cancelled before it is '
- 'queued to run - which might not be the case if '
- 'ProcessPoolExecutor is too aggresive in scheduling futures')
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
- call4 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
- future4 = self.executor.submit(call4)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4]),
- finished)
- self.assertEquals(set(), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
- call4.close()
+ future1 = self.executor.submit(divmod, 2, 0)
+ future2 = self.executor.submit(mul, 2, 21)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2]), finished)
+ self.assertEqual(set(), pending)
def test_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ future1 = self.executor.submit(mul, 6, 7)
+ future2 = self.executor.submit(time.sleep, 3)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=1.5,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([future2]), pending)
+
+
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+
+ def test_pending_calls_race(self):
+ # Issue #14406: multi-threaded race condition when waiting on all
+ # futures.
+ event = threading.Event()
+ def future_func():
+ event.wait()
+ oldswitchinterval = sys.getcheckinterval()
+ sys.setcheckinterval(1)
try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2],
- timeout=1,
- return_when=futures.ALL_COMPLETED)
-
- self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1]), finished)
- self.assertEquals(set([future2]), pending)
-
-
+ fs = set(self.executor.submit(future_func) for i in range(100))
+ event.set()
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
- call1.close()
- call2.close()
-
+ sys.setcheckinterval(oldswitchinterval)
-class ThreadPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-class ProcessPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+ pass
- def tearDown(self):
- self.executor.shutdown(wait=True)
class AsCompletedTests(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(mul, 7, 6)
+
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEqual(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
- t = threading.Thread(target=wait_test)
- t.start()
- completed = set(futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]))
- self.assertEquals(set(
+ def test_zero_timeout(self):
+ future1 = self.executor.submit(time.sleep, 2)
+ completed_futures = set()
+ try:
+ for future in futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
- future1, future2]),
- completed)
- finally:
- call1.close()
- call2.close()
+ future1],
+ timeout=0):
+ completed_futures.add(future)
+ except futures.TimeoutError:
+ pass
- def test_zero_timeout(self):
- call1 = Call(manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- completed_futures = set()
- try:
- for future in futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1],
- timeout=0):
- completed_futures.add(future)
- except futures.TimeoutError:
- pass
-
- self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]),
- completed_futures)
- finally:
- call1.close()
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ completed_futures)
-class ThreadPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+ pass
-class ProcessPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+ pass
-class ExecutorTest(unittest.TestCase):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
+class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
- self.assertEquals(256, future.result())
+ self.assertEqual(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
- self.assertEquals(16, future.result())
+ self.assertEqual(16, future.result())
def test_map(self):
self.assertEqual(
@@ -567,139 +470,123 @@ class ExecutorTest(unittest.TestCase):
def test_map_timeout(self):
results = []
- timeout_call = MapCall()
try:
- try:
- for i in self.executor.map(timeout_call,
- [False, False, True],
- timeout=1):
- results.append(i)
- except futures.TimeoutError:
- pass
- else:
- self.fail('expected TimeoutError')
- finally:
- timeout_call.close()
+ for i in self.executor.map(time.sleep,
+ [0, 0, 3],
+ timeout=1.5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
- self.assertEquals([42, 42], results)
+ self.assertEqual([None, None], results)
-class ThreadPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+ pass
-class ProcessPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+ pass
+
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
- self.callback_result = None
+ callback_result = [None]
def fn(callback_future):
- self.callback_result = callback_future.result()
+ callback_result[0] = callback_future.result()
f = Future()
f.add_done_callback(fn)
f.set_result(5)
- self.assertEquals(5, self.callback_result)
+ self.assertEqual(5, callback_result[0])
def test_done_callback_with_exception(self):
- self.callback_exception = None
+ callback_exception = [None]
def fn(callback_future):
- self.callback_exception = callback_future.exception()
+ callback_exception[0] = callback_future.exception()
f = Future()
f.add_done_callback(fn)
f.set_exception(Exception('test'))
- self.assertEquals(('test',), self.callback_exception.args)
+ self.assertEqual(('test',), callback_exception[0].args)
def test_done_callback_with_cancel(self):
- self.was_cancelled = None
+ was_cancelled = [None]
def fn(callback_future):
- self.was_cancelled = callback_future.cancelled()
+ was_cancelled[0] = callback_future.cancelled()
f = Future()
f.add_done_callback(fn)
self.assertTrue(f.cancel())
- self.assertTrue(self.was_cancelled)
+ self.assertTrue(was_cancelled[0])
def test_done_callback_raises(self):
- LOGGER.removeHandler(STDERR_HANDLER)
- logging_stream = StringIO()
- handler = logging.StreamHandler(logging_stream)
- LOGGER.addHandler(handler)
- try:
- self.raising_was_called = False
- self.fn_was_called = False
+ with captured_stderr() as stderr:
+ raising_was_called = [False]
+ fn_was_called = [False]
def raising_fn(callback_future):
- self.raising_was_called = True
+ raising_was_called[0] = True
raise Exception('doh!')
def fn(callback_future):
- self.fn_was_called = True
+ fn_was_called[0] = True
f = Future()
f.add_done_callback(raising_fn)
f.add_done_callback(fn)
f.set_result(5)
- self.assertTrue(self.raising_was_called)
- self.assertTrue(self.fn_was_called)
- self.assertTrue('Exception: doh!' in logging_stream.getvalue())
- finally:
- LOGGER.removeHandler(handler)
- LOGGER.addHandler(STDERR_HANDLER)
+ self.assertTrue(raising_was_called)
+ self.assertTrue(fn_was_called)
+ self.assertIn('Exception: doh!', stderr.getvalue())
def test_done_callback_already_successful(self):
- self.callback_result = None
+ callback_result = [None]
def fn(callback_future):
- self.callback_result = callback_future.result()
+ callback_result[0] = callback_future.result()
f = Future()
f.set_result(5)
f.add_done_callback(fn)
- self.assertEquals(5, self.callback_result)
+ self.assertEqual(5, callback_result[0])
def test_done_callback_already_failed(self):
- self.callback_exception = None
+ callback_exception = [None]
def fn(callback_future):
- self.callback_exception = callback_future.exception()
+ callback_exception[0] = callback_future.exception()
f = Future()
f.set_exception(Exception('test'))
f.add_done_callback(fn)
- self.assertEquals(('test',), self.callback_exception.args)
+ self.assertEqual(('test',), callback_exception[0].args)
def test_done_callback_already_cancelled(self):
- self.was_cancelled = None
+ was_cancelled = [None]
def fn(callback_future):
- self.was_cancelled = callback_future.cancelled()
+ was_cancelled[0] = callback_future.cancelled()
f = Future()
self.assertTrue(f.cancel())
f.add_done_callback(fn)
- self.assertTrue(self.was_cancelled)
+ self.assertTrue(was_cancelled[0])
def test_repr(self):
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>',
- repr(PENDING_FUTURE)))
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>',
- repr(RUNNING_FUTURE)))
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
- repr(CANCELLED_FUTURE)))
- self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>',
- repr(CANCELLED_AND_NOTIFIED_FUTURE)))
- self.assertTrue(re.match(
- '<Future at 0x[0-9a-f]+L? state=finished raised IOError>',
- repr(EXCEPTION_FUTURE)))
- self.assertTrue(re.match(
- '<Future at 0x[0-9a-f]+L? state=finished returned int>',
- repr(SUCCESSFUL_FUTURE)))
+ self.assertRegexpMatches(repr(PENDING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=pending>')
+ self.assertRegexpMatches(repr(RUNNING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=running>')
+ self.assertRegexpMatches(repr(CANCELLED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegexpMatches(
+ repr(EXCEPTION_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
+ self.assertRegexpMatches(
+ repr(SUCCESSFUL_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished returned int>')
def test_cancel(self):
f1 = create_future(state=PENDING)
@@ -710,22 +597,22 @@ class FutureTests(unittest.TestCase):
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
- self.assertEquals(f1._state, CANCELLED)
+ self.assertEqual(f1._state, CANCELLED)
self.assertFalse(f2.cancel())
- self.assertEquals(f2._state, RUNNING)
+ self.assertEqual(f2._state, RUNNING)
self.assertTrue(f3.cancel())
- self.assertEquals(f3._state, CANCELLED)
+ self.assertEqual(f3._state, CANCELLED)
self.assertTrue(f4.cancel())
- self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
+ self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
self.assertFalse(f5.cancel())
- self.assertEquals(f5._state, FINISHED)
+ self.assertEqual(f5._state, FINISHED)
self.assertFalse(f6.cancel())
- self.assertEquals(f6._state, FINISHED)
+ self.assertEqual(f6._state, FINISHED)
def test_cancelled(self):
self.assertFalse(PENDING_FUTURE.cancelled())
@@ -774,7 +661,7 @@ class FutureTests(unittest.TestCase):
t = threading.Thread(target=notification)
t.start()
- self.assertEquals(f1.result(timeout=5), 42)
+ self.assertEqual(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
@@ -817,16 +704,20 @@ class FutureTests(unittest.TestCase):
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
+@reap_threads
def test_main():
- run_unittest(ProcessPoolExecutorTest,
- ThreadPoolExecutorTest,
- ProcessPoolWaitTests,
- ThreadPoolWaitTests,
- ProcessPoolAsCompletedTests,
- ThreadPoolAsCompletedTests,
- FutureTests,
- ProcessPoolShutdownTest,
- ThreadPoolShutdownTest)
+ try:
+ test_support.run_unittest(ProcessPoolExecutorTest,
+ ThreadPoolExecutorTest,
+ ProcessPoolWaitTests,
+ ThreadPoolWaitTests,
+ ProcessPoolAsCompletedTests,
+ ThreadPoolAsCompletedTests,
+ FutureTests,
+ ProcessPoolShutdownTest,
+ ThreadPoolShutdownTest)
+ finally:
+ test_support.reap_children()
if __name__ == "__main__":
test_main()
diff --git a/tox.ini b/tox.ini
index 83eda49..c1ff2f1 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,11 +1,8 @@
[tox]
-envlist = py25,py26,py27,py31
+envlist = py26,py27,py31
[testenv]
commands={envpython} test_futures.py []
-#[testenv:py24]
-#deps=multiprocessing
-#
-#[testenv:py25]
-#deps=multiprocessing
+[testenv:py26]
+deps=unittest2