From 3eb85180cce06b3cfc3cc6e75ba97bd0bbed23f3 Mon Sep 17 00:00:00 2001
From: Alex Grönholm
Date: Sat, 18 Dec 2010 06:01:09 +0000
Subject: Moved the code in the futures package to concurrent.futures as per
 PEP 3148; unified the codebase to support both Python 2 and 3 in a single
 tree; added support to Python 2.5; added tox.ini for easy testing with
 multiple Python versions

---
 concurrent/__init__.py         |    3 +
 concurrent/futures/__init__.py |   18 +
 concurrent/futures/_base.py    |  575 +++++++++++++++++++++++++++++
 concurrent/futures/_compat.py  |  107 ++++++
 concurrent/futures/process.py  |  345 +++++++++++++++++
 concurrent/futures/thread.py   |  144 ++++++++
 crawl.py                       |   74 ++++
 docs/index.rst                 |   10 +-
 futures/__init__.py            |   24 ++
 futures/process.py             |    1 +
 futures/thread.py              |    1 +
 primes.py                      |   50 +++
 python2/crawl.py               |   68 ----
 python2/futures/__init__.py    |   18 -
 python2/futures/_base.py       |  570 ----------------------------
 python2/futures/process.py     |  337 -----------------
 python2/futures/thread.py      |  136 -------
 python2/primes.py              |   47 ---
 python2/setup.py               |   18 -
 python2/test_futures.py        |  811 ----------------------------------------
 python3/crawl.py               |   68 ----
 python3/futures/__init__.py    |   18 -
 python3/futures/_base.py       |  569 ----------------------------
 python3/futures/process.py     |  337 -----------------
 python3/futures/thread.py      |  136 -------
 python3/primes.py              |   47 ---
 python3/setup.py               |   18 -
 python3/test_futures.py        |  819 ----------------------------------------
 setup.py                       |   32 ++
 test_futures.py                |  821 +++++++++++++++++++++++++++++++++++++++++
 tox.ini                        |   11 +
 31 files changed, 2211 insertions(+), 4022 deletions(-)
 create mode 100644 concurrent/__init__.py
 create mode 100644 concurrent/futures/__init__.py
 create mode 100644 concurrent/futures/_base.py
 create mode 100644 concurrent/futures/_compat.py
 create mode 100644 concurrent/futures/process.py
 create mode 100644 concurrent/futures/thread.py
 create mode 100644 crawl.py
 create mode 100644 futures/__init__.py
 create mode 100644 futures/process.py
 create mode 100644 futures/thread.py
 create mode 100644 primes.py
 delete mode 100644 python2/crawl.py
 delete mode 100644 python2/futures/__init__.py
 delete mode 100644 python2/futures/_base.py
 delete mode 100644 python2/futures/process.py
 delete mode 100644 python2/futures/thread.py
 delete mode 100644 python2/primes.py
 delete mode 100755 python2/setup.py
 delete mode 100644 python2/test_futures.py
 delete mode 100644 python3/crawl.py
 delete mode 100644 python3/futures/__init__.py
 delete mode 100644 python3/futures/_base.py
 delete mode 100644 python3/futures/process.py
 delete mode 100644 python3/futures/thread.py
 delete mode 100644 python3/primes.py
 delete mode 100755 python3/setup.py
 delete mode 100644 python3/test_futures.py
 create mode 100755 setup.py
 create mode 100644 test_futures.py
 create mode 100644 tox.ini

diff --git a/concurrent/__init__.py b/concurrent/__init__.py
new file mode 100644
index 0000000..b36383a
--- /dev/null
+++ b/concurrent/__init__.py
@@ -0,0 +1,3 @@
+from pkgutil import extend_path
+
+__path__ = extend_path(__path__, __name__)
diff --git a/concurrent/futures/__init__.py b/concurrent/futures/__init__.py
new file mode 100644
index 0000000..b5231f8
--- /dev/null
+++ b/concurrent/futures/__init__.py
@@ -0,0 +1,18 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
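+#
+# A minimal usage sketch (added for illustration; not part of the original
+# module).  It exercises the public API re-exported below: submit() returns a
+# Future and as_completed() yields futures as they finish.  The helper
+# function 'double' is made up for the example.
+#
+#     from concurrent.futures import ThreadPoolExecutor, as_completed
+#
+#     def double(x):
+#         return 2 * x
+#
+#     with ThreadPoolExecutor(max_workers=4) as executor:
+#         futures = [executor.submit(double, i) for i in range(10)]
+#         for future in as_completed(futures):
+#             print(future.result())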
+ +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) +from concurrent.futures.process import ProcessPoolExecutor +from concurrent.futures.thread import ThreadPoolExecutor diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py new file mode 100644 index 0000000..1d90211 --- /dev/null +++ b/concurrent/futures/_base.py @@ -0,0 +1,575 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +from __future__ import with_statement +import functools +import logging +import threading +import time + +try: + from collections import namedtuple +except ImportError: + from concurrent.futures._compat import namedtuple + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +# Logger for internal use by the futures package. +LOGGER = logging.getLogger("concurrent.futures") +STDERR_HANDLER = logging.StreamHandler() +LOGGER.addHandler(STDERR_HANDLER) + +class Error(Exception): + """Base class for all future-related exceptions.""" + pass + +class CancelledError(Error): + """The Future was cancelled.""" + pass + +class TimeoutError(Error): + """The operation exceeded the given deadline.""" + pass + +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" + def __init__(self): + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + + def add_exception(self, future): + self.finished_futures.append(future) + + def add_cancelled(self, future): + self.finished_futures.append(future) + +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self, future): + super(_FirstCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + super(_FirstCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + super(_FirstCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class 
_AllCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + super(_AllCompletedWaiter, self).__init__() + + def _decrement_pending_calls(self): + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_result(self, future): + super(_AllCompletedWaiter, self).add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super(_AllCompletedWaiter, self).add_exception(future) + if self.stop_on_exception: + self.event.set() + else: + self._decrement_pending_calls() + + def add_cancelled(self, future): + super(_AllCompletedWaiter, self).add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + """ + if timeout is not None: + end_time = timeout + time.time() + + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = set(fs) - finished + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + + try: + for future in finished: + yield future + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), len(fs))) + + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + for future in finished: + yield future + pending.remove(future) + + finally: + for f in fs: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. 
If None, then there
+            is no limit on the wait time.
+        return_when: Indicates when this function should return. The options
+            are:
+
+            FIRST_COMPLETED - Return when any future finishes or is
+                              cancelled.
+            FIRST_EXCEPTION - Return when any future finishes by raising an
+                              exception. If no future raises an exception
+                              then it is equivalent to ALL_COMPLETED.
+            ALL_COMPLETED - Return when all futures finish or are cancelled.
+
+    Returns:
+        A named 2-tuple of sets. The first set, named 'done', contains the
+        futures that completed (finished or cancelled) before the wait
+        completed. The second set, named 'not_done', contains uncompleted
+        futures.
+    """
+    with _AcquireFutures(fs):
+        done = set(f for f in fs
+                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+        not_done = set(fs) - done
+
+        if (return_when == FIRST_COMPLETED) and done:
+            return DoneAndNotDoneFutures(done, not_done)
+        elif (return_when == FIRST_EXCEPTION) and done:
+            if any(f for f in done
+                   if not f.cancelled() and f.exception() is not None):
+                return DoneAndNotDoneFutures(done, not_done)
+
+        if len(done) == len(fs):
+            return DoneAndNotDoneFutures(done, not_done)
+
+        waiter = _create_and_install_waiters(fs, return_when)
+
+    waiter.event.wait(timeout)
+    for f in fs:
+        f._waiters.remove(waiter)
+
+    done.update(waiter.finished_futures)
+    return DoneAndNotDoneFutures(done, set(fs) - done)
+
+class Future(object):
+    """Represents the result of an asynchronous computation."""
+
+    def __init__(self):
+        """Initializes the future. Should not be called by clients."""
+        self._condition = threading.Condition()
+        self._state = PENDING
+        self._result = None
+        self._exception = None
+        self._waiters = []
+        self._done_callbacks = []
+
+    def _invoke_callbacks(self):
+        for callback in self._done_callbacks:
+            try:
+                callback(self)
+            except Exception:
+                LOGGER.exception('exception calling callback for %r', self)
+
+    def __repr__(self):
+        with self._condition:
+            if self._state == FINISHED:
+                if self._exception:
+                    return '<Future at %s state=%s raised %s>' % (
+                        hex(id(self)),
+                        _STATE_TO_DESCRIPTION_MAP[self._state],
+                        self._exception.__class__.__name__)
+                else:
+                    return '<Future at %s state=%s returned %s>' % (
+                        hex(id(self)),
+                        _STATE_TO_DESCRIPTION_MAP[self._state],
+                        self._result.__class__.__name__)
+            return '<Future at %s state=%s>' % (
+                hex(id(self)),
+                _STATE_TO_DESCRIPTION_MAP[self._state])
+
+    def cancel(self):
+        """Cancel the future if possible.
+
+        Returns True if the future was cancelled, False otherwise. A future
+        cannot be cancelled if it is running or has already completed.
+        """
+        with self._condition:
+            if self._state in [RUNNING, FINISHED]:
+                return False
+
+            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+                return True
+
+            self._state = CANCELLED
+            self._condition.notify_all()
+
+        self._invoke_callbacks()
+        return True
+
+    def cancelled(self):
+        """Return True if the future was cancelled."""
+        with self._condition:
+            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
+
+    def running(self):
+        """Return True if the future is currently executing."""
+        with self._condition:
+            return self._state == RUNNING
+
+    def done(self):
+        """Return True if the future was cancelled or finished executing."""
+        with self._condition:
+            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+
+    def __get_result(self):
+        if self._exception:
+            raise self._exception
+        else:
+            return self._result
+
+    def add_done_callback(self, fn):
+        """Attaches a callable that will be called when the future finishes.
+ + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + fn(self) + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. + + Should only be used by Executor implementations and unit tests. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. 
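+
+        A minimal sketch of the call pattern an Executor implementation is
+        expected to follow (illustrative addition; the names are placeholders
+        and the same pattern appears in _WorkItem.run() in
+        concurrent.futures.thread):
+
+            if not future.set_running_or_notify_cancel():
+                return                      # the future was cancelled
+            try:
+                result = fn(*args, **kwargs)
+            except BaseException:
+                future.set_exception(sys.exc_info()[1])
+            else:
+                future.set_result(result)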
+ """ + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self.future), + self.future._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() + +class Executor(object): + """This is an abstract base class for concrete asynchronous executors.""" + + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + raise NotImplementedError() + + def map(self, fn, *iterables, **kwargs): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + timeout = kwargs.get('timeout') + if timeout is not None: + end_time = timeout + time.time() + + fs = [self.submit(fn, *args) for args in zip(*iterables)] + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. 
+ """ + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False diff --git a/concurrent/futures/_compat.py b/concurrent/futures/_compat.py new file mode 100644 index 0000000..03175b7 --- /dev/null +++ b/concurrent/futures/_compat.py @@ -0,0 +1,107 @@ +import sys + +#if sys.version_info >= (2, 6): +# from collections import namedtuple +#else: +## Copied from Python 2.6 standard library +from keyword import iskeyword as _iskeyword +from operator import itemgetter as _itemgetter +_sys = sys + +def namedtuple(typename, field_names, verbose=False): + """Returns a new subclass of tuple with named fields. + + >>> Point = namedtuple('Point', 'x y') + >>> Point.__doc__ # docstring for the new class + 'Point(x, y)' + >>> p = Point(11, y=22) # instantiate with positional args or keywords + >>> p[0] + p[1] # indexable like a plain tuple + 33 + >>> x, y = p # unpack like a regular tuple + >>> x, y + (11, 22) + >>> p.x + p.y # fields also accessable by name + 33 + >>> d = p._asdict() # convert to a dictionary + >>> d['x'] + 11 + >>> Point(**d) # convert from a dictionary + Point(x=11, y=22) + >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields + Point(x=100, y=22) + + """ + + # Parse and validate the field names. Validation serves two purposes, + # generating informative error messages and preventing template injection attacks. + if isinstance(field_names, basestring): + field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas + field_names = tuple(map(str, field_names)) + for name in (typename,) + field_names: + if not all(c.isalnum() or c=='_' for c in name): + raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) + if _iskeyword(name): + raise ValueError('Type names and field names cannot be a keyword: %r' % name) + if name[0].isdigit(): + raise ValueError('Type names and field names cannot start with a number: %r' % name) + seen_names = set() + for name in field_names: + if name.startswith('_'): + raise ValueError('Field names cannot start with an underscore: %r' % name) + if name in seen_names: + raise ValueError('Encountered duplicate field name: %r' % name) + seen_names.add(name) + + # Create and fill-in the class template + numfields = len(field_names) + argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes + reprtxt = ', '.join('%s=%%r' % name for name in field_names) + dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names)) + template = '''class %(typename)s(tuple): + '%(typename)s(%(argtxt)s)' \n + __slots__ = () \n + _fields = %(field_names)r \n + def __new__(_cls, %(argtxt)s): + return _tuple.__new__(_cls, (%(argtxt)s)) \n + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new %(typename)s object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != %(numfields)d: + raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result)) + return result \n + def __repr__(self): + return '%(typename)s(%(reprtxt)s)' %% self \n + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {%(dicttxt)s} \n + def _replace(_self, **kwds): + 'Return a new %(typename)s object replacing specified fields with new values' + result = _self._make(map(kwds.pop, %(field_names)r, _self)) + if kwds: + raise ValueError('Got unexpected field names: %%r' 
%% kwds.keys())
+            return result \n
+        def __getnewargs__(self):
+            return tuple(self) \n\n''' % locals()
+    for i, name in enumerate(field_names):
+        template += '        %s = _property(_itemgetter(%d))\n' % (name, i)
+    if verbose:
+        print template
+
+    # Execute the template string in a temporary namespace and
+    # support tracing utilities by setting a value for frame.f_globals['__name__']
+    namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
+                     _property=property, _tuple=tuple)
+    try:
+        exec template in namespace
+    except SyntaxError, e:
+        raise SyntaxError(e.message + ':\n' + template)
+    result = namespace[typename]
+
+    # For pickling to work, the __module__ variable needs to be set to the frame
+    # where the named tuple is created. Bypass this step in environments where
+    # sys._getframe is not defined (Jython for example).
+    if hasattr(_sys, '_getframe'):
+        result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
+
+    return result
diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py
new file mode 100644
index 0000000..87dc789
--- /dev/null
+++ b/concurrent/futures/process.py
@@ -0,0 +1,345 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""Implements ProcessPoolExecutor.
+
+The following diagram and text describe the data-flow through the system:
+
+|======================= In-process =====================|== Out-of-process ==|
+
++----------+     +----------+       +--------+     +-----------+    +---------+
+|          | =>  | Work Ids |    => |        |  => | Call Q    | => |         |
+|          |     +----------+       |        |     +-----------+    |         |
+|          |     | ...      |       |        |     | ...       |    |         |
+|          |     | 6        |       |        |     | 5, call() |    |         |
+|          |     | 7        |       |        |     | ...       |    |         |
+| Process  |     | ...      |       | Local  |     +-----------+    | Process |
+|  Pool    |     +----------+       | Worker |                      |  #1..n  |
+| Executor |                        | Thread |                      |         |
+|          |     +----------- +     |        |     +-----------+    |         |
+|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
+|          |     +------------+     |        |     +-----------+    |         |
+|          |     | 6: call()  |     |        |     | ...       |    |         |
+|          |     |    future  |     |        |     | 4, result |    |         |
+|          |     | ...        |     |        |     | 3, except |    |         |
++----------+     +------------+     +--------+     +-----------+    +---------+
+
+Executor.submit() called:
+- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
+- adds the id of the _WorkItem to the "Work Ids" queue
+
+Local worker thread:
+- reads work ids from the "Work Ids" queue and looks up the corresponding
+  WorkItem from the "Work Items" dict: if the work item has been cancelled then
+  it is simply removed from the dict, otherwise it is repackaged as a
+  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
+  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
+  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
+- reads _ResultItems from "Result Q", updates the future stored in the
+  "Work Items" dict and deletes the dict entry
+
+Process #1..n:
+- reads _CallItems from "Call Q", executes the calls, and puts the resulting
+  _ResultItems in "Result Q"
+"""
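+
+# Illustrative usage sketch (not part of the original module): a round trip
+# through the Work Ids / Call Q / Result Q path described above.  The helper
+# function 'square' and the numbers are made up for the example.
+#
+#     from concurrent.futures import ProcessPoolExecutor
+#
+#     def square(x):
+#         return x * x
+#
+#     if __name__ == '__main__':
+#         with ProcessPoolExecutor(max_workers=2) as executor:
+#             print(list(executor.map(square, range(8))))
+#     # -> [0, 1, 4, 9, 16, 25, 36, 49]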
+from __future__ import with_statement
+import atexit
+import multiprocessing
+import threading
+import weakref
+import sys
+
+from concurrent.futures import _base
+
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
+# Workers are created as daemon threads and processes. This is done to allow the
+# interpreter to exit when there are still idle processes in a
+# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
+# allowing workers to die with the interpreter has two undesirable properties:
+#   - The workers would still be running during interpreter shutdown,
+#     meaning that they would fail in unpredictable ways.
+#   - The workers could be killed while evaluating a work item, which could
+#     be bad if the callable being evaluated has external side-effects e.g.
+#     writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads/processes finish.
+
+_thread_references = set()
+_shutdown = False
+
+def _python_exit():
+    global _shutdown
+    _shutdown = True
+    for thread_reference in _thread_references:
+        thread = thread_reference()
+        if thread is not None:
+            thread.join()
+
+def _remove_dead_thread_references():
+    """Remove inactive threads from _thread_references.
+
+    Should be called periodically to prevent memory leaks in scenarios such as:
+    >>> while True:
+    ...    t = ThreadPoolExecutor(max_workers=5)
+    ...    t.map(int, ['1', '2', '3', '4', '5'])
+    """
+    for thread_reference in set(_thread_references):
+        if thread_reference() is None:
+            _thread_references.discard(thread_reference)
+
+# Controls how many more calls than processes will be queued in the call queue.
+# A smaller number will mean that processes spend more time idle waiting for
+# work while a larger number will make Future.cancel() succeed less frequently
+# (Futures in the call queue cannot be cancelled).
+EXTRA_QUEUED_CALLS = 1
+
+class _WorkItem(object):
+    def __init__(self, future, fn, args, kwargs):
+        self.future = future
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+class _ResultItem(object):
+    def __init__(self, work_id, exception=None, result=None):
+        self.work_id = work_id
+        self.exception = exception
+        self.result = result
+
+class _CallItem(object):
+    def __init__(self, work_id, fn, args, kwargs):
+        self.work_id = work_id
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+def _process_worker(call_queue, result_queue, shutdown):
+    """Evaluates calls from call_queue and places the results in result_queue.
+
+    This worker is run in a separate process.
+
+    Args:
+        call_queue: A multiprocessing.Queue of _CallItems that will be read and
+            evaluated by the worker.
+        result_queue: A multiprocessing.Queue of _ResultItems that will be
+            written to by the worker.
+        shutdown: A multiprocessing.Event that will be set as a signal to the
+            worker that it should exit when call_queue is empty.
+    """
+    while True:
+        try:
+            call_item = call_queue.get(block=True, timeout=0.1)
+        except queue.Empty:
+            if shutdown.is_set():
+                return
+        else:
+            try:
+                r = call_item.fn(*call_item.args, **call_item.kwargs)
+            except BaseException:
+                e = sys.exc_info()[1]
+                result_queue.put(_ResultItem(call_item.work_id,
+                                             exception=e))
+            else:
+                result_queue.put(_ResultItem(call_item.work_id,
+                                             result=r))
+
+def _add_call_item_to_queue(pending_work_items,
+                            work_ids,
+                            call_queue):
+    """Fills call_queue with _WorkItems from pending_work_items.
+
+    This function never blocks.
+
+    Args:
+        pending_work_items: A dict mapping work ids to _WorkItems e.g.
+            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
+            are consumed and the corresponding _WorkItems from
+            pending_work_items are transformed into _CallItems and put in
+            call_queue.
+        call_queue: A multiprocessing.Queue that will be filled with _CallItems
+            derived from _WorkItems.
+ """ + while True: + if call_queue.full(): + return + try: + work_id = work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del pending_work_items[work_id] + continue + +def _queue_manangement_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue, + shutdown_process_event): + """Manages the communication between this process and the worker processes. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + process: A list of the multiprocessing.Process instances used as + workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + result_queue: A multiprocessing.Queue of _ResultItems generated by the + process workers. + shutdown_process_event: A multiprocessing.Event used to signal the + process workers that they should exit when their work queue is + empty. + """ + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + + try: + result_item = result_queue.get(block=True, timeout=0.1) + except queue.Empty: + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_process_event.set() + + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + return + del executor + else: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] + + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + """ + _remove_dead_thread_references() + + if max_workers is None: + self._max_workers = multiprocessing.cpu_count() + else: + self._max_workers = max_workers + + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. 
+        self._call_queue = multiprocessing.Queue(self._max_workers +
+                                                 EXTRA_QUEUED_CALLS)
+        self._result_queue = multiprocessing.Queue()
+        self._work_ids = queue.Queue()
+        self._queue_management_thread = None
+        self._processes = set()
+
+        # Shutdown is a two-step process.
+        self._shutdown_thread = False
+        self._shutdown_process_event = multiprocessing.Event()
+        self._shutdown_lock = threading.Lock()
+        self._queue_count = 0
+        self._pending_work_items = {}
+
+    def _start_queue_management_thread(self):
+        if self._queue_management_thread is None:
+            self._queue_management_thread = threading.Thread(
+                    target=_queue_management_worker,
+                    args=(weakref.ref(self),
+                          self._processes,
+                          self._pending_work_items,
+                          self._work_ids,
+                          self._call_queue,
+                          self._result_queue,
+                          self._shutdown_process_event))
+            self._queue_management_thread.daemon = True
+            self._queue_management_thread.start()
+            _thread_references.add(weakref.ref(self._queue_management_thread))
+
+    def _adjust_process_count(self):
+        for _ in range(len(self._processes), self._max_workers):
+            p = multiprocessing.Process(
+                    target=_process_worker,
+                    args=(self._call_queue,
+                          self._result_queue,
+                          self._shutdown_process_event))
+            p.start()
+            self._processes.add(p)
+
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown_thread:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = _base.Future()
+            w = _WorkItem(f, fn, args, kwargs)
+
+            self._pending_work_items[self._queue_count] = w
+            self._work_ids.put(self._queue_count)
+            self._queue_count += 1
+
+            self._start_queue_management_thread()
+            self._adjust_process_count()
+            return f
+    submit.__doc__ = _base.Executor.submit.__doc__
+
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
+            self._shutdown_thread = True
+        if wait:
+            if self._queue_management_thread:
+                self._queue_management_thread.join()
+        # To reduce the risk of opening too many files, remove references to
+        # objects that use file descriptors.
+        self._queue_management_thread = None
+        self._call_queue = None
+        self._result_queue = None
+        self._shutdown_process_event = None
+        self._processes = None
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+atexit.register(_python_exit)
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
new file mode 100644
index 0000000..ce0dda0
--- /dev/null
+++ b/concurrent/futures/thread.py
@@ -0,0 +1,144 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""Implements ThreadPoolExecutor."""
+
+from __future__ import with_statement
+import atexit
+import threading
+import weakref
+import sys
+
+from concurrent.futures import _base
+
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+#   - The workers would still be running during interpreter shutdown,
+#     meaning that they would fail in unpredictable ways.
+#   - The workers could be killed while evaluating a work item, which could
+#     be bad if the callable being evaluated has external side-effects e.g.
+#     writing to a file.
+# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_thread_references = set() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + for thread_reference in _thread_references: + thread = thread_reference() + if thread is not None: + thread.join() + +def _remove_dead_thread_references(): + """Remove inactive threads from _thread_references. + + Should be called periodically to prevent memory leaks in scenarios such as: + >>> while True: + ... t = ThreadPoolExecutor(max_workers=5) + ... t.map(int, ['1', '2', '3', '4', '5']) + """ + for thread_reference in set(_thread_references): + if thread_reference() is None: + _thread_references.discard(thread_reference) + +atexit.register(_python_exit) + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException: + e = sys.exc_info()[1] + self.future.set_exception(e) + else: + self.future.set_result(result) + +def _worker(executor_reference, work_queue): + try: + while True: + try: + work_item = work_queue.get(block=True, timeout=0.1) + except queue.Empty: + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + return + del executor + else: + work_item.run() + except BaseException: + _base.LOGGER.critical('Exception in worker', exc_info=True) + +class ThreadPoolExecutor(_base.Executor): + def __init__(self, max_workers): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + """ + _remove_dead_thread_references() + + self._max_workers = max_workers + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. + if len(self._threads) < self._max_workers: + t = threading.Thread(target=_worker, + args=(weakref.ref(self), self._work_queue)) + t.daemon = True + t.start() + self._threads.add(t) + _thread_references.add(weakref.ref(t)) + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown = True + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/crawl.py b/crawl.py new file mode 100644 index 0000000..86e0af7 --- /dev/null +++ b/crawl.py @@ -0,0 +1,74 @@ +"""Compare the speed of downloading URLs sequentially vs. 
using futures.""" + +import functools +import time +import timeit +import sys + +try: + from urllib2 import urlopen +except ImportError: + from urllib.request import urlopen + +from concurrent.futures import (as_completed, ThreadPoolExecutor, + ProcessPoolExecutor) + +URLS = ['http://www.google.com/', + 'http://www.apple.com/', + 'http://www.ibm.com', + 'http://www.thisurlprobablydoesnotexist.com', + 'http://www.slashdot.org/', + 'http://www.python.org/', + 'http://www.bing.com/', + 'http://www.facebook.com/', + 'http://www.yahoo.com/', + 'http://www.youtube.com/', + 'http://www.blogger.com/'] + +def load_url(url, timeout): + kwargs = {'timeout': timeout} if sys.version_info >= (2, 6) else {} + return urlopen(url, **kwargs).read() + +def download_urls_sequential(urls, timeout=60): + url_to_content = {} + for url in urls: + try: + url_to_content[url] = load_url(url, timeout=timeout) + except: + pass + return url_to_content + +def download_urls_with_executor(urls, executor, timeout=60): + try: + url_to_content = {} + future_to_url = dict((executor.submit(load_url, url, timeout), url) + for url in urls) + + for future in as_completed(future_to_url): + try: + url_to_content[future_to_url[future]] = future.result() + except: + pass + return url_to_content + finally: + executor.shutdown() + +def main(): + for name, fn in [('sequential', + functools.partial(download_urls_sequential, URLS)), + ('processes', + functools.partial(download_urls_with_executor, + URLS, + ProcessPoolExecutor(10))), + ('threads', + functools.partial(download_urls_with_executor, + URLS, + ThreadPoolExecutor(10)))]: + sys.stdout.write('%s: ' % name.ljust(12)) + start = time.time() + url_map = fn() + sys.stdout.write('%.2f seconds (%d of %d downloaded)\n' % + (time.time() - start, len(url_map), len(URLS))) + +if __name__ == '__main__': + main() diff --git a/docs/index.rst b/docs/index.rst index ac006a8..a5d084c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,11 +1,11 @@ -:mod:`futures` --- Asynchronous computation +:mod:`concurrent.futures` --- Asynchronous computation =========================================== -.. module:: futures +.. module:: concurrent.futures :synopsis: Execute computations asynchronously using threads or processes. -The :mod:`futures` module provides a high-level interface for asynchronously -executing callables. +The :mod:`concurrent.futures` module provides a high-level interface for +asynchronously executing callables. The asynchronous execution can be be performed by threads using :class:`ThreadPoolExecutor` or seperate processes using @@ -120,7 +120,7 @@ ThreadPoolExecutor Example ^^^^^^^^^^^^^^^^^^^^^^^^^^ :: - import futures + from concurrent import futures import urllib.request URLS = ['http://www.foxnews.com/', diff --git a/futures/__init__.py b/futures/__init__.py new file mode 100644 index 0000000..8f8b234 --- /dev/null +++ b/futures/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +import warnings + +from concurrent.futures import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed, + ProcessPoolExecutor, + ThreadPoolExecutor) + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +warnings.warn('The futures package has been deprecated. 
' + 'Use the concurrent.futures package instead.', + DeprecationWarning) diff --git a/futures/process.py b/futures/process.py new file mode 100644 index 0000000..e9d37b1 --- /dev/null +++ b/futures/process.py @@ -0,0 +1 @@ +from concurrent.futures import ProcessPoolExecutor diff --git a/futures/thread.py b/futures/thread.py new file mode 100644 index 0000000..f6bd05d --- /dev/null +++ b/futures/thread.py @@ -0,0 +1 @@ +from concurrent.futures import ThreadPoolExecutor diff --git a/primes.py b/primes.py new file mode 100644 index 0000000..0da2b3e --- /dev/null +++ b/primes.py @@ -0,0 +1,50 @@ +from __future__ import with_statement +import math +import time +import sys + +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 117450548693743, + 993960000099397] + +def is_prime(n): + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def sequential(): + return list(map(is_prime, PRIMES)) + +def with_process_pool_executor(): + with ProcessPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def with_thread_pool_executor(): + with ThreadPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def main(): + for name, fn in [('sequential', sequential), + ('processes', with_process_pool_executor), + ('threads', with_thread_pool_executor)]: + sys.stdout.write('%s: ' % name.ljust(12)) + start = time.time() + if fn() != [True] * len(PRIMES): + sys.stdout.write('failed\n') + else: + sys.stdout.write('%.2f seconds\n' % (time.time() - start)) + +if __name__ == '__main__': + main() diff --git a/python2/crawl.py b/python2/crawl.py deleted file mode 100644 index f1d29ca..0000000 --- a/python2/crawl.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Compare the speed of downloading URLs sequentially vs. 
using futures.""" - -import datetime -import functools -import futures.thread -import time -import timeit -import urllib2 - -URLS = ['http://www.google.com/', - 'http://www.apple.com/', - 'http://www.ibm.com', - 'http://www.thisurlprobablydoesnotexist.com', - 'http://www.slashdot.org/', - 'http://www.python.org/', - 'http://www.bing.com/', - 'http://www.facebook.com/', - 'http://www.yahoo.com/', - 'http://www.youtube.com/', - 'http://www.blogger.com/'] - -def load_url(url, timeout): - return urllib2.urlopen(url, timeout=timeout).read() - -def download_urls_sequential(urls, timeout=60): - url_to_content = {} - for url in urls: - try: - url_to_content[url] = load_url(url, timeout=timeout) - except: - pass - return url_to_content - -def download_urls_with_executor(urls, executor, timeout=60): - try: - url_to_content = {} - future_to_url = dict((executor.submit(load_url, url, timeout), url) - for url in urls) - - for future in futures.as_completed(future_to_url): - try: - url_to_content[future_to_url[future]] = future.result() - except: - pass - return url_to_content - finally: - executor.shutdown() - -def main(): - for name, fn in [('sequential', - functools.partial(download_urls_sequential, URLS)), - ('processes', - functools.partial(download_urls_with_executor, - URLS, - futures.ProcessPoolExecutor(10))), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10)))]: - print name.ljust(12), - start = time.time() - url_map = fn() - print '%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS)) - -if __name__ == '__main__': - main() diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py deleted file mode 100644 index 8331d53..0000000 --- a/python2/futures/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Execute computations asynchronously using threads or processes.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -from futures._base import (FIRST_COMPLETED, - FIRST_EXCEPTION, - ALL_COMPLETED, - CancelledError, - TimeoutError, - Future, - Executor, - wait, - as_completed) -from futures.process import ProcessPoolExecutor -from futures.thread import ThreadPoolExecutor diff --git a/python2/futures/_base.py b/python2/futures/_base.py deleted file mode 100644 index 58a2c5e..0000000 --- a/python2/futures/_base.py +++ /dev/null @@ -1,570 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import collections -import functools -import logging -import threading -import time - -FIRST_COMPLETED = 'FIRST_COMPLETED' -FIRST_EXCEPTION = 'FIRST_EXCEPTION' -ALL_COMPLETED = 'ALL_COMPLETED' -_AS_COMPLETED = '_AS_COMPLETED' - -# Possible future states (for internal use by the futures package). -PENDING = 'PENDING' -RUNNING = 'RUNNING' -# The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and _Waiter.add_cancelled() was called by a worker. -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' - -_FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED -] - -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - -# Logger for internal use by the futures package. 
-LOGGER = logging.getLogger("futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) - -class Error(Exception): - """Base class for all future-related exceptions.""" - pass - -class CancelledError(Error): - """The Future was cancelled.""" - pass - -class TimeoutError(Error): - """The operation exceeded the given deadline.""" - pass - -class _Waiter(object): - """Provides the event that wait() and as_completed() block on.""" - def __init__(self): - self.event = threading.Event() - self.finished_futures = [] - - def add_result(self, future): - self.finished_futures.append(future) - - def add_exception(self, future): - self.finished_futures.append(future) - - def add_cancelled(self, future): - self.finished_futures.append(future) - -class _AsCompletedWaiter(_Waiter): - """Used by as_completed().""" - - def __init__(self): - super(_AsCompletedWaiter, self).__init__() - self.lock = threading.Lock() - - def add_result(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_cancelled(future) - self.event.set() - -class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED).""" - - def add_result(self, future): - super(_FirstCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - super(_FirstCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - super(_FirstCompletedWaiter, self).add_cancelled(future) - self.event.set() - -class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" - - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - super(_AllCompletedWaiter, self).__init__() - - def _decrement_pending_calls(self): - self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super(_AllCompletedWaiter, self).add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super(_AllCompletedWaiter, self).add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super(_AllCompletedWaiter, self).add_cancelled(future) - self._decrement_pending_calls() - -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" - - def __init__(self, futures): - self.futures = sorted(futures, key=id) - - def __enter__(self): - for future in self.futures: - future._condition.acquire() - - def __exit__(self, *args): - for future in self.futures: - future._condition.release() - -def _create_and_install_waiters(fs, return_when): - if return_when == _AS_COMPLETED: - waiter = _AsCompletedWaiter() - elif return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % 
return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - -def as_completed(fs, timeout=None): - """An iterator over the given futures that yields each as it completes. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - iterate over. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator that yields the given Futures as they complete (finished or - cancelled). - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - """ - if timeout is not None: - end_time = timeout + time.time() - - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - - try: - for future in finished: - yield future - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError( - '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(wait_timeout) - - with waiter.lock: - finished = waiter.finished_futures - waiter.finished_futures = [] - waiter.event.clear() - - for future in finished: - yield future - pending.remove(future) - - finally: - for f in fs: - f._waiters.remove(waiter) - -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') -def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the given sequence to complete. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - wait upon. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when this function should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises an exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - - Returns: - A named 2-tuple of sets. The first set, named 'done', contains the - futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_done', contains uncompleted - futures. - """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) - -class Future(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self): - """Initializes the future. 
Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._waiters = [] - self._done_callbacks = [] - - def _invoke_callbacks(self): - for callback in self._done_callbacks: - try: - callback(self) - except Exception: - LOGGER.exception('exception calling callback for %r', self) - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - return True - - self._state = CANCELLED - self._condition.notify_all() - - self._invoke_callbacks() - return True - - def cancelled(self): - """Return True if the future has cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - """Return True if the future is currently executing.""" - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def add_done_callback(self, fn): - """Attaches a callable that will be called when the future finishes. - - Args: - fn: A callable that will be called with this future as its only - argument when the future completes or is cancelled. The callable - will always be called by a thread in the same process in which - it was added. If the future has already completed or been - cancelled then the callable will be called immediately. These - callables are called in the order that they were added. - """ - with self._condition: - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: - self._done_callbacks.append(fn) - return - fn(self) - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the future - isn't done. If None, then there is no limit on the wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - Exception: If the call raised then that exception will be raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future represents. 
- - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the wait - time. - - Returns: - The exception raised by the call that the future represents or None - if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - - # The following methods should only be used by Executors and in tests. - def set_running_or_notify_cancel(self): - """Mark the future as running or process any cancel notifications. - - Should only be used by Executor implementations and unit tests. - - If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though calls - to as_completed() or wait()) are notified and False is returned. - - If the future was not cancelled then it is put in the running state - (future calls to running() will return True) and True is returned. - - This method should be called by Executor implementations before - executing the work associated with this future. If this method returns - False then the work should not be executed. - - Returns: - False if the Future was cancelled, True otherwise. - - Raises: - RuntimeError: if this method was already called or if set_result() - or set_exception() was called. - """ - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. - return False - elif self._state == PENDING: - self._state = RUNNING - return True - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) - raise RuntimeError('Future in unexpected state') - - def set_result(self, result): - """Sets the return value of work associated with the future. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - self._invoke_callbacks() - - def set_exception(self, exception): - """Sets the result of the future as being the given exception. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - self._invoke_callbacks() - -class Executor(object): - """This is an abstract base class for concrete asynchronous executors.""" - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - raise NotImplementedError() - - def map(self, fn, *iterables, **kwargs): - """Returns a iterator equivalent to map(fn, iter). 
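Executor.map() submits one call per zipped argument tuple and yields results in input order even though the calls may run concurrently; a minimal usage sketch, assuming the ThreadPoolExecutor from this package:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    # One call per zipped argument tuple: pow(2, 10), pow(3, 10), pow(4, 10).
    for value in executor.map(pow, [2, 3, 4], [10, 10, 10]):
        print(value)     # 1024, then 59049, then 1048576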
- - Args: - fn: A callable that will take take as many arguments as there are - passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. - """ - timeout = kwargs.get('timeout') - if timeout is not None: - end_time = timeout + time.time() - - fs = [self.submit(fn, *args) for args in zip(*iterables)] - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() - - def shutdown(self, wait=True): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - executor have been reclaimed. - """ - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False diff --git a/python2/futures/process.py b/python2/futures/process.py deleted file mode 100644 index ec48377..0000000 --- a/python2/futures/process.py +++ /dev/null @@ -1,337 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ProcessPoolExecutor. - -The follow diagram and text describe the data-flow through the system: - -|======================= In-process =====================|== Out-of-process ==| - -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | -| | | 7 | | | | ... | | | -| Process | | ... | | Local | +-----------+ | Process | -| Pool | +----------+ | Worker | | #1..n | -| Executor | | Thread | | | -| | +----------- + | | +-----------+ | | -| | <=> | Work Items | <=> | | <= | Result Q | <= | | -| | +------------+ | | +-----------+ | | -| | | 6: call() | | | | ... | | | -| | | future | | | | 4, result | | | -| | | ... | | | | 3, except | | | -+----------+ +------------+ +--------+ +-----------+ +---------+ - -Executor.submit() called: -- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict -- adds the id of the _WorkItem to the "Work Ids" queue - -Local worker thread: -- reads work ids from the "Work Ids" queue and looks up the corresponding - WorkItem from the "Work Items" dict: if the work item has been cancelled then - it is simply removed from the dict, otherwise it is repackaged as a - _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" - until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because - calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 
-- reads _ResultItems from "Result Q", updates the future stored in the - "Work Items" dict and deletes the dict entry - -Process #1..n: -- reads _CallItems from "Call Q", executes the calls, and puts the resulting - _ResultItems in "Request Q" -""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import multiprocessing -import threading -import weakref - -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -# Controls how many more calls than processes will be queued in the call queue. -# A smaller number will mean that processes spend more time idle waiting for -# work while a larger number will make Future.cancel() succeed less frequently -# (Futures in the call queue cannot be cancelled). -EXTRA_QUEUED_CALLS = 1 - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): - self.work_id = work_id - self.fn = fn - self.args = args - self.kwargs = kwargs - -def _process_worker(call_queue, result_queue, shutdown): - """Evaluates calls from call_queue and places the results in result_queue. - - This worker is run in a seperate process. - - Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and - evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written - to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. 
- """ - while True: - try: - call_item = call_queue.get(block=True, timeout=0.1) - except Queue.Empty: - if shutdown.is_set(): - return - else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except Queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): - """Manages the communication between this process and the worker processes. - - This function is run in a local thread. - - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the - process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. - """ - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) - - try: - result_item = result_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. 
- for p in processes: - p.join() - return - del executor - else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - -class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): - """Initializes a new ProcessPoolExecutor instance. - - Args: - max_workers: The maximum number of processes that can be used to - execute the given calls. If None or not given then as many - worker processes will be created as the machine has processors. - """ - _remove_dead_thread_references() - - if max_workers is None: - self._max_workers = multiprocessing.cpu_count() - else: - self._max_workers = max_workers - - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() - self._work_ids = Queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._shutdown_process_event)) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) - p.start() - self._processes.add(p) - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._pending_work_items[self._queue_count] = w - self._work_ids.put(self._queue_count) - self._queue_count += 1 - - self._start_queue_management_thread() - self._adjust_process_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True - if wait: - if self._queue_management_thread: - self._queue_management_thread.join() - # To reduce the risk of openning too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - self._call_queue = None - self._result_queue = None - self._shutdown_process_event = None - self._processes = None - shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/python2/futures/thread.py b/python2/futures/thread.py deleted file mode 100644 index 3f1584a..0000000 --- a/python2/futures/thread.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. 
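The ProcessPoolExecutor removed above (and re-added under concurrent.futures earlier in this patch) starts its queue-management thread and worker processes lazily on the first submit(); a minimal usage sketch, with the __main__ guard that multiprocessing needs on platforms without fork:

from concurrent.futures import ProcessPoolExecutor

def cube(x):
    return x ** 3

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=2)
    # The queue-management thread and worker processes start on first submit().
    fs = [executor.submit(cube, n) for n in range(5)]
    print([f.result() for f in fs])    # [0, 1, 8, 27, 64]
    executor.shutdown(wait=True)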
- -"""Implements ThreadPoolExecutor.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import threading -import weakref - -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) - else: - self.future.set_result(result) - -def _worker(executor_reference, work_queue): - try: - while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: - work_item.run() - except BaseException as e: - _base.LOGGER.critical('Exception in worker', exc_info=True) - -class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): - """Initializes a new ThreadPoolExecutor instance. - - Args: - max_workers: The maximum number of threads that can be used to - execute the given calls. - """ - _remove_dead_thread_references() - - self._max_workers = max_workers - self._work_queue = Queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._work_queue.put(w) - self._adjust_thread_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def _adjust_thread_count(self): - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. 
- if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) - t.daemon = True - t.start() - self._threads.add(t) - _thread_references.add(weakref.ref(t)) - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - if wait: - for t in self._threads: - t.join() - shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python2/primes.py b/python2/primes.py deleted file mode 100644 index fa6c355..0000000 --- a/python2/primes.py +++ /dev/null @@ -1,47 +0,0 @@ -import futures -import math -import time - -PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 117450548693743, - 993960000099397] - -def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - -def sequential(): - return list(map(is_prime, PRIMES)) - -def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def main(): - for name, fn in [('sequential', sequential), - ('processes', with_process_pool_executor), - ('threads', with_thread_pool_executor)]: - print name.ljust(12), - start = time.time() - if fn() != [True] * len(PRIMES): - print 'failed' - else: - print '%.2f seconds' % (time.time() - start) - -if __name__ == '__main__': - main() diff --git a/python2/setup.py b/python2/setup.py deleted file mode 100755 index 0465a9b..0000000 --- a/python2/setup.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python - -from distutils.core import setup - -setup(name='futures', - version='2.0', - description='Java-style futures implementation in Python 2.x', - author='Brian Quinlan', - author_email='brian@sweetapp.com', - url='http://code.google.com/p/pythonfutures', - download_url='http://pypi.python.org/pypi/futures3/', - packages=['futures'], - license='BSD', - classifiers=['License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 2'] - ) diff --git a/python2/test_futures.py b/python2/test_futures.py deleted file mode 100644 index 2d5672b..0000000 --- a/python2/test_futures.py +++ /dev/null @@ -1,811 +0,0 @@ -import logging -import multiprocessing -import re -import StringIO -import sys -import threading -from test import test_support -import time -import unittest - -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - -import futures -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process - -def create_future(state=PENDING, exception=None, result=None): - f = Future() - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -def mul(x, y): - return x * y - -class Call(object): - """A call that can be submitted to a future.Executor for testing. 
- - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - - def wait_on_called(self): - self._wait_on_event(self._called_event) - - def set_can(self): - self._signal_event(self._can_finish) - - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - - return self._result - - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, result=42): - super(MapCall, self).__init__(manual_finish=True, result=result) - - def __call__(self, manual_finish): - if manual_finish: - super(MapCall, self).__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - - def test_del_shutdown(self): - executor = 
futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) - threads = executor._threads - del executor - - for t in threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - processes = self.executor._processes - self.executor.shutdown() - - for p in processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - - def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) - queue_management_thread = executor._queue_management_thread - processes = executor._processes - del executor - - queue_management_thread.join() - for p in processes: - p.join() - -class WaitTests(unittest.TestCase): - def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() - - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) - - - finally: - call1.close() - 
call2.close() - - def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertTrue( - futures.process.EXTRA_QUEUED_CALLS <= 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - - finally: - call1.close() - call2.close() - - -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class AsCompletedTests(unittest.TestCase): - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 
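The wait() and as_completed() behaviours exercised by these tests correspond to a common consumer pattern; a short sketch, using the concurrent.futures names added earlier in this patch and a hypothetical work() callable:

from concurrent.futures import (ThreadPoolExecutor, wait, as_completed,
                                FIRST_COMPLETED)

def work(n):
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    fs = [executor.submit(work, n) for n in range(6)]

    # Block until at least one future finishes or is cancelled.
    done, not_done = wait(fs, return_when=FIRST_COMPLETED)

    # Then drain everything as it completes, regardless of submission order.
    for future in as_completed(fs):
        print(future.result())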
- def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() - - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() - -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_submit(self): - future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) - - def test_submit_keyword(self): - future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.next(), (0, 1)) - self.assertEqual(i.next(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.next) - - def test_map_timeout(self): - results = [] - timeout_call = MapCall() - try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEquals([42, 42], results) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class FutureTests(unittest.TestCase): - def test_done_callback_with_result(self): - self.callback_result = None - def fn(callback_future): - self.callback_result = callback_future.result() - - f = Future() - f.add_done_callback(fn) - f.set_result(5) - self.assertEquals(5, self.callback_result) - - def test_done_callback_with_exception(self): - self.callback_exception = None - def fn(callback_future): - self.callback_exception = callback_future.exception() - - f = Future() - f.add_done_callback(fn) - f.set_exception(Exception('test')) - 
self.assertEquals(('test',), self.callback_exception.args) - - def test_done_callback_with_cancel(self): - self.was_cancelled = None - def fn(callback_future): - self.was_cancelled = callback_future.cancelled() - - f = Future() - f.add_done_callback(fn) - self.assertTrue(f.cancel()) - self.assertTrue(self.was_cancelled) - - def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = StringIO.StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - self.raising_was_called = False - self.fn_was_called = False - - def raising_fn(callback_future): - self.raising_was_called = True - raise Exception('doh!') - - def fn(callback_future): - self.fn_was_called = True - - f = Future() - f.add_done_callback(raising_fn) - f.add_done_callback(fn) - f.set_result(5) - self.assertTrue(self.raising_was_called) - self.assertTrue(self.fn_was_called) - self.assertTrue('Exception: doh!' in logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) - - def test_done_callback_already_successful(self): - self.callback_result = None - def fn(callback_future): - self.callback_result = callback_future.result() - - f = Future() - f.set_result(5) - f.add_done_callback(fn) - self.assertEquals(5, self.callback_result) - - def test_done_callback_already_failed(self): - self.callback_exception = None - def fn(callback_future): - self.callback_exception = callback_future.exception() - - f = Future() - f.set_exception(Exception('test')) - f.add_done_callback(fn) - self.assertEquals(('test',), self.callback_exception.args) - - def test_done_callback_already_cancelled(self): - self.was_cancelled = None - def fn(callback_future): - self.was_cancelled = callback_future.cancelled() - - f = Future() - self.assertTrue(f.cancel()) - f.add_done_callback(fn) - self.assertTrue(self.was_cancelled) - - def test_repr(self): - self.assertTrue(re.match('', - repr(PENDING_FUTURE))) - self.assertTrue(re.match('', - repr(RUNNING_FUTURE))) - self.assertTrue(re.match('', - repr(CANCELLED_FUTURE))) - self.assertTrue(re.match('', - repr(CANCELLED_AND_NOTIFIED_FUTURE))) - self.assertTrue(re.match( - '', - repr(EXCEPTION_FUTURE))) - self.assertTrue(re.match( - '', - repr(SUCCESSFUL_FUTURE))) - - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - 
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.set_result(42) - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=5), 42) - - def test_result_with_cancel(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.cancel() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=5) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - # Wait until the main thread is waiting for the exception. - time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) - -def test_main(): - test_support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() diff --git a/python3/crawl.py b/python3/crawl.py deleted file mode 100644 index 7135682..0000000 --- a/python3/crawl.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Compare the speed of downloading URLs sequentially vs. 
using futures.""" - -import datetime -import functools -import futures.thread -import time -import timeit -import urllib.request - -URLS = ['http://www.google.com/', - 'http://www.apple.com/', - 'http://www.ibm.com', - 'http://www.thisurlprobablydoesnotexist.com', - 'http://www.slashdot.org/', - 'http://www.python.org/', - 'http://www.bing.com/', - 'http://www.facebook.com/', - 'http://www.yahoo.com/', - 'http://www.youtube.com/', - 'http://www.blogger.com/'] - -def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - -def download_urls_sequential(urls, timeout=60): - url_to_content = {} - for url in urls: - try: - url_to_content[url] = load_url(url, timeout=timeout) - except: - pass - return url_to_content - -def download_urls_with_executor(urls, executor, timeout=60): - try: - url_to_content = {} - future_to_url = dict((executor.submit(load_url, url, timeout), url) - for url in urls) - - for future in futures.as_completed(future_to_url): - try: - url_to_content[future_to_url[future]] = future.result() - except: - pass - return url_to_content - finally: - executor.shutdown() - -def main(): - for name, fn in [('sequential', - functools.partial(download_urls_sequential, URLS)), - ('processes', - functools.partial(download_urls_with_executor, - URLS, - futures.ProcessPoolExecutor(10))), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10)))]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - url_map = fn() - print('%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS))) - -if __name__ == '__main__': - main() diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py deleted file mode 100644 index 8331d53..0000000 --- a/python3/futures/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Execute computations asynchronously using threads or processes.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -from futures._base import (FIRST_COMPLETED, - FIRST_EXCEPTION, - ALL_COMPLETED, - CancelledError, - TimeoutError, - Future, - Executor, - wait, - as_completed) -from futures.process import ProcessPoolExecutor -from futures.thread import ThreadPoolExecutor diff --git a/python3/futures/_base.py b/python3/futures/_base.py deleted file mode 100644 index 143f330..0000000 --- a/python3/futures/_base.py +++ /dev/null @@ -1,569 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import collections -import functools -import logging -import threading -import time - -FIRST_COMPLETED = 'FIRST_COMPLETED' -FIRST_EXCEPTION = 'FIRST_EXCEPTION' -ALL_COMPLETED = 'ALL_COMPLETED' -_AS_COMPLETED = '_AS_COMPLETED' - -# Possible future states (for internal use by the futures package). -PENDING = 'PENDING' -RUNNING = 'RUNNING' -# The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and _Waiter.add_cancelled() was called by a worker. -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' - -_FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED -] - -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - -# Logger for internal use by the futures package. 
-LOGGER = logging.getLogger("futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) - -class Error(Exception): - """Base class for all future-related exceptions.""" - pass - -class CancelledError(Error): - """The Future was cancelled.""" - pass - -class TimeoutError(Error): - """The operation exceeded the given deadline.""" - pass - -class _Waiter(object): - """Provides the event that wait() and as_completed() block on.""" - def __init__(self): - self.event = threading.Event() - self.finished_futures = [] - - def add_result(self, future): - self.finished_futures.append(future) - - def add_exception(self, future): - self.finished_futures.append(future) - - def add_cancelled(self, future): - self.finished_futures.append(future) - -class _AsCompletedWaiter(_Waiter): - """Used by as_completed().""" - - def __init__(self): - super(_AsCompletedWaiter, self).__init__() - self.lock = threading.Lock() - - def add_result(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_cancelled(future) - self.event.set() - -class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED).""" - - def add_result(self, future): - super().add_result(future) - self.event.set() - - def add_exception(self, future): - super().add_exception(future) - self.event.set() - - def add_cancelled(self, future): - super().add_cancelled(future) - self.event.set() - -class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" - - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - super().__init__() - - def _decrement_pending_calls(self): - self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super().add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super().add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super().add_cancelled(future) - self._decrement_pending_calls() - -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" - - def __init__(self, futures): - self.futures = sorted(futures, key=id) - - def __enter__(self): - for future in self.futures: - future._condition.acquire() - - def __exit__(self, *args): - for future in self.futures: - future._condition.release() - -def _create_and_install_waiters(fs, return_when): - if return_when == _AS_COMPLETED: - waiter = _AsCompletedWaiter() - elif return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - -def as_completed(fs, timeout=None): - """An iterator over the given futures that yields each as it completes. 
- - Args: - fs: The sequence of Futures (possibly created by different Executors) to - iterate over. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator that yields the given Futures as they complete (finished or - cancelled). - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - """ - if timeout is not None: - end_time = timeout + time.time() - - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - - try: - for future in finished: - yield future - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError( - '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(wait_timeout) - - with waiter.lock: - finished = waiter.finished_futures - waiter.finished_futures = [] - waiter.event.clear() - - for future in finished: - yield future - pending.remove(future) - - finally: - for f in fs: - f._waiters.remove(waiter) - -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') -def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the given sequence to complete. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - wait upon. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when this function should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises an exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - - Returns: - A named 2-tuple of sets. The first set, named 'done', contains the - futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_done', contains uncompleted - futures. - """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) - -class Future(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self): - """Initializes the future. 
Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._waiters = [] - self._done_callbacks = [] - - def _invoke_callbacks(self): - for callback in self._done_callbacks: - try: - callback(self) - except Exception: - LOGGER.exception('exception calling callback for %r', self) - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - return True - - self._state = CANCELLED - self._condition.notify_all() - - self._invoke_callbacks() - return True - - def cancelled(self): - """Return True if the future has cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - """Return True if the future is currently executing.""" - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def add_done_callback(self, fn): - """Attaches a callable that will be called when the future finishes. - - Args: - fn: A callable that will be called with this future as its only - argument when the future completes or is cancelled. The callable - will always be called by a thread in the same process in which - it was added. If the future has already completed or been - cancelled then the callable will be called immediately. These - callables are called in the order that they were added. - """ - with self._condition: - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: - self._done_callbacks.append(fn) - return - fn(self) - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the future - isn't done. If None, then there is no limit on the wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - Exception: If the call raised then that exception will be raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future represents. 
- - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the wait - time. - - Returns: - The exception raised by the call that the future represents or None - if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - - # The following methods should only be used by Executors and in tests. - def set_running_or_notify_cancel(self): - """Mark the future as running or process any cancel notifications. - - Should only be used by Executor implementations and unit tests. - - If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though calls - to as_completed() or wait()) are notified and False is returned. - - If the future was not cancelled then it is put in the running state - (future calls to running() will return True) and True is returned. - - This method should be called by Executor implementations before - executing the work associated with this future. If this method returns - False then the work should not be executed. - - Returns: - False if the Future was cancelled, True otherwise. - - Raises: - RuntimeError: if this method was already called or if set_result() - or set_exception() was called. - """ - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. - return False - elif self._state == PENDING: - self._state = RUNNING - return True - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) - raise RuntimeError('Future in unexpected state') - - def set_result(self, result): - """Sets the return value of work associated with the future. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - self._invoke_callbacks() - - def set_exception(self, exception): - """Sets the result of the future as being the given exception. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - self._invoke_callbacks() - -class Executor(object): - """This is an abstract base class for concrete asynchronous executors.""" - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - raise NotImplementedError() - - def map(self, fn, *iterables, timeout=None): - """Returns a iterator equivalent to map(fn, iter). 
- - Args: - fn: A callable that will take take as many arguments as there are - passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. - """ - if timeout is not None: - end_time = timeout + time.time() - - fs = [self.submit(fn, *args) for args in zip(*iterables)] - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() - - def shutdown(self, wait=True): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - executor have been reclaimed. - """ - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False diff --git a/python3/futures/process.py b/python3/futures/process.py deleted file mode 100644 index 6de870f..0000000 --- a/python3/futures/process.py +++ /dev/null @@ -1,337 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ProcessPoolExecutor. - -The follow diagram and text describe the data-flow through the system: - -|======================= In-process =====================|== Out-of-process ==| - -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | -| | | 7 | | | | ... | | | -| Process | | ... | | Local | +-----------+ | Process | -| Pool | +----------+ | Worker | | #1..n | -| Executor | | Thread | | | -| | +----------- + | | +-----------+ | | -| | <=> | Work Items | <=> | | <= | Result Q | <= | | -| | +------------+ | | +-----------+ | | -| | | 6: call() | | | | ... | | | -| | | future | | | | 4, result | | | -| | | ... | | | | 3, except | | | -+----------+ +------------+ +--------+ +-----------+ +---------+ - -Executor.submit() called: -- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict -- adds the id of the _WorkItem to the "Work Ids" queue - -Local worker thread: -- reads work ids from the "Work Ids" queue and looks up the corresponding - WorkItem from the "Work Items" dict: if the work item has been cancelled then - it is simply removed from the dict, otherwise it is repackaged as a - _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" - until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because - calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 
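For illustration, a rough sketch of that cancellation window (assuming the
package is importable as futures; the exact outcome depends on timing):

    import time
    import futures

    def slow(x):
        time.sleep(1)
        return x

    if __name__ == '__main__':
        executor = futures.ProcessPoolExecutor(max_workers=1)
        fs = [executor.submit(slow, i) for i in range(10)]
        # The first call has usually already been moved to the "Call Q"
        # (or is running), so cancelling it tends to fail; later calls
        # are still only entries in "Work Items" and can be cancelled.
        print(fs[0].cancel())    # typically False
        print(fs[-1].cancel())   # typically True
        executor.shutdown()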
-- reads _ResultItems from "Result Q", updates the future stored in the - "Work Items" dict and deletes the dict entry - -Process #1..n: -- reads _CallItems from "Call Q", executes the calls, and puts the resulting - _ResultItems in "Request Q" -""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -from futures import _base -import queue -import multiprocessing -import threading -import weakref - -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -# Controls how many more calls than processes will be queued in the call queue. -# A smaller number will mean that processes spend more time idle waiting for -# work while a larger number will make Future.cancel() succeed less frequently -# (Futures in the call queue cannot be cancelled). -EXTRA_QUEUED_CALLS = 1 - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): - self.work_id = work_id - self.fn = fn - self.args = args - self.kwargs = kwargs - -def _process_worker(call_queue, result_queue, shutdown): - """Evaluates calls from call_queue and places the results in result_queue. - - This worker is run in a seperate process. - - Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and - evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written - to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. 
- """ - while True: - try: - call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: - if shutdown.is_set(): - return - else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): - """Manages the communication between this process and the worker processes. - - This function is run in a local thread. - - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the - process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. - """ - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) - - try: - result_item = result_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. 
- for p in processes: - p.join() - return - del executor - else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - -class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): - """Initializes a new ProcessPoolExecutor instance. - - Args: - max_workers: The maximum number of processes that can be used to - execute the given calls. If None or not given then as many - worker processes will be created as the machine has processors. - """ - _remove_dead_thread_references() - - if max_workers is None: - self._max_workers = multiprocessing.cpu_count() - else: - self._max_workers = max_workers - - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() - self._work_ids = queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._shutdown_process_event)) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) - p.start() - self._processes.add(p) - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._pending_work_items[self._queue_count] = w - self._work_ids.put(self._queue_count) - self._queue_count += 1 - - self._start_queue_management_thread() - self._adjust_process_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True - if wait: - if self._queue_management_thread: - self._queue_management_thread.join() - # To reduce the risk of openning too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - self._call_queue = None - self._result_queue = None - self._shutdown_process_event = None - self._processes = None - shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/python3/futures/thread.py b/python3/futures/thread.py deleted file mode 100644 index a2d96bf..0000000 --- a/python3/futures/thread.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. 
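# For orientation before the implementation details, a minimal usage
# sketch of the executor this module implements (hypothetical example;
# assumes the package is importable as futures):
#
#     from futures import ThreadPoolExecutor
#
#     with ThreadPoolExecutor(max_workers=4) as executor:
#         # map() dispatches the calls to the pool and yields results
#         # in input order.
#         squares = list(executor.map(lambda n: n * n, range(8)))
#         # submit() returns a Future immediately; result() blocks.
#         future = executor.submit(pow, 2, 10)
#         assert future.result() == 1024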
- -"""Implements ThreadPoolExecutor.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -from futures import _base -import queue -import threading -import weakref - -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) - else: - self.future.set_result(result) - -def _worker(executor_reference, work_queue): - try: - while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: - work_item.run() - except BaseException as e: - _base.LOGGER.critical('Exception in worker', exc_info=True) - -class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): - """Initializes a new ThreadPoolExecutor instance. - - Args: - max_workers: The maximum number of threads that can be used to - execute the given calls. - """ - _remove_dead_thread_references() - - self._max_workers = max_workers - self._work_queue = queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._work_queue.put(w) - self._adjust_thread_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def _adjust_thread_count(self): - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. 
- if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) - t.daemon = True - t.start() - self._threads.add(t) - _thread_references.add(weakref.ref(t)) - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - if wait: - for t in self._threads: - t.join() - shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python3/primes.py b/python3/primes.py deleted file mode 100644 index 5152fcb..0000000 --- a/python3/primes.py +++ /dev/null @@ -1,47 +0,0 @@ -import futures -import math -import time - -PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 117450548693743, - 993960000099397] - -def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - -def sequential(): - return list(map(is_prime, PRIMES)) - -def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def main(): - for name, fn in [('sequential', sequential), - ('processes', with_process_pool_executor), - ('threads', with_thread_pool_executor)]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - if fn() != [True] * len(PRIMES): - print('failed') - else: - print('%.2f seconds' % (time.time() - start)) - -if __name__ == '__main__': - main() diff --git a/python3/setup.py b/python3/setup.py deleted file mode 100755 index fcd05f2..0000000 --- a/python3/setup.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 - -from distutils.core import setup - -setup(name='futures3', - version='1.0', - description='Java-style futures implementation in Python 3.x', - author='Brian Quinlan', - author_email='brian@sweetapp.com', - url='http://code.google.com/p/pythonfutures', - download_url='http://pypi.python.org/pypi/futures3/', - packages=['futures'], - license='BSD', - classifiers=['License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3'] - ) diff --git a/python3/test_futures.py b/python3/test_futures.py deleted file mode 100644 index 98eea27..0000000 --- a/python3/test_futures.py +++ /dev/null @@ -1,819 +0,0 @@ -import io -import logging -import multiprocessing -import sys -import threading -import test.support -import time -import unittest - -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - -import futures -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process - -def create_future(state=PENDING, exception=None, result=None): - f = Future() - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -def mul(x, y): - return x * y - -class Call(object): - """A call that can be submitted to a future.Executor for testing. 
- - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - - def wait_on_called(self): - self._wait_on_event(self._called_event) - - def set_can(self): - self._signal_event(self._can_finish) - - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - - return self._result - - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, result=42): - super().__init__(manual_finish=True, result=result) - - def __call__(self, manual_finish): - if manual_finish: - super().__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - - def test_del_shutdown(self): - executor = 
futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) - threads = executor._threads - del executor - - for t in threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - processes = self.executor._processes - self.executor.shutdown() - - for p in processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - - def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) - queue_management_thread = executor._queue_management_thread - processes = executor._processes - del executor - - queue_management_thread.join() - for p in processes: - p.join() - -class WaitTests(unittest.TestCase): - def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() - - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) - - - finally: - call1.close() - 
call2.close() - - def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertLessEqual( - futures.process.EXTRA_QUEUED_CALLS, - 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - - finally: - call1.close() - call2.close() - - -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class AsCompletedTests(unittest.TestCase): - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 
- def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() - - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() - -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_submit(self): - future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) - - def test_submit_keyword(self): - future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.__next__(), (0, 1)) - self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) - - def test_map_timeout(self): - results = [] - timeout_call = MapCall() - try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEquals([42, 42], results) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class FutureTests(unittest.TestCase): - def test_done_callback_with_result(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.add_done_callback(fn) - f.set_result(5) - self.assertEquals(5, callback_result) - - def test_done_callback_with_exception(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.add_done_callback(fn) - 
f.set_exception(Exception('test')) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_with_cancel(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - f.add_done_callback(fn) - self.assertTrue(f.cancel()) - self.assertTrue(was_cancelled) - - def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = io.StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - raising_was_called = False - fn_was_called = False - - def raising_fn(callback_future): - nonlocal raising_was_called - raising_was_called = True - raise Exception('doh!') - - def fn(callback_future): - nonlocal fn_was_called - fn_was_called = True - - f = Future() - f.add_done_callback(raising_fn) - f.add_done_callback(fn) - f.set_result(5) - self.assertTrue(raising_was_called) - self.assertTrue(fn_was_called) - self.assertIn('Exception: doh!', logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) - - def test_done_callback_already_successful(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.set_result(5) - f.add_done_callback(fn) - self.assertEquals(5, callback_result) - - def test_done_callback_already_failed(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.set_exception(Exception('test')) - f.add_done_callback(fn) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_already_cancelled(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - self.assertTrue(f.cancel()) - f.add_done_callback(fn) - self.assertTrue(was_cancelled) - - def test_repr(self): - self.assertRegexpMatches(repr(PENDING_FUTURE), - '') - self.assertRegexpMatches(repr(RUNNING_FUTURE), - '') - self.assertRegexpMatches(repr(CANCELLED_FUTURE), - '') - self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '') - self.assertRegexpMatches( - repr(EXCEPTION_FUTURE), - '') - self.assertRegexpMatches( - repr(SUCCESSFUL_FUTURE), - '') - - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - 
self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.set_result(42) - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=5), 42) - - def test_result_with_cancel(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.cancel() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=5) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - # Wait until the main thread is waiting for the exception. 
- time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) - -def test_main(): - test.support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..744ca96 --- /dev/null +++ b/setup.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +import sys + +extras = {} +try: + from setuptools import setup + if sys.version_info < (2, 6): + extras['install_requires'] = ['multiprocessing'] +except: + from distutils.core import setup + +setup(name='futures', + version='2.1', + description='Backport of the concurrent.futures package from Python 3.2', + author='Brian Quinlan', + author_email='brian@sweetapp.com', + maintainer='Alex Gronholm', + maintainer_email='alex.gronholm+pypi@nextday.fi', + url='http://code.google.com/p/pythonfutures', + download_url='http://pypi.python.org/pypi/futures3/', + packages=['futures', 'concurrent.futures'], + license='BSD', + classifiers=['License :: OSI Approved :: BSD License', + 'Development Status :: 5 - Production/Stable', + 'Intended Audience :: Developers', + 'Programming Language :: Python :: 2.5', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.1'], + **extras + ) diff --git a/test_futures.py b/test_futures.py new file mode 100644 index 0000000..bbc21a0 --- /dev/null +++ b/test_futures.py @@ -0,0 +1,821 @@ +from __future__ import with_statement +import logging +import multiprocessing +import re +import sys +import threading +import time +import unittest + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +try: + from test.test_support import run_unittest +except ImportError: + from test.support import run_unittest + +if sys.version_info < (3, 0): + next = lambda x: x.next() + +if sys.platform.startswith('win'): + import ctypes + import ctypes.wintypes + +from concurrent import futures +from concurrent.futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, + LOGGER, STDERR_HANDLER) + +def create_future(state=PENDING, exception=None, result=None): + f = Future() + f._state = state + f._exception = exception + f._result = result + return f + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + +def mul(x, y): + return x * y + +class Call(object): + """A call that can be submitted to a future.Executor for testing. + + The call signals when it is called and waits for an event before finishing. 
+ """ + CALL_LOCKS = {} + def _create_event(self): + if sys.platform.startswith('win'): + class SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), + ("bInheritHandle", ctypes.wintypes.BOOL)] + + s = SECURITY_ATTRIBUTES() + s.nLength = ctypes.sizeof(s) + s.lpSecurityDescriptor = None + s.bInheritHandle = True + + handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), + True, + False, + None) + assert handle is not None + return handle + else: + event = multiprocessing.Event() + self.CALL_LOCKS[id(event)] = event + return id(event) + + def _wait_on_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) + assert r == 0 + else: + self.CALL_LOCKS[handle].wait() + + def _signal_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.SetEvent(handle) + assert r != 0 + else: + self.CALL_LOCKS[handle].set() + + def __init__(self, manual_finish=False, result=42): + self._called_event = self._create_event() + self._can_finish = self._create_event() + + self._result = result + + if not manual_finish: + self._signal_event(self._can_finish) + + def wait_on_called(self): + self._wait_on_event(self._called_event) + + def set_can(self): + self._signal_event(self._can_finish) + + def __call__(self): + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) + + return self._result + + def close(self): + self.set_can() + if sys.platform.startswith('win'): + ctypes.windll.kernel32.CloseHandle(self._called_event) + ctypes.windll.kernel32.CloseHandle(self._can_finish) + else: + del self.CALL_LOCKS[self._called_event] + del self.CALL_LOCKS[self._can_finish] + +class ExceptionCall(Call): + def __call__(self): + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) + raise ZeroDivisionError() + +class MapCall(Call): + def __init__(self, result=42): + super(MapCall, self).__init__(manual_finish=True, result=result) + + def __call__(self, manual_finish): + if manual_finish: + super(MapCall, self).__call__() + return self._result + +class ExecutorShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + + + def _start_some_futures(self): + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + + try: + self.executor.submit(call1) + self.executor.submit(call2) + self.executor.submit(call3) + + call1.wait_on_called() + call2.wait_on_called() + call3.wait_on_called() + + call1.set_can() + call2.set_can() + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + +class ThreadPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_workers=5) + + def tearDown(self): + self.executor.shutdown(wait=True) + + def test_threads_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._threads), 3) + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_workers=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + + def test_del_shutdown(self): + executor = futures.ThreadPoolExecutor(max_workers=5) + executor.map(abs, range(-5, 5)) + 
threads = executor._threads + del executor + + for t in threads: + t.join() + +class ProcessPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_workers=5) + + def tearDown(self): + self.executor.shutdown(wait=True) + + def test_processes_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._processes), 5) + processes = self.executor._processes + self.executor.shutdown() + + for p in processes: + p.join() + + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor(max_workers=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in self.executor._processes: + p.join() + + def test_del_shutdown(self): + executor = futures.ProcessPoolExecutor(max_workers=5) + list(executor.map(abs, range(-5, 5))) + queue_management_thread = executor._queue_management_thread + processes = executor._processes + del executor + + queue_management_thread.join() + for p in processes: + p.join() + +class WaitTests(unittest.TestCase): + def test_first_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([future1]), done) + self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) + finally: + call1.close() + call2.close() + + def test_first_completed_one_already_completed(self): + call1 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() + + def test_first_exception(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + + call1 = Call(manual_finish=True) + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set([future3]), pending) + finally: + call1.close() + call2.close() + call3.close() + + def test_first_exception_some_already_complete(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + + call1 = ExceptionCall(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) + + + finally: + call1.close() + call2.close() + + def test_first_exception_one_already_failed(self): + call1 = 
Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([EXCEPTION_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() + + def test_all_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set(), pending) + + + finally: + call1.close() + call2.close() + + def test_all_completed_some_already_completed(self): + def wait_test(): + while not future1._waiters: + pass + + future4.cancel() + call1.set_can() + call2.set_can() + call3.set_can() + + self.assertTrue( + futures.process.EXTRA_QUEUED_CALLS <= 1, + 'this test assumes that future4 will be cancelled before it is ' + 'queued to run - which might not be the case if ' + 'ProcessPoolExecutor is too aggresive in scheduling futures') + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + future4 = self.executor.submit(call4) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4]), + finished) + self.assertEquals(set(), pending) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_timeout(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=1, + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEquals(set([future2]), pending) + + + finally: + call1.close() + call2.close() + + +class ThreadPoolWaitTests(WaitTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_workers=1) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class ProcessPoolWaitTests(WaitTests): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_workers=1) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class AsCompletedTests(unittest.TestCase): + # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 
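    # A sketch of what such a test could look like (hypothetical; it
    # reuses the Call helper and module-level futures defined above):
    #
    #     def test_nonzero_timeout(self):
    #         call1 = Call(manual_finish=True)
    #         try:
    #             future1 = self.executor.submit(call1)
    #             completed = set()
    #             try:
    #                 for future in futures.as_completed(
    #                         [SUCCESSFUL_FUTURE, future1], timeout=0.5):
    #                     completed.add(future)
    #             except futures.TimeoutError:
    #                 pass
    #             # future1 never finishes within the timeout, so only the
    #             # already-finished future should have been yielded.
    #             self.assertEquals(set([SUCCESSFUL_FUTURE]), completed)
    #         finally:
    #             call1.close()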
+    def test_no_timeout(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+            call2.set_can()
+
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            completed = set(futures.as_completed(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2]))
+            self.assertEquals(set(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2]),
+                    completed)
+        finally:
+            call1.close()
+            call2.close()
+
+    def test_zero_timeout(self):
+        call1 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            completed_futures = set()
+            try:
+                for future in futures.as_completed(
+                        [CANCELLED_AND_NOTIFIED_FUTURE,
+                         EXCEPTION_FUTURE,
+                         SUCCESSFUL_FUTURE,
+                         future1],
+                        timeout=0):
+                    completed_futures.add(future)
+            except futures.TimeoutError:
+                pass
+
+            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                                   EXCEPTION_FUTURE,
+                                   SUCCESSFUL_FUTURE]),
+                              completed_futures)
+        finally:
+            call1.close()
+
+class ThreadPoolAsCompletedTests(AsCompletedTests):
+    def setUp(self):
+        self.executor = futures.ThreadPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class ProcessPoolAsCompletedTests(AsCompletedTests):
+    def setUp(self):
+        self.executor = futures.ProcessPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class ExecutorTest(unittest.TestCase):
+    # Executor.shutdown() and context manager usage is tested by
+    # ExecutorShutdownTest.
+    def test_submit(self):
+        future = self.executor.submit(pow, 2, 8)
+        self.assertEquals(256, future.result())
+
+    def test_submit_keyword(self):
+        future = self.executor.submit(mul, 2, y=8)
+        self.assertEquals(16, future.result())
+
+    def test_map(self):
+        self.assertEqual(
+                list(self.executor.map(pow, range(10), range(10))),
+                list(map(pow, range(10), range(10))))
+
+    def test_map_exception(self):
+        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
+        self.assertEqual(next(i), (0, 1))
+        self.assertEqual(next(i), (0, 1))
+        self.assertRaises(ZeroDivisionError, next, i)
+
+    def test_map_timeout(self):
+        results = []
+        timeout_call = MapCall()
+        try:
+            try:
+                for i in self.executor.map(timeout_call,
+                                           [False, False, True],
+                                           timeout=1):
+                    results.append(i)
+            except futures.TimeoutError:
+                pass
+            else:
+                self.fail('expected TimeoutError')
+        finally:
+            timeout_call.close()
+
+        self.assertEquals([42, 42], results)
+
+class ThreadPoolExecutorTest(ExecutorTest):
+    def setUp(self):
+        self.executor = futures.ThreadPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class ProcessPoolExecutorTest(ExecutorTest):
+    def setUp(self):
+        self.executor = futures.ProcessPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class FutureTests(unittest.TestCase):
+    def test_done_callback_with_result(self):
+        self.callback_result = None
+        def fn(callback_future):
+            self.callback_result = callback_future.result()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_result(5)
+        self.assertEquals(5, self.callback_result)
+
+    def test_done_callback_with_exception(self):
+        self.callback_exception = None
+        def fn(callback_future):
+            self.callback_exception = callback_future.exception()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_exception(Exception('test'))
+        self.assertEquals(('test',), self.callback_exception.args)
+
+    def test_done_callback_with_cancel(self):
+        self.was_cancelled = None
+        def fn(callback_future):
+            self.was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        f.add_done_callback(fn)
+        self.assertTrue(f.cancel())
+        self.assertTrue(self.was_cancelled)
+
+    def test_done_callback_raises(self):
+        LOGGER.removeHandler(STDERR_HANDLER)
+        logging_stream = StringIO()
+        handler = logging.StreamHandler(logging_stream)
+        LOGGER.addHandler(handler)
+        try:
+            self.raising_was_called = False
+            self.fn_was_called = False
+
+            def raising_fn(callback_future):
+                self.raising_was_called = True
+                raise Exception('doh!')
+
+            def fn(callback_future):
+                self.fn_was_called = True
+
+            f = Future()
+            f.add_done_callback(raising_fn)
+            f.add_done_callback(fn)
+            f.set_result(5)
+            self.assertTrue(self.raising_was_called)
+            self.assertTrue(self.fn_was_called)
+            self.assertTrue('Exception: doh!' in logging_stream.getvalue())
+        finally:
+            LOGGER.removeHandler(handler)
+            LOGGER.addHandler(STDERR_HANDLER)
+
+    def test_done_callback_already_successful(self):
+        self.callback_result = None
+        def fn(callback_future):
+            self.callback_result = callback_future.result()
+
+        f = Future()
+        f.set_result(5)
+        f.add_done_callback(fn)
+        self.assertEquals(5, self.callback_result)
+
+    def test_done_callback_already_failed(self):
+        self.callback_exception = None
+        def fn(callback_future):
+            self.callback_exception = callback_future.exception()
+
+        f = Future()
+        f.set_exception(Exception('test'))
+        f.add_done_callback(fn)
+        self.assertEquals(('test',), self.callback_exception.args)
+
+    def test_done_callback_already_cancelled(self):
+        self.was_cancelled = None
+        def fn(callback_future):
+            self.was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        self.assertTrue(f.cancel())
+        f.add_done_callback(fn)
+        self.assertTrue(self.was_cancelled)
+
+    def test_repr(self):
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+ state=pending>',
+                                 repr(PENDING_FUTURE)))
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+ state=running>',
+                                 repr(RUNNING_FUTURE)))
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+ state=cancelled>',
+                                 repr(CANCELLED_FUTURE)))
+        self.assertTrue(re.match('<Future at 0x[0-9a-f]+ state=cancelled>',
+                                 repr(CANCELLED_AND_NOTIFIED_FUTURE)))
+        self.assertTrue(re.match(
+                '<Future at 0x[0-9a-f]+ state=finished raised IOError>',
+                repr(EXCEPTION_FUTURE)))
+        self.assertTrue(re.match(
+                '<Future at 0x[0-9a-f]+ state=finished returned int>',
+                repr(SUCCESSFUL_FUTURE)))
+
+    def test_cancel(self):
+        f1 = create_future(state=PENDING)
+        f2 = create_future(state=RUNNING)
+        f3 = create_future(state=CANCELLED)
+        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
+        f5 = create_future(state=FINISHED, exception=IOError())
+        f6 = create_future(state=FINISHED, result=5)
+
+        self.assertTrue(f1.cancel())
+        self.assertEquals(f1._state, CANCELLED)
+
+        self.assertFalse(f2.cancel())
+        self.assertEquals(f2._state, RUNNING)
+
+        self.assertTrue(f3.cancel())
+        self.assertEquals(f3._state, CANCELLED)
+
+        self.assertTrue(f4.cancel())
+        self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
+
+        self.assertFalse(f5.cancel())
+        self.assertEquals(f5._state, FINISHED)
+
+        self.assertFalse(f6.cancel())
+        self.assertEquals(f6._state, FINISHED)
+
+    def test_cancelled(self):
+        self.assertFalse(PENDING_FUTURE.cancelled())
+        self.assertFalse(RUNNING_FUTURE.cancelled())
+        self.assertTrue(CANCELLED_FUTURE.cancelled())
+        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
+        self.assertFalse(EXCEPTION_FUTURE.cancelled())
+        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
+
+    def test_done(self):
+        self.assertFalse(PENDING_FUTURE.done())
+        self.assertFalse(RUNNING_FUTURE.done())
+        self.assertTrue(CANCELLED_FUTURE.done())
+        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
+        self.assertTrue(EXCEPTION_FUTURE.done())
+        self.assertTrue(SUCCESSFUL_FUTURE.done())
+
+    def test_running(self):
+        self.assertFalse(PENDING_FUTURE.running())
+        self.assertTrue(RUNNING_FUTURE.running())
+        self.assertFalse(CANCELLED_FUTURE.running())
+        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
+        self.assertFalse(EXCEPTION_FUTURE.running())
+        self.assertFalse(SUCCESSFUL_FUTURE.running())
+
+    def test_result_with_timeout(self):
+        self.assertRaises(futures.TimeoutError,
+                          PENDING_FUTURE.result, timeout=0)
+        self.assertRaises(futures.TimeoutError,
+                          RUNNING_FUTURE.result, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_FUTURE.result, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
+        self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
+        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
+
+    def test_result_with_success(self):
+        # TODO(brian@sweetapp.com): This test is timing dependent.
+        def notification():
+            # Wait until the main thread is waiting for the result.
+            time.sleep(1)
+            f1.set_result(42)
+
+        f1 = create_future(state=PENDING)
+        t = threading.Thread(target=notification)
+        t.start()
+
+        self.assertEquals(f1.result(timeout=5), 42)
+
+    def test_result_with_cancel(self):
+        # TODO(brian@sweetapp.com): This test is timing dependent.
+        def notification():
+            # Wait until the main thread is waiting for the result.
+            time.sleep(1)
+            f1.cancel()
+
+        f1 = create_future(state=PENDING)
+        t = threading.Thread(target=notification)
+        t.start()
+
+        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
+
+    def test_exception_with_timeout(self):
+        self.assertRaises(futures.TimeoutError,
+                          PENDING_FUTURE.exception, timeout=0)
+        self.assertRaises(futures.TimeoutError,
+                          RUNNING_FUTURE.exception, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_FUTURE.exception, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
+        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
+                                   IOError))
+        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
+
+    def test_exception_with_success(self):
+        def notification():
+            # Wait until the main thread is waiting for the exception.
+            time.sleep(1)
+            with f1._condition:
+                f1._state = FINISHED
+                f1._exception = IOError()
+                f1._condition.notify_all()
+
+        f1 = create_future(state=PENDING)
+        t = threading.Thread(target=notification)
+        t.start()
+
+        self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
+
+def test_main():
+    run_unittest(ProcessPoolExecutorTest,
+                 ThreadPoolExecutorTest,
+                 ProcessPoolWaitTests,
+                 ThreadPoolWaitTests,
+                 ProcessPoolAsCompletedTests,
+                 ThreadPoolAsCompletedTests,
+                 FutureTests,
+                 ProcessPoolShutdownTest,
+                 ThreadPoolShutdownTest)
+
+if __name__ == "__main__":
+    test_main()
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..83eda49
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,11 @@
+[tox]
+envlist = py25,py26,py27,py31
+
+[testenv]
+commands={envpython} test_futures.py []
+
+#[testenv:py24]
+#deps=multiprocessing
+#
+#[testenv:py25]
+#deps=multiprocessing
--
cgit v1.2.1
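
For reference, the following is a minimal usage sketch of the API that test_futures.py exercises (ThreadPoolExecutor, submit(), map(), wait() and as_completed()). It assumes the package is importable as concurrent.futures, as added by this patch; it is illustrative only and not part of the commit.

    # Illustrative only (not part of this patch): a minimal sketch of the API
    # exercised by test_futures.py, assuming the package is importable as
    # concurrent.futures.
    from concurrent import futures


    def square(x):
        return x * x


    def main():
        executor = futures.ThreadPoolExecutor(max_workers=2)
        try:
            # submit() returns a Future; result() blocks until the call finishes.
            fs = [executor.submit(square, i) for i in range(5)]

            # wait() partitions the futures into done and not-done sets.
            done, not_done = futures.wait(fs, return_when=futures.ALL_COMPLETED)
            assert not not_done

            # as_completed() yields futures in the order they finish.
            print(sorted(f.result() for f in futures.as_completed(fs)))

            # map() mirrors the built-in map(), evaluated on the worker threads.
            print(list(executor.map(square, range(5))))
        finally:
            executor.shutdown(wait=True)


    if __name__ == '__main__':
        main()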