From acb8961109b61390bac0a0cd795d53924db20f4c Mon Sep 17 00:00:00 2001 From: Alex Gr?nholm Date: Mon, 24 Jun 2013 01:20:47 +0300 Subject: Redid the port from scratch using Python 3.2.5 as base --- .hgignore | 11 + CHANGES | 6 + concurrent/futures/_base.py | 12 +- concurrent/futures/process.py | 170 +++++---- concurrent/futures/thread.py | 60 ++- setup.py | 2 +- test_futures.py | 853 ++++++++++++++++++------------------------ tox.ini | 9 +- 8 files changed, 520 insertions(+), 603 deletions(-) create mode 100644 .hgignore diff --git a/.hgignore b/.hgignore new file mode 100644 index 0000000..78f3a52 --- /dev/null +++ b/.hgignore @@ -0,0 +1,11 @@ + +syntax: glob +*.egg-info +syntax: regexp +^\.tox$ +syntax: regexp +^\.project$ +syntax: regexp +^\.pydevproject$ +syntax: regexp +^dist$ \ No newline at end of file diff --git a/CHANGES b/CHANGES index 1a84b63..81df636 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,9 @@ +2.1.4 +===== + +- Ported the library again from Python 3.2.5 to get the latest bug fixes + + 2.1.3 ===== diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py index aaefa2b..8ed69b7 100644 --- a/concurrent/futures/_base.py +++ b/concurrent/futures/_base.py @@ -2,7 +2,6 @@ # Licensed to PSF under a Contributor Agreement. from __future__ import with_statement -import functools import logging import threading import time @@ -46,8 +45,6 @@ _STATE_TO_DESCRIPTION_MAP = { # Logger for internal use by the futures package. LOGGER = logging.getLogger("concurrent.futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) class Error(Exception): """Base class for all future-related exceptions.""" @@ -119,11 +116,14 @@ class _AllCompletedWaiter(_Waiter): def __init__(self, num_pending_calls, stop_on_exception): self.num_pending_calls = num_pending_calls self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() super(_AllCompletedWaiter, self).__init__() def _decrement_pending_calls(self): - if self.num_pending_calls == len(self.finished_futures): - self.event.set() + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() def add_result(self, future): super(_AllCompletedWaiter, self).add_result(future) @@ -523,7 +523,7 @@ class Executor(object): """Returns a iterator equivalent to map(fn, iter). Args: - fn: A callable that will take take as many arguments as there are + fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py index 87dc789..98684f8 100644 --- a/concurrent/futures/process.py +++ b/concurrent/futures/process.py @@ -73,28 +73,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_thread_references = set() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -122,10 +111,10 @@ class _CallItem(object): self.args = args self.kwargs = kwargs -def _process_worker(call_queue, result_queue, shutdown): +def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. - This worker is run in a seperate process. + This worker is run in a separate process. Args: call_queue: A multiprocessing.Queue of _CallItems that will be read and @@ -136,21 +125,20 @@ def _process_worker(call_queue, result_queue, shutdown): worker that it should exit when call_queue is empty. """ while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return try: - call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: - if shutdown.is_set(): - return + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException: + e = sys.exc_info()[1] + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException: - e = sys.exc_info()[1] - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + result_queue.put(_ResultItem(call_item.work_id, + result=r)) def _add_call_item_to_queue(pending_work_items, work_ids, @@ -189,13 +177,12 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -213,37 +200,19 @@ def _queue_manangement_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. """ + nb_shutdown_processes = [0] + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + call_queue.put(None) + nb_shutdown_processes[0] += 1 while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - try: - result_item = result_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: + result_item = result_queue.get(block=True) + if result_item is not None: work_item = pending_work_items[result_item.work_id] del pending_work_items[result_item.work_id] @@ -251,6 +220,51 @@ def _queue_manangement_worker(executor_reference, work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + while nb_shutdown_processes[0] < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + call_queue.close() + return + del executor + +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): @@ -261,7 +275,7 @@ class ProcessPoolExecutor(_base.Executor): execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ - _remove_dead_thread_references() + _check_system_limits() if max_workers is None: self._max_workers = multiprocessing.cpu_count() @@ -280,33 +294,34 @@ class ProcessPoolExecutor(_base.Executor): # Shutdown is a two-step process. self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() self._shutdown_lock = threading.Lock() self._queue_count = 0 self._pending_work_items = {} def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), self._processes, self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue, - self._shutdown_process_event)) + self._result_queue)) self._queue_management_thread.daemon = True self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) + _threads_queues[self._queue_management_thread] = self._result_queue def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) + self._result_queue)) p.start() self._processes.add(p) @@ -321,6 +336,8 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) self._start_queue_management_thread() self._adjust_process_count() @@ -330,15 +347,16 @@ class ProcessPoolExecutor(_base.Executor): def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True - if wait: - if self._queue_management_thread: + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: self._queue_management_thread.join() # To reduce the risk of openning too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None self._call_queue = None self._result_queue = None - self._shutdown_process_event = None self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py index ce0dda0..a45959d 100644 --- a/concurrent/futures/thread.py +++ b/concurrent/futures/thread.py @@ -32,28 +32,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' # workers to exit when their work queues are empty and then waits until the # threads finish. -_thread_references = set() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() atexit.register(_python_exit) @@ -79,19 +68,20 @@ class _WorkItem(object): def _worker(executor_reference, work_queue): try: while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: + work_item = work_queue.get(block=True) + if work_item is not None: work_item.run() + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + work_queue.put(None) + return + del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) @@ -103,8 +93,6 @@ class ThreadPoolExecutor(_base.Executor): max_workers: The maximum number of threads that can be used to execute the given calls. """ - _remove_dead_thread_references() - self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() @@ -125,19 +113,25 @@ class ThreadPoolExecutor(_base.Executor): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) + args=(weakref.ref(self, weakref_cb), + self._work_queue)) t.daemon = True t.start() self._threads.add(t) - _thread_references.add(weakref.ref(t)) + _threads_queues[t] = self._work_queue def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True + self._work_queue.put(None) if wait: for t in self._threads: t.join() diff --git a/setup.py b/setup.py index f0dcd89..c08461e 100755 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ except ImportError: from distutils.core import setup setup(name='futures', - version='2.1.3', + version='2.1.4', description='Backport of the concurrent.futures package from Python 3.2', author='Brian Quinlan', author_email='brian@sweetapp.com', diff --git a/test_futures.py b/test_futures.py index 725a60b..dd7fd3e 100644 --- a/test_futures.py +++ b/test_futures.py @@ -1,11 +1,22 @@ from __future__ import with_statement -import logging -import multiprocessing -import re +import os +import subprocess import sys import threading +import functools +import contextlib +import logging +import re import time -import unittest + +from concurrent import futures +from concurrent.futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +try: + import unittest2 as unittest +except ImportError: + import unittest try: from StringIO import StringIO @@ -13,21 +24,95 @@ except ImportError: from io import StringIO try: - from test.test_support import run_unittest + from test import test_support except ImportError: - from test.support import run_unittest + from test import support as test_support -if sys.version_info < (3, 0): +try: + next +except NameError: next = lambda x: x.next() -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes -from concurrent import futures -from concurrent.futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER) +def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + If threading is unavailable this function does nothing. + """ + @functools.wraps(func) + def decorator(*args): + key = test_support.threading_setup() + try: + return func(*args) + finally: + test_support.threading_cleanup(*key) + return decorator + + +# Executing the interpreter in a subprocess +def _assert_python(expected_success, *args, **env_vars): + cmd_line = [sys.executable] + if not env_vars: + cmd_line.append('-E') + # Need to preserve the original environment, for in-place testing of + # shared library builds. + env = os.environ.copy() + # But a special flag that can be set to override -- in this case, the + # caller is responsible to pass the full environment. + if env_vars.pop('__cleanenv', None): + env = {} + env.update(env_vars) + cmd_line.extend(args) + p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env) + try: + out, err = p.communicate() + finally: + subprocess._cleanup() + p.stdout.close() + p.stderr.close() + rc = p.returncode + err = strip_python_stderr(err) + if (rc and expected_success) or (not rc and not expected_success): + raise AssertionError( + "Process return code is %d, " + "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore'))) + return rc, out, err + + +def assert_python_ok(*args, **env_vars): + """ + Assert that running the interpreter with `args` and optional environment + variables `env_vars` is ok and return a (return code, stdout, stderr) tuple. + """ + return _assert_python(True, *args, **env_vars) + + +def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip() + return stderr + + +@contextlib.contextmanager +def captured_stderr(): + """Return a context manager used by captured_stdout/stdin/stderr + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + logging_stream = StringIO() + handler = logging.StreamHandler(logging_stream) + logging.root.addHandler(handler) + + try: + yield logging_stream + finally: + logging.root.removeHandler(handler) + def create_future(state=PENDING, exception=None, result=None): f = Future() @@ -36,6 +121,7 @@ def create_future(state=PENDING, exception=None, result=None): f._result = result return f + PENDING_FUTURE = create_future(state=PENDING) RUNNING_FUTURE = create_future(state=RUNNING) CANCELLED_FUTURE = create_future(state=CANCELLED) @@ -43,138 +129,94 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + def mul(x, y): return x * y -class Call(object): - """A call that can be submitted to a future.Executor for testing. - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() +def sleep_and_raise(t): + time.sleep(t) + raise Exception('this is an exception') - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() +def sleep_and_print(t, msg): + time.sleep(t) + print(msg) + sys.stdout.flush() - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - self._result = result +class ExecutorMixin: + worker_count = 5 - if not manual_finish: - self._signal_event(self._can_finish) + def setUp(self): + self.t1 = time.time() + try: + self.executor = self.executor_type(max_workers=self.worker_count) + except NotImplementedError: + e = sys.exc_info()[1] + self.skipTest(str(e)) + self._prime_executor() - def wait_on_called(self): - self._wait_on_event(self._called_event) + def tearDown(self): + self.executor.shutdown(wait=True) + dt = time.time() - self.t1 + if test_support.verbose: + print("%.2fs" % dt) + self.assertLess(dt, 60, "synchronization issue: test lasted too long") - def set_can(self): - self._signal_event(self._can_finish) + def _prime_executor(self): + # Make sure that the executor is ready to do work before running the + # tests. This should reduce the probability of timeouts in the tests. + futures = [self.executor.submit(time.sleep, 0.1) + for _ in range(self.worker_count)] - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) + for f in futures: + f.result() - return self._result - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] +class ThreadPoolMixin(ExecutorMixin): + executor_type = futures.ThreadPoolExecutor -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() -class MapCall(Call): - def __init__(self, result=42): - super(MapCall, self).__init__(manual_finish=True, result=result) +class ProcessPoolMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor - def __call__(self, manual_finish): - if manual_finish: - super(MapCall, self).__call__() - return self._result class ExecutorShutdownTest(unittest.TestCase): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - def test_run_after_shutdown(self): self.executor.shutdown() self.assertRaises(RuntimeError, self.executor.submit, pow, 2, 5) - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() + def test_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + from concurrent.futures import %s + from time import sleep + from test_futures import sleep_and_print + t = %s(5) + t.submit(sleep_and_print, 1.0, "apple") + """ % (self.executor_type.__name__, self.executor_type.__name__)) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), "apple".encode()) + + def test_hang_issue12364(self): + fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] + self.executor.shutdown() + for f in fs: + f.result() -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - def tearDown(self): - self.executor.shutdown(wait=True) +class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): + def _prime_executor(self): + pass def test_threads_terminate(self): - self._start_some_futures() + self.executor.submit(mul, 21, 2) + self.executor.submit(mul, 6, 7) + self.executor.submit(mul, 3, 14) self.assertEqual(len(self.executor._threads), 3) self.executor.shutdown() for t in self.executor._threads: @@ -198,15 +240,15 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): for t in threads: t.join() -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): + def _prime_executor(self): + pass def test_processes_terminate(self): - self._start_some_futures() + self.executor.submit(mul, 21, 2) + self.executor.submit(mul, 6, 7) + self.executor.submit(mul, 3, 14) self.assertEqual(len(self.executor._processes), 5) processes = self.executor._processes self.executor.shutdown() @@ -216,11 +258,11 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): def test_context_manager_shutdown(self): with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e + processes = e._processes self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in self.executor._processes: + for p in processes: p.join() def test_del_shutdown(self): @@ -234,325 +276,186 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): for p in processes: p.join() + class WaitTests(unittest.TestCase): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() + future1 = self.executor.submit(mul, 21, 2) + future2 = self.executor.submit(time.sleep, 1.5) - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) + self.assertEqual(set([future1]), done) + self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) + def test_first_completed_some_already_completed(self): + future1 = self.executor.submit(time.sleep, 1.5) - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() + self.assertEqual( + set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future1]), pending) def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(sleep_and_raise, 1.5) + future3 = self.executor.submit(time.sleep, 3) - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) + self.assertEqual(set([future1, future2]), finished) + self.assertEqual(set([future3]), pending) - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) + def test_first_exception_some_already_complete(self): + future1 = self.executor.submit(divmod, 21, 0) + future2 = self.executor.submit(time.sleep, 1.5) + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) - finally: - call1.close() - call2.close() + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) + future1 = self.executor.submit(time.sleep, 2) - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() + self.assertEqual(set([EXCEPTION_FUTURE]), finished) + self.assertEqual(set([future1]), pending) def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertTrue( - futures.process.EXTRA_QUEUED_CALLS <= 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() + future1 = self.executor.submit(divmod, 2, 0) + future2 = self.executor.submit(mul, 2, 21) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2], + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2]), finished) + self.assertEqual(set(), pending) def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) + future1 = self.executor.submit(mul, 6, 7) + future2 = self.executor.submit(time.sleep, 3) + + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=1.5, + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEqual(set([future2]), pending) + + +class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): + + def test_pending_calls_race(self): + # Issue #14406: multi-threaded race condition when waiting on all + # futures. + event = threading.Event() + def future_func(): + event.wait() + oldswitchinterval = sys.getcheckinterval() + sys.setcheckinterval(1) try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - + fs = set(self.executor.submit(future_func) for i in range(100)) + event.set() + futures.wait(fs, return_when=futures.ALL_COMPLETED) finally: - call1.close() - call2.close() - + sys.setcheckinterval(oldswitchinterval) -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) +class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): + pass - def tearDown(self): - self.executor.shutdown(wait=True) class AsCompletedTests(unittest.TestCase): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(mul, 7, 6) + + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEqual(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( + def test_zero_timeout(self): + future1 = self.executor.submit(time.sleep, 2) + completed_futures = set() + try: + for future in futures.as_completed( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() + future1], + timeout=0): + completed_futures.add(future) + except futures.TimeoutError: + pass - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + completed_futures) -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - def tearDown(self): - self.executor.shutdown(wait=True) +class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): + pass -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): + pass -class ExecutorTest(unittest.TestCase): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) +class ExecutorTest(unittest.TestCase): # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. def test_submit(self): future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) + self.assertEqual(256, future.result()) def test_submit_keyword(self): future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) + self.assertEqual(16, future.result()) def test_map(self): self.assertEqual( @@ -567,139 +470,123 @@ class ExecutorTest(unittest.TestCase): def test_map_timeout(self): results = [] - timeout_call = MapCall() try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() + for i in self.executor.map(time.sleep, + [0, 0, 3], + timeout=1.5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') - self.assertEquals([42, 42], results) + self.assertEqual([None, None], results) -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - def tearDown(self): - self.executor.shutdown(wait=True) +class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): + pass -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - def tearDown(self): - self.executor.shutdown(wait=True) +class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): + pass + class FutureTests(unittest.TestCase): def test_done_callback_with_result(self): - self.callback_result = None + callback_result = [None] def fn(callback_future): - self.callback_result = callback_future.result() + callback_result[0] = callback_future.result() f = Future() f.add_done_callback(fn) f.set_result(5) - self.assertEquals(5, self.callback_result) + self.assertEqual(5, callback_result[0]) def test_done_callback_with_exception(self): - self.callback_exception = None + callback_exception = [None] def fn(callback_future): - self.callback_exception = callback_future.exception() + callback_exception[0] = callback_future.exception() f = Future() f.add_done_callback(fn) f.set_exception(Exception('test')) - self.assertEquals(('test',), self.callback_exception.args) + self.assertEqual(('test',), callback_exception[0].args) def test_done_callback_with_cancel(self): - self.was_cancelled = None + was_cancelled = [None] def fn(callback_future): - self.was_cancelled = callback_future.cancelled() + was_cancelled[0] = callback_future.cancelled() f = Future() f.add_done_callback(fn) self.assertTrue(f.cancel()) - self.assertTrue(self.was_cancelled) + self.assertTrue(was_cancelled[0]) def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - self.raising_was_called = False - self.fn_was_called = False + with captured_stderr() as stderr: + raising_was_called = [False] + fn_was_called = [False] def raising_fn(callback_future): - self.raising_was_called = True + raising_was_called[0] = True raise Exception('doh!') def fn(callback_future): - self.fn_was_called = True + fn_was_called[0] = True f = Future() f.add_done_callback(raising_fn) f.add_done_callback(fn) f.set_result(5) - self.assertTrue(self.raising_was_called) - self.assertTrue(self.fn_was_called) - self.assertTrue('Exception: doh!' in logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) + self.assertTrue(raising_was_called) + self.assertTrue(fn_was_called) + self.assertIn('Exception: doh!', stderr.getvalue()) def test_done_callback_already_successful(self): - self.callback_result = None + callback_result = [None] def fn(callback_future): - self.callback_result = callback_future.result() + callback_result[0] = callback_future.result() f = Future() f.set_result(5) f.add_done_callback(fn) - self.assertEquals(5, self.callback_result) + self.assertEqual(5, callback_result[0]) def test_done_callback_already_failed(self): - self.callback_exception = None + callback_exception = [None] def fn(callback_future): - self.callback_exception = callback_future.exception() + callback_exception[0] = callback_future.exception() f = Future() f.set_exception(Exception('test')) f.add_done_callback(fn) - self.assertEquals(('test',), self.callback_exception.args) + self.assertEqual(('test',), callback_exception[0].args) def test_done_callback_already_cancelled(self): - self.was_cancelled = None + was_cancelled = [None] def fn(callback_future): - self.was_cancelled = callback_future.cancelled() + was_cancelled[0] = callback_future.cancelled() f = Future() self.assertTrue(f.cancel()) f.add_done_callback(fn) - self.assertTrue(self.was_cancelled) + self.assertTrue(was_cancelled[0]) def test_repr(self): - self.assertTrue(re.match('', - repr(PENDING_FUTURE))) - self.assertTrue(re.match('', - repr(RUNNING_FUTURE))) - self.assertTrue(re.match('', - repr(CANCELLED_FUTURE))) - self.assertTrue(re.match('', - repr(CANCELLED_AND_NOTIFIED_FUTURE))) - self.assertTrue(re.match( - '', - repr(EXCEPTION_FUTURE))) - self.assertTrue(re.match( - '', - repr(SUCCESSFUL_FUTURE))) + self.assertRegexpMatches(repr(PENDING_FUTURE), + '') + self.assertRegexpMatches(repr(RUNNING_FUTURE), + '') + self.assertRegexpMatches(repr(CANCELLED_FUTURE), + '') + self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '') + self.assertRegexpMatches( + repr(EXCEPTION_FUTURE), + '') + self.assertRegexpMatches( + repr(SUCCESSFUL_FUTURE), + '') def test_cancel(self): f1 = create_future(state=PENDING) @@ -710,22 +597,22 @@ class FutureTests(unittest.TestCase): f6 = create_future(state=FINISHED, result=5) self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) + self.assertEqual(f1._state, CANCELLED) self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) + self.assertEqual(f2._state, RUNNING) self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) + self.assertEqual(f3._state, CANCELLED) self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) + self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) + self.assertEqual(f5._state, FINISHED) self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) + self.assertEqual(f6._state, FINISHED) def test_cancelled(self): self.assertFalse(PENDING_FUTURE.cancelled()) @@ -774,7 +661,7 @@ class FutureTests(unittest.TestCase): t = threading.Thread(target=notification) t.start() - self.assertEquals(f1.result(timeout=5), 42) + self.assertEqual(f1.result(timeout=5), 42) def test_result_with_cancel(self): # TODO(brian@sweetapp.com): This test is timing dependant. @@ -817,16 +704,20 @@ class FutureTests(unittest.TestCase): self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) +@reap_threads def test_main(): - run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) + try: + test_support.run_unittest(ProcessPoolExecutorTest, + ThreadPoolExecutorTest, + ProcessPoolWaitTests, + ThreadPoolWaitTests, + ProcessPoolAsCompletedTests, + ThreadPoolAsCompletedTests, + FutureTests, + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) + finally: + test_support.reap_children() if __name__ == "__main__": test_main() diff --git a/tox.ini b/tox.ini index 83eda49..c1ff2f1 100644 --- a/tox.ini +++ b/tox.ini @@ -1,11 +1,8 @@ [tox] -envlist = py25,py26,py27,py31 +envlist = py26,py27,py31 [testenv] commands={envpython} test_futures.py [] -#[testenv:py24] -#deps=multiprocessing -# -#[testenv:py25] -#deps=multiprocessing +[testenv:py26] +deps=unittest2 -- cgit v1.2.1