summaryrefslogtreecommitdiff
path: root/python2/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'python2/futures/_base.py')
-rw-r--r--python2/futures/_base.py687
1 files changed, 303 insertions, 384 deletions
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index bec7212..ed7a094 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -1,54 +1,24 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+import collections
+import functools
import logging
import threading
import time
-try:
- from functools import partial
-except ImportError:
- def partial(func, *args, **keywords):
- def newfunc(*fargs, **fkeywords):
- newkeywords = keywords.copy()
- newkeywords.update(fkeywords)
- return func(*(args + fargs), **newkeywords)
- newfunc.func = func
- newfunc.args = args
- newfunc.keywords = keywords
- return newfunc
-
-# The "any" and "all" builtins weren't introduced until Python 2.5.
-try:
- any
-except NameError:
- def any(iterable):
- for element in iterable:
- if element:
- return True
- return False
-
-try:
- all
-except NameError:
- def all(iterable):
- for element in iterable:
- if not element:
- return False
- return True
-
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
-RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
-CANCELLED = 'CANCELLED'
-# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED = 'CANCELLED'
+# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'
@@ -70,184 +40,249 @@ _STATE_TO_DESCRIPTION_MAP = {
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
-_handler = logging.StreamHandler()
-LOGGER.addHandler(_handler)
-del _handler
-
-def set_future_exception(future, event_sink, exception):
- """Sets a future as having terminated with an exception.
-
- This function should only be used within the futures package.
-
- Args:
- future: The Future that finished with an exception.
- event_sink: The ThreadEventSink accociated with the Future's FutureList.
- The event_sink will be notified of the Future's completion, which
- may unblock some clients that have called FutureList.wait().
- exception: The expection that executing the Future raised.
- """
- future._condition.acquire()
- try:
- future._exception = exception
- event_sink._condition.acquire()
- try:
- future._state = FINISHED
- event_sink.add_exception()
- finally:
- event_sink._condition.release()
-
- future._condition.notifyAll()
- finally:
- future._condition.release()
-
-def set_future_result(future, event_sink, result):
- """Sets a future as having terminated without exception.
-
- This function should only be used within the futures package.
-
- Args:
- future: The Future that completed.
- event_sink: The ThreadEventSink accociated with the Future's FutureList.
- The event_sink will be notified of the Future's completion, which
- may unblock some clients that have called FutureList.wait().
- result: The value returned by the Future.
- """
- future._condition.acquire()
- try:
- future._result = result
- event_sink._condition.acquire()
- try:
- future._state = FINISHED
- event_sink.add_result()
- finally:
- event_sink._condition.release()
-
- future._condition.notifyAll()
- finally:
- future._condition.release()
+STDERR_HANDLER = logging.StreamHandler()
+LOGGER.addHandler(STDERR_HANDLER)
class Error(Exception):
+ """Base class for all future-related exceptions."""
pass
class CancelledError(Error):
+ """The Future was cancelled."""
pass
class TimeoutError(Error):
+ """The operation exceeded the given deadline."""
pass
-class _WaitTracker(object):
- """Provides the event that FutureList.wait(...) blocks on.
-
- """
+class _Waiter(object):
+ """Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
+ self.finished_futures = []
- def add_result(self):
- raise NotImplementedError()
+ def add_result(self, future):
+ self.finished_futures.append(future)
- def add_exception(self):
- raise NotImplementedError()
+ def add_exception(self, future):
+ self.finished_futures.append(future)
- def add_cancelled(self):
- raise NotImplementedError()
+ def add_cancelled(self, future):
+ self.finished_futures.append(future)
-class _FirstCompletedWaitTracker(_WaitTracker):
- """Used by wait(return_when=FIRST_COMPLETED)."""
+class _FirstCompletedWaiter(_Waiter):
+ """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
- def add_result(self):
+ def add_result(self, future):
+ super(_FirstCompletedWaiter, self).add_result(future)
self.event.set()
- def add_exception(self):
+ def add_exception(self, future):
+ super(_FirstCompletedWaiter, self).add_exception(future)
self.event.set()
- def add_cancelled(self):
+ def add_cancelled(self, future):
+ super(_FirstCompletedWaiter, self).add_cancelled(future)
self.event.set()
-class _AllCompletedWaitTracker(_WaitTracker):
+class _AllCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
- _WaitTracker.__init__(self)
+ super(_AllCompletedWaiter, self).__init__()
- def add_result(self):
+ def _decrement_pending_calls(self):
self.num_pending_calls -= 1
if not self.num_pending_calls:
self.event.set()
- def add_exception(self):
+ def add_result(self, future):
+ super(_AllCompletedWaiter, self).add_result(future)
+ self._decrement_pending_calls()
+
+ def add_exception(self, future):
+ super(_AllCompletedWaiter, self).add_exception(future)
if self.stop_on_exception:
self.event.set()
else:
- self.add_result()
+ self._decrement_pending_calls()
- def add_cancelled(self):
- self.add_result()
+ def add_cancelled(self, future):
+ super(_AllCompletedWaiter, self).add_cancelled(future)
+ self._decrement_pending_calls()
-class ThreadEventSink(object):
- """Forwards events to many _WaitTrackers.
+class _AcquireFutures(object):
+ """A context manager that does an ordered acquire of Future conditions."""
+
+ def __init__(self, futures):
+ self.futures = sorted(futures, key=id)
+
+ def __enter__(self):
+ for future in self.futures:
+ future._condition.acquire()
+
+ def __exit__(self, *args):
+ for future in self.futures:
+ future._condition.release()
+
+def _create_and_install_waiters(fs, return_when):
+ if return_when == FIRST_COMPLETED:
+ waiter = _FirstCompletedWaiter()
+ else:
+ pending_count = sum(
+ f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
+
+ if return_when == FIRST_EXCEPTION:
+ waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
+ elif return_when == ALL_COMPLETED:
+ waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
+ else:
+ raise ValueError("Invalid return condition: %r" % return_when)
- Each FutureList has a ThreadEventSink and each call to FutureList.wait()
- causes a new _WaitTracker to be added to the ThreadEventSink. This design
- allows many threads to call FutureList.wait() on the same FutureList with
- different arguments.
+ for f in fs:
+ f._waiters.append(waiter)
- This class should not be used by clients.
+ return waiter
+
+def as_completed(fs, timeout=None):
+ """An iterator over the given futures that yields each as it completes.
+
+ Args:
+ fs: The sequence of Futures (possibly created by different Executors) to
+ iterate over.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator that yields the given Futures as they complete (finished or
+ cancelled).
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
"""
- def __init__(self):
- self._condition = threading.Lock()
- self._waiters = []
+ if timeout is not None:
+ end_time = timeout + time.time()
- def add(self, e):
- self._waiters.append(e)
+ with _AcquireFutures(fs):
+ finished = set(
+ f for f in fs
+ if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+ pending = set(fs) - finished
+ waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+
+ try:
+ for future in finished:
+ yield future
+
+ while pending:
+ if timeout is None:
+ wait_timeout = None
+ else:
+ wait_timeout = end_time - time.time()
+ if wait_timeout < 0:
+ raise TimeoutError(
+ '%d (of %d) futures unfinished' % (
+ len(pending), len(fs)))
+
+ waiter.event.wait(timeout)
+
+ for future in waiter.finished_futures[:]:
+ yield future
+ waiter.finished_futures.remove(future)
+ pending.remove(future)
+
+ finally:
+ for f in fs:
+ f._waiters.remove(waiter)
+
+DoneAndNotDoneFutures = collections.namedtuple(
+ 'DoneAndNotDoneFutures', 'done not_done')
+def wait(fs, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the futures in the given sequence to complete.
+
+ Args:
+ fs: The sequence of Futures (possibly created by different Executors) to
+ wait upon.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when this function should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises an exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+
+ Returns:
+ A named 2-tuple of sets. The first set, named 'done', contains the
+ futures that completed (is finished or cancelled) before the wait
+ completed. The second set, named 'not_done', contains uncompleted
+ futures.
+ """
+ with _AcquireFutures(fs):
+ done = set(f for f in fs
+ if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+ not_done = set(fs) - done
- def remove(self, e):
- self._waiters.remove(e)
+ if (return_when == FIRST_COMPLETED) and done:
+ return DoneAndNotDoneFutures(done, not_done)
+ elif (return_when == FIRST_EXCEPTION) and done:
+ if any(f for f in done
+ if not f.cancelled() and f.exception() is not None):
+ return DoneAndNotDoneFutures(done, not_done)
- def add_result(self):
- for waiter in self._waiters:
- waiter.add_result()
+ if len(done) == len(fs):
+ return DoneAndNotDoneFutures(done, not_done)
- def add_exception(self):
- for waiter in self._waiters:
- waiter.add_exception()
+ waiter = _create_and_install_waiters(fs, return_when)
- def add_cancelled(self):
- for waiter in self._waiters:
- waiter.add_cancelled()
+ waiter.event.wait(timeout)
+ for f in fs:
+ f._waiters.remove(waiter)
+
+ done.update(waiter.finished_futures)
+ return DoneAndNotDoneFutures(done, set(fs) - done)
class Future(object):
"""Represents the result of an asynchronous computation."""
- def __init__(self, index):
+ def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
- self._index = index
+ self._waiters = []
+ self._done_callbacks = []
+
+ def _invoke_callbacks(self):
+ for callback in self._done_callbacks:
+ try:
+ callback(self)
+ except Exception:
+ LOGGER.exception('exception calling callback for %r', self)
def __repr__(self):
- self._condition.acquire()
- try:
+ with self._condition:
if self._state == FINISHED:
if self._exception:
- return '<Future state=%s raised %s>' % (
+ return '<Future at %s state=%s raised %s>' % (
+ hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
- return '<Future state=%s returned %s>' % (
+ return '<Future at %s state=%s returned %s>' % (
+ hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
- return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
- finally:
- self._condition.release()
-
- @property
- def index(self):
- """The index of the future in its FutureList."""
- return self._index
+ return '<Future at %s state=%s>' % (
+ hex(id(self)),
+ _STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
"""Cancel the future if possible.
@@ -255,40 +290,33 @@ class Future(object):
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
- self._condition.acquire()
- try:
+ with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
- if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
- self._state = CANCELLED
- self._condition.notify_all()
- return True
- finally:
- self._condition.release()
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ return True
+
+ self._state = CANCELLED
+ self._condition.notify_all()
+
+ self._invoke_callbacks()
+ return True
def cancelled(self):
"""Return True if the future has cancelled."""
- self._condition.acquire()
- try:
+ with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
- finally:
- self._condition.release()
def running(self):
- self._condition.acquire()
- try:
+ """Return True if the future is currently executing."""
+ with self._condition:
return self._state == RUNNING
- finally:
- self._condition.release()
def done(self):
"""Return True of the future was cancelled or finished executing."""
- self._condition.acquire()
- try:
+ with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
- finally:
- self._condition.release()
def __get_result(self):
if self._exception:
@@ -296,6 +324,23 @@ class Future(object):
else:
return self._result
+ def add_done_callback(self, fn):
+ """Attaches a callable that will be called when the future finishes.
+
+ Args:
+ fn: A callable that will be called with this future as its only
+ argument when the future completes or is cancelled. The callable
+ will always be called by a thread in the same process in which
+ it was added. If the future has already completed or been
+ cancelled then the callable will be called immediately. These
+ callables are called in the order that they were added.
+ """
+ with self._condition:
+ if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
+ self._done_callbacks.append(fn)
+ return
+ fn(self)
+
def result(self, timeout=None):
"""Return the result of the call that the future represents.
@@ -312,8 +357,7 @@ class Future(object):
timeout.
Exception: If the call raised then that exception will be raised.
"""
- self._condition.acquire()
- try:
+ with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
@@ -327,8 +371,6 @@ class Future(object):
return self.__get_result()
else:
raise TimeoutError()
- finally:
- self._condition.release()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
@@ -348,8 +390,7 @@ class Future(object):
timeout.
"""
- self._condition.acquire()
- try:
+ with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
@@ -363,215 +404,111 @@ class Future(object):
return self._exception
else:
raise TimeoutError()
- finally:
- self._condition.release()
-class FutureList(object):
- def __init__(self, futures, event_sink):
- """Initializes the FutureList. Should not be called by clients."""
- self._futures = futures
- self._event_sink = event_sink
+ # The following methods should only be used by Executors and in tests.
+ def set_running_or_notify_cancel(self):
+ """Mark the future as running or process any cancel notifications.
- def wait(self, timeout=None, return_when=ALL_COMPLETED):
- """Wait for the futures in the list to complete.
+ Should only be used by Executor implementations and unit tests.
- Args:
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
- return_when: Indicates when the method should return. The options
- are:
-
- FIRST_COMPLETED - Return when any future finishes or is
- cancelled.
- FIRST_EXCEPTION - Return when any future finishes by raising an
- exception. If no future raises and exception
- then it is equivalent to ALL_COMPLETED.
- ALL_COMPLETED - Return when all futures finish or are cancelled.
- RETURN_IMMEDIATELY - Return without waiting (this is not likely
- to be a useful option but it is there to
- be symmetrical with the
- executor.run_to_futures() method.
+ If the future has been cancelled (cancel() was called and returned
+ True) then any threads waiting on the future completing (though calls
+ to as_completed() or wait()) are notified and False is returned.
- Raises:
- TimeoutError: If the wait condition wasn't satisfied before the
- given timeout.
- """
- if return_when == RETURN_IMMEDIATELY:
- return
-
- # Futures cannot change state without this condition being held.
- self._event_sink._condition.acquire()
- try:
- # Make a quick exit if every future is already done. This check is
- # necessary because, if every future is in the
- # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
- # never receive any events.
- if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
- for f in self):
- return
-
- if return_when == FIRST_COMPLETED:
- completed_tracker = _FirstCompletedWaitTracker()
- else:
- # Calculate how many events are expected before every future
- # is complete. This can be done without holding the futures'
- # locks because a future cannot transition itself into either
- # of the states being looked for.
- pending_count = sum(
- f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
- for f in self)
-
- if return_when == FIRST_EXCEPTION:
- completed_tracker = _AllCompletedWaitTracker(
- pending_count, stop_on_exception=True)
- elif return_when == ALL_COMPLETED:
- completed_tracker = _AllCompletedWaitTracker(
- pending_count, stop_on_exception=False)
-
- self._event_sink.add(completed_tracker)
- finally:
- self._event_sink._condition.release()
+ If the future was not cancelled then it is put in the running state
+ (future calls to running() will return True) and True is returned.
- try:
- completed_tracker.event.wait(timeout)
- finally:
- self._event_sink.remove(completed_tracker)
+ This method should be called by Executor implementations before
+ executing the work associated with this future. If this method returns
+ False then the work should not be executed.
- def cancel(self, timeout=None):
- """Cancel the futures in the list.
-
- Args:
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
+ Returns:
+ False if the Future was cancelled, True otherwise.
Raises:
- TimeoutError: If all the futures were not finished before the
- given timeout.
+ RuntimeError: if this method was already called or if set_result()
+ or set_exception() was called.
"""
- for f in self:
- f.cancel()
- self.wait(timeout=timeout, return_when=ALL_COMPLETED)
- if any(not f.done() for f in self):
- raise TimeoutError()
-
- def has_running_futures(self):
- """Returns True if any futures in the list are still running."""
- return any(self.running_futures())
-
- def has_cancelled_futures(self):
- """Returns True if any futures in the list were cancelled."""
- return any(self.cancelled_futures())
-
- def has_done_futures(self):
- """Returns True if any futures in the list are finished or cancelled."""
- return any(self.done_futures())
-
- def has_successful_futures(self):
- """Returns True if any futures in the list finished without raising."""
- return any(self.successful_futures())
-
- def has_exception_futures(self):
- """Returns True if any futures in the list finished by raising."""
- return any(self.exception_futures())
-
- def cancelled_futures(self):
- """Returns all cancelled futures in the list."""
- return (f for f in self
- if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
-
- def done_futures(self):
- """Returns all futures in the list that are finished or cancelled."""
- return (f for f in self
- if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
-
- def successful_futures(self):
- """Returns all futures in the list that finished without raising."""
- return (f for f in self
- if f._state == FINISHED and f._exception is None)
-
- def exception_futures(self):
- """Returns all futures in the list that finished by raising."""
- return (f for f in self
- if f._state == FINISHED and f._exception is not None)
-
- def running_futures(self):
- """Returns all futures in the list that are still running."""
- return (f for f in self if f._state == RUNNING)
-
- def __len__(self):
- return len(self._futures)
-
- def __getitem__(self, i):
- return self._futures[i]
-
- def __iter__(self):
- return iter(self._futures)
-
- def __contains__(self, future):
- return future in self._futures
+ with self._condition:
+ if self._state == CANCELLED:
+ self._state = CANCELLED_AND_NOTIFIED
+ for waiter in self._waiters:
+ waiter.add_cancelled(self)
+ # self._condition.notify_all() is not necessary because
+ # self.cancel() triggers a notification.
+ return False
+ elif self._state == PENDING:
+ self._state = RUNNING
+ return True
+ else:
+ LOGGER.critical('Future %s in unexpected state: %s',
+ id(self.future),
+ self.future._state)
+ raise RuntimeError('Future in unexpected state')
- def __repr__(self):
- states = dict([(state, 0) for state in _FUTURE_STATES])
- for f in self:
- states[f._state] += 1
-
- return ('<FutureList #futures=%d '
- '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
- len(self),
- states[PENDING],
- states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
- states[RUNNING],
- states[FINISHED]))
+ def set_result(self, result):
+ """Sets the return value of work associated with the future.
+
+ Should only be used by Executor implementations and unit tests.
+ """
+ with self._condition:
+ self._result = result
+ self._state = FINISHED
+ for waiter in self._waiters:
+ waiter.add_result(self)
+ self._condition.notify_all()
+ self._invoke_callbacks()
+
+ def set_exception(self, exception):
+ """Sets the result of the future as being the given exception.
+
+ Should only be used by Executor implementations and unit tests.
+ """
+ with self._condition:
+ self._exception = exception
+ self._state = FINISHED
+ for waiter in self._waiters:
+ waiter.add_exception(self)
+ self._condition.notify_all()
+ self._invoke_callbacks()
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
- def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- """Return a list of futures representing the given calls.
- Args:
- calls: A sequence of callables that take no arguments. These will
- be bound to Futures and returned.
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
- return_when: Indicates when the method should return. The options
- are:
+ def submit(self, fn, *args, **kwargs):
+ """Submits a callable to be executed with the given arguments.
- FIRST_COMPLETED - Return when any future finishes or is
- cancelled.
- FIRST_EXCEPTION - Return when any future finishes by raising an
- exception. If no future raises and exception
- then it is equivalent to ALL_COMPLETED.
- ALL_COMPLETED - Return when all futures finish or are cancelled.
- RETURN_IMMEDIATELY - Return without waiting.
+ Schedules the callable to be executed as fn(*args, **kwargs) and returns
+ a Future instance representing the execution of the callable.
Returns:
- A FutureList containing Futures for the given calls.
+ A Future representing the given call.
"""
raise NotImplementedError()
- def run_to_results(self, calls, timeout=None):
- """Returns a iterator of the results of the given calls.
+ def map(self, fn, *iterables, **kwargs):
+ """Returns a iterator equivalent to map(fn, iter).
Args:
- calls: A sequence of callables that take no arguments. These will
- be called and their results returned.
+ fn: A callable that will take take as many arguments as there are
+ passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
- An iterator over the results of the given calls. Equivalent to:
- (call() for call in calls) but the calls may be evaluated
- out-of-order.
+ An iterator equivalent to: map(func, *iterables) but the calls may
+ be evaluated out-of-order.
Raises:
- TimeoutError: If all the given calls were not completed before the
- given timeout.
- Exception: If any call() raises.
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(*args) raises for any values.
"""
+ timeout = kwargs.get('timeout')
if timeout is not None:
end_time = timeout + time.time()
- fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
+ fs = [self.submit(fn, *args) for args in zip(*iterables)]
try:
for future in fs:
@@ -579,44 +516,26 @@ class Executor(object):
yield future.result()
else:
yield future.result(end_time - time.time())
- except Exception, e:
- # Python 2.4 and earlier don't allow yield statements in
- # try/finally blocks
- try:
- fs.cancel(timeout=0)
- except TimeoutError:
- pass
- raise e
-
- def map(self, func, *iterables, **kwargs):
- """Returns a iterator equivalent to map(fn, iter).
+ finally:
+ for future in fs:
+ future.cancel()
- Args:
- func: A callable that will take take as many arguments as there
- are passed iterables.
- timeout: The maximum number of seconds to wait. If None, then there
- is no limit on the wait time.
+ def shutdown(self, wait=True):
+ """Clean-up the resources associated with the Executor.
- Returns:
- An iterator equivalent to: map(func, *iterables) but the calls may
- be evaluated out-of-order.
+ It is safe to call this method several times. Otherwise, no other
+ methods can be called after this one.
- Raises:
- TimeoutError: If the entire result iterator could not be generated
- before the given timeout.
- Exception: If fn(*args) raises for any values.
+ Args:
+ wait: If True then shutdown will not return until all running
+ futures have finished executing and the resources used by the
+ executor have been reclaimed.
"""
- timeout = kwargs.get('timeout') or None
- calls = [partial(func, *args) for args in zip(*iterables)]
- return self.run_to_results(calls, timeout=timeout)
-
- def shutdown(self):
- """Clean-up. No other methods can be called afterwards."""
- raise NotImplementedError()
+ pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
- self.shutdown()
+ self.shutdown(wait=True)
return False