diff options
Diffstat (limited to 'python3/futures/_base.py')
-rw-r--r-- | python3/futures/_base.py | 599 |
1 files changed, 291 insertions, 308 deletions
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 330423e..561f4d2 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -1,7 +1,9 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
 
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
+import collections
 import functools
 import logging
 import threading
@@ -10,14 +12,13 @@ import time
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
 ALL_COMPLETED = 'ALL_COMPLETED'
-RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
 
 # Possible future states (for internal use by the futures package).
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
 # The future was cancelled by the user...
-CANCELLED = 'CANCELLED' 
-# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED = 'CANCELLED'
+# ...and _Waiter.add_cancelled() was called by a worker.
 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
 FINISHED = 'FINISHED'
 
@@ -39,86 +40,52 @@ _STATE_TO_DESCRIPTION_MAP = {
 
 # Logger for internal use by the futures package.
 LOGGER = logging.getLogger("futures")
-_handler = logging.StreamHandler()
-LOGGER.addHandler(_handler)
-del _handler
-
-def set_future_exception(future, event_sink, exception):
-    """Sets a future as having terminated with an exception.
-
-    This function should only be used within the futures package.
-
-    Args:
-        future: The Future that finished with an exception.
-        event_sink: The ThreadEventSink accociated with the Future's FutureList.
-            The event_sink will be notified of the Future's completion, which
-            may unblock some clients that have called FutureList.wait().
-        exception: The expection that executing the Future raised.
-    """
-    with future._condition:
-        future._exception = exception
-        with event_sink._condition:
-            future._state = FINISHED
-            event_sink.add_exception()
-        future._condition.notify_all()
-
-def set_future_result(future, event_sink, result):
-    """Sets a future as having terminated without exception.
-
-    This function should only be used within the futures package.
-
-    Args:
-        future: The Future that completed.
-        event_sink: The ThreadEventSink accociated with the Future's FutureList.
-            The event_sink will be notified of the Future's completion, which
-            may unblock some clients that have called FutureList.wait().
-        result: The value returned by the Future.
-    """
-    with future._condition:
-        future._result = result
-        with event_sink._condition:
-            future._state = FINISHED
-            event_sink.add_result()
-        future._condition.notify_all()
+STDERR_HANDLER = logging.StreamHandler()
+LOGGER.addHandler(STDERR_HANDLER)
 
 class Error(Exception):
+    """Base class for all future-related exceptions."""
     pass
 
 class CancelledError(Error):
+    """The Future was cancelled."""
     pass
 
 class TimeoutError(Error):
+    """The operation exceeded the given deadline."""
     pass
 
-class _WaitTracker(object):
-    """Provides the event that FutureList.wait(...) blocks on.
-
-    """
+class _Waiter(object):
+    """Provides the event that wait() and as_completed() block on."""
     def __init__(self):
         self.event = threading.Event()
+        self.finished_futures = []
 
-    def add_result(self):
-        raise NotImplementedError()
+    def add_result(self, future):
+        self.finished_futures.append(future)
 
-    def add_exception(self):
-        raise NotImplementedError()
+    def add_exception(self, future):
+        self.finished_futures.append(future)
 
-    def add_cancelled(self):
-        raise NotImplementedError()
-
-class _FirstCompletedWaitTracker(_WaitTracker):
-    """Used by wait(return_when=FIRST_COMPLETED)."""
+    def add_cancelled(self, future):
+        self.finished_futures.append(future)
 
-    def add_result(self):
+class _FirstCompletedWaiter(_Waiter):
+    """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
+
+    def add_result(self, future):
+        super().add_result(future)
         self.event.set()
 
-    def add_exception(self):
+    def add_exception(self, future):
+        super().add_exception(future)
         self.event.set()
 
-    def add_cancelled(self):
+    def add_cancelled(self, future):
+        super().add_cancelled(future)
         self.event.set()
 
-class _AllCompletedWaitTracker(_WaitTracker):
+class _AllCompletedWaiter(_Waiter):
     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
 
     def __init__(self, num_pending_calls, stop_on_exception):
@@ -126,87 +93,196 @@ class _AllCompletedWaitTracker(_WaitTracker):
         self.stop_on_exception = stop_on_exception
         super().__init__()
 
-    def add_result(self):
+    def _decrement_pending_calls(self):
         self.num_pending_calls -= 1
         if not self.num_pending_calls:
             self.event.set()
 
-    def add_exception(self):
+    def add_result(self, future):
+        super().add_result(future)
+        self._decrement_pending_calls()
+
+    def add_exception(self, future):
+        super().add_exception(future)
         if self.stop_on_exception:
             self.event.set()
         else:
-            self.add_result()
+            self._decrement_pending_calls()
+
+    def add_cancelled(self, future):
+        super().add_cancelled(future)
+        self._decrement_pending_calls()
+
+class _AcquireFutures(object):
+    """A context manager that does an ordered acquire of Future conditions."""
+
+    def __init__(self, futures):
+        self.futures = sorted(futures, key=id)
+
+    def __enter__(self):
+        for future in self.futures:
+            future._condition.acquire()
+
+    def __exit__(self, *args):
+        for future in self.futures:
+            future._condition.release()
+
+def _create_and_install_waiters(fs, return_when):
+    if return_when == FIRST_COMPLETED:
+        waiter = _FirstCompletedWaiter()
+    else:
+        pending_count = sum(
+                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
+
+        if return_when == FIRST_EXCEPTION:
+            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
+        elif return_when == ALL_COMPLETED:
+            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
+        else:
+            raise ValueError("Invalid return condition: %r" % return_when)
 
-    def add_cancelled(self):
-        self.add_result()
+    for f in fs:
+        f._waiters.append(waiter)
 
-class ThreadEventSink(object):
-    """Forwards events to many _WaitTrackers.
+    return waiter
 
-    Each FutureList has a ThreadEventSink and each call to FutureList.wait()
-    causes a new _WaitTracker to be added to the ThreadEventSink. This design
-    allows many threads to call FutureList.wait() on the same FutureList with
-    different arguments.
+def as_completed(fs, timeout=None):
+    """An iterator over the given futures that yields each as it completes.
 
-    This class should not be used by clients.
+    Args:
+        fs: The sequence of Futures (possibly created by different Executors) to
+            iterate over.
+        timeout: The maximum number of seconds to wait. If None, then there
+            is no limit on the wait time.
+
+    Returns:
+        An iterator that yields the given Futures as they complete (finished or
+        cancelled).
+
+    Raises:
+        TimeoutError: If the entire result iterator could not be generated
+            before the given timeout.
     """
-    def __init__(self):
-        self._condition = threading.Lock()
-        self._waiters = []
+    if timeout is not None:
+        end_time = timeout + time.time()
+
+    with _AcquireFutures(fs):
+        finished = set(
+                f for f in fs
+                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+        pending = set(fs) - finished
+        waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+
+    try:
+        for future in finished:
+            yield future
+
+        while pending:
+            if timeout is None:
+                wait_timeout = None
+            else:
+                wait_timeout = end_time - time.time()
+                if wait_timeout < 0:
+                    raise TimeoutError(
+                            '%d (of %d) futures unfinished' % (
+                            len(pending), len(fs)))
 
-    def add(self, e):
-        self._waiters.append(e)
+            waiter.event.wait(wait_timeout)
 
-    def remove(self, e):
-        self._waiters.remove(e)
+            for future in waiter.finished_futures[:]:
+                yield future
+                waiter.finished_futures.remove(future)
+                pending.remove(future)
 
-    def add_result(self):
-        for waiter in self._waiters:
-            waiter.add_result()
+    finally:
+        for f in fs:
+            f._waiters.remove(waiter)
 
-    def add_exception(self):
-        for waiter in self._waiters:
-            waiter.add_exception()
+DoneAndNotDoneFutures = collections.namedtuple(
+        'DoneAndNotDoneFutures', 'done not_done')
+def wait(fs, timeout=None, return_when=ALL_COMPLETED):
+    """Wait for the futures in the given sequence to complete.
 
-    def add_cancelled(self):
-        for waiter in self._waiters:
-            waiter.add_cancelled()
+    Args:
+        fs: The sequence of Futures (possibly created by different Executors) to
+            wait upon.
+        timeout: The maximum number of seconds to wait. If None, then there
+            is no limit on the wait time.
+        return_when: Indicates when this function should return. The options
+            are:
+
+            FIRST_COMPLETED - Return when any future finishes or is
+                              cancelled.
+            FIRST_EXCEPTION - Return when any future finishes by raising an
+                              exception. If no future raises an exception
+                              then it is equivalent to ALL_COMPLETED.
+            ALL_COMPLETED -   Return when all futures finish or are cancelled.
+
+    Returns:
+        A named 2-tuple of sets. The first set, named 'done', contains the
+        futures that completed (are finished or cancelled) before the wait
+        completed. The second set, named 'not_done', contains uncompleted
+        futures.
+    """
+    with _AcquireFutures(fs):
+        done = set(f for f in fs
+                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
+        not_done = set(fs) - done
+
+        if (return_when == FIRST_COMPLETED) and done:
+            return DoneAndNotDoneFutures(done, not_done)
+        elif (return_when == FIRST_EXCEPTION) and done:
+            if any(f for f in done
+                   if not f.cancelled() and f.exception() is not None):
+                return DoneAndNotDoneFutures(done, not_done)
+
+        if len(done) == len(fs):
+            return DoneAndNotDoneFutures(done, not_done)
+
+        waiter = _create_and_install_waiters(fs, return_when)
+
+    waiter.event.wait(timeout)
+    for f in fs:
+        f._waiters.remove(waiter)
+
+    done.update(waiter.finished_futures)
+    return DoneAndNotDoneFutures(done, set(fs) - done)
 
 class Future(object):
     """Represents the result of an asynchronous computation."""
 
-    # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink
-    # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race
-    # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink.
-    # Other state transitions need only have the Future._condition held.
-    # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired
-    # first.
-
-    def __init__(self, index):
+    def __init__(self):
         """Initializes the future. Should not be called by clients."""
         self._condition = threading.Condition()
         self._state = PENDING
         self._result = None
         self._exception = None
-        self._index = index
+        self._waiters = []
+        self._done_callbacks = []
+
+    def _invoke_callbacks(self):
+        for callback in self._done_callbacks:
+            try:
+                callback(self)
+            except Exception:
+                LOGGER.exception('exception calling callback for %r', self)
 
     def __repr__(self):
         with self._condition:
             if self._state == FINISHED:
                 if self._exception:
-                    return '<Future state=%s raised %s>' % (
+                    return '<Future at %s state=%s raised %s>' % (
+                        hex(id(self)),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._exception.__class__.__name__)
                 else:
-                    return '<Future state=%s returned %s>' % (
+                    return '<Future at %s state=%s returned %s>' % (
+                        hex(id(self)),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._result.__class__.__name__)
-            return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
-
-    @property
-    def index(self):
-        """The index of the future in its FutureList."""
-        return self._index
+            return '<Future at %s state=%s>' % (
+                    hex(id(self)),
+                    _STATE_TO_DESCRIPTION_MAP[self._state])
 
     def cancel(self):
         """Cancel the future if possible.
@@ -218,10 +294,14 @@ class Future(object):
             if self._state in [RUNNING, FINISHED]:
                 return False
 
-            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
-                self._state = CANCELLED
-                self._condition.notify_all()
-            return True
+            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+                return True
+
+            self._state = CANCELLED
+            self._condition.notify_all()
+
+        self._invoke_callbacks()
+        return True
 
     def cancelled(self):
         """Return True if the future has cancelled."""
@@ -229,6 +309,7 @@ class Future(object):
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
 
     def running(self):
+        """Return True if the future is currently executing."""
         with self._condition:
             return self._state == RUNNING
 
@@ -243,6 +324,23 @@ class Future(object):
         else:
             return self._result
 
+    def add_done_callback(self, fn):
+        """Attaches a callable that will be called when the future finishes.
+
+        Args:
+            fn: A callable that will be called with this future as its only
+                argument when the future completes or is cancelled. The callable
+                will always be called by a thread in the same process in which
+                it was added. If the future has already completed or been
+                cancelled then the callable will be called immediately. These
+                callables are called in the order that they were added.
+        """
+        with self._condition:
+            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
+                self._done_callbacks.append(fn)
+                return
+        fn(self)
+
     def result(self, timeout=None):
         """Return the result of the call that the future represents.
 
@@ -307,210 +405,109 @@ class Future(object):
             else:
                 raise TimeoutError()
 
+    # The following methods should only be used by Executors and in tests.
+    def set_running_or_notify_cancel(self):
+        """Mark the future as running or process any cancel notifications.
 
-class FutureList(object):
-    def __init__(self, futures, event_sink):
-        """Initializes the FutureList. Should not be called by clients."""
-        self._futures = futures
-        self._event_sink = event_sink
+        Should only be used by Executor implementations and unit tests.
 
-    def wait(self, timeout=None, return_when=ALL_COMPLETED):
-        """Wait for the futures in the list to complete.
+        If the future has been cancelled (cancel() was called and returned
+        True) then any threads waiting on the future completing (through calls
+        to as_completed() or wait()) are notified and False is returned.
 
-        Args:
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
-            return_when: Indicates when the method should return. The options
-                are:
-
-                FIRST_COMPLETED - Return when any future finishes or is
-                                  cancelled.
-                FIRST_EXCEPTION - Return when any future finishes by raising an
-                                  exception. If no future raises and exception
-                                  then it is equivalent to ALL_COMPLETED.
-                ALL_COMPLETED -   Return when all futures finish or are cancelled.
-                RETURN_IMMEDIATELY - Return without waiting (this is not likely
-                                     to be a useful option but it is there to
-                                     be symmetrical with the
-                                     executor.run_to_futures() method.
+        If the future was not cancelled then it is put in the running state
+        (future calls to running() will return True) and True is returned.
+
+        This method should be called by Executor implementations before
+        executing the work associated with this future. If this method returns
+        False then the work should not be executed.
+
+        Returns:
+            False if the Future was cancelled, True otherwise.
 
         Raises:
-            TimeoutError: If the wait condition wasn't satisfied before the
-                given timeout.
+            RuntimeError: if this method was already called or if set_result()
+                or set_exception() was called.
         """
-        if return_when == RETURN_IMMEDIATELY:
-            return
-
-        # Futures cannot change state without this condition being held.
-        with self._event_sink._condition:
-            # Make a quick exit if every future is already done. This check is
-            # necessary because, if every future is in the
-            # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
-            # never receive any events.
-            if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
-                   for f in self):
-                return
-
-            if return_when == FIRST_COMPLETED:
-                completed_tracker = _FirstCompletedWaitTracker()
+        with self._condition:
+            if self._state == CANCELLED:
+                self._state = CANCELLED_AND_NOTIFIED
+                for waiter in self._waiters:
+                    waiter.add_cancelled(self)
+                # self._condition.notify_all() is not necessary because
+                # self.cancel() triggers a notification.
+                return False
+            elif self._state == PENDING:
+                self._state = RUNNING
+                return True
             else:
-                # Calculate how many events are expected before every future
-                # is complete. This can be done without holding the futures'
-                # locks because a future cannot transition itself into either
-                # of the states being looked for.
-                pending_count = sum(
-                        f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
-                        for f in self)
-
-                if return_when == FIRST_EXCEPTION:
-                    completed_tracker = _AllCompletedWaitTracker(
-                            pending_count, stop_on_exception=True)
-                elif return_when == ALL_COMPLETED:
-                    completed_tracker = _AllCompletedWaitTracker(
-                            pending_count, stop_on_exception=False)
-
-            self._event_sink.add(completed_tracker)
+                LOGGER.critical('Future %s in unexpected state: %s',
+                                id(self),
+                                self._state)
+                raise RuntimeError('Future in unexpected state')
 
-        try:
-            completed_tracker.event.wait(timeout)
-        finally:
-            self._event_sink.remove(completed_tracker)
+    def set_result(self, result):
+        """Sets the return value of work associated with the future.
 
-    def cancel(self, timeout=None):
-        """Cancel the futures in the list.
+        Should only be used by Executor implementations and unit tests.
+        """
+        with self._condition:
+            self._result = result
+            self._state = FINISHED
+            for waiter in self._waiters:
+                waiter.add_result(self)
+            self._condition.notify_all()
+        self._invoke_callbacks()
 
-        Args:
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
+    def set_exception(self, exception):
+        """Sets the result of the future as being the given exception.
 
-        Raises:
-            TimeoutError: If all the futures were not finished before the
-                given timeout.
+        Should only be used by Executor implementations and unit tests.
         """
-        for f in self:
-            f.cancel()
-        self.wait(timeout=timeout, return_when=ALL_COMPLETED)
-        if any(not f.done() for f in self):
-            raise TimeoutError()
-
-    def has_running_futures(self):
-        """Returns True if any futures in the list are still running."""
-        return any(self.running_futures())
-
-    def has_cancelled_futures(self):
-        """Returns True if any futures in the list were cancelled."""
-        return any(self.cancelled_futures())
-
-    def has_done_futures(self):
-        """Returns True if any futures in the list are finished or cancelled."""
-        return any(self.done_futures())
-
-    def has_successful_futures(self):
-        """Returns True if any futures in the list finished without raising."""
-        return any(self.successful_futures())
-
-    def has_exception_futures(self):
-        """Returns True if any futures in the list finished by raising."""
-        return any(self.exception_futures())
-
-    def cancelled_futures(self):
-        """Returns all cancelled futures in the list."""
-        return (f for f in self
-                if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
-
-    def done_futures(self):
-        """Returns all futures in the list that are finished or cancelled."""
-        return (f for f in self
-                if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
-
-    def successful_futures(self):
-        """Returns all futures in the list that finished without raising."""
-        return (f for f in self
-                if f._state == FINISHED and f._exception is None)
-
-    def exception_futures(self):
-        """Returns all futures in the list that finished by raising."""
-        return (f for f in self
-                if f._state == FINISHED and f._exception is not None)
-
-    def running_futures(self):
-        """Returns all futures in the list that are still running."""
-        return (f for f in self if f._state == RUNNING)
-
-    def __len__(self):
-        return len(self._futures)
-
-    def __getitem__(self, i):
-        return self._futures[i]
-
-    def __iter__(self):
-        return iter(self._futures)
-
-    def __contains__(self, future):
-        return future in self._futures
-
-    def __repr__(self):
-        states = {state: 0 for state in _FUTURE_STATES}
-        for f in self:
-            states[f._state] += 1
-
-        return ('<FutureList #futures=%d '
-                '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
-                    len(self),
-                    states[PENDING],
-                    states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
-                    states[RUNNING],
-                    states[FINISHED]))
+        with self._condition:
+            self._exception = exception
+            self._state = FINISHED
+            for waiter in self._waiters:
+                waiter.add_exception(self)
+            self._condition.notify_all()
+        self._invoke_callbacks()
 
 class Executor(object):
-    """This is an abstract base class for conrete asynchronous executors."""
-    def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
-        """Return a list of futures representing the given calls.
+    """This is an abstract base class for concrete asynchronous executors."""
 
-        Args:
-            calls: A sequence of callables that take no arguments. These will
-                be bound to Futures and returned.
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
-            return_when: Indicates when the method should return. The options
-                are:
+    def submit(self, fn, *args, **kwargs):
+        """Submits a callable to be executed with the given arguments.
 
-                FIRST_COMPLETED - Return when any future finishes or is
-                                  cancelled.
-                FIRST_EXCEPTION - Return when any future finishes by raising an
-                                  exception. If no future raises and exception
-                                  then it is equivalent to ALL_COMPLETED.
-                ALL_COMPLETED -   Return when all futures finish or are cancelled.
-                RETURN_IMMEDIATELY - Return without waiting.
+        Schedules the callable to be executed as fn(*args, **kwargs) and returns
+        a Future instance representing the execution of the callable.
 
         Returns:
-            A FutureList containing Futures for the given calls.
+            A Future representing the given call.
         """
         raise NotImplementedError()
 
-    def run_to_results(self, calls, timeout=None):
-        """Returns a iterator of the results of the given calls.
+    def map(self, fn, *iterables, timeout=None):
+        """Returns an iterator equivalent to map(fn, iter).
 
         Args:
-            calls: A sequence of callables that take no arguments. These will
-                be called and their results returned.
+            fn: A callable that will take as many arguments as there are
+                passed iterables.
             timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
 
         Returns:
-            An iterator over the results of the given calls. Equivalent to:
-            (call() for call in calls) but the calls may be evaluated
-            out-of-order.
+            An iterator equivalent to: map(fn, *iterables) but the calls may
+            be evaluated out-of-order.
 
         Raises:
-            TimeoutError: If all the given calls were not completed before the
-                given timeout.
-            Exception: If any call() raises.
+            TimeoutError: If the entire result iterator could not be generated
+                before the given timeout.
+            Exception: If fn(*args) raises for any values.
         """
         if timeout is not None:
             end_time = timeout + time.time()
 
-        fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
+        fs = [self.submit(fn, *args) for args in zip(*iterables)]
 
         try:
             for future in fs:
@@ -519,39 +516,25 @@ class Executor(object):
                 else:
                     yield future.result(end_time - time.time())
         finally:
-            try:
-                fs.cancel(timeout=0)
-            except TimeoutError:
-                pass
+            for future in fs:
+                future.cancel()
 
-    def map(self, func, *iterables, timeout=None):
-        """Returns a iterator equivalent to map(fn, iter).
+    def shutdown(self, wait=True):
+        """Clean-up the resources associated with the Executor.
 
-        Args:
-            func: A callable that will take take as many arguments as there
-                are passed iterables.
-            timeout: The maximum number of seconds to wait. If None, then there
-                is no limit on the wait time.
+        It is safe to call this method several times. Otherwise, no other
+        methods can be called after this one.
 
-        Returns:
-            An iterator equivalent to: map(func, *iterables) but the calls may
-            be evaluated out-of-order.
-
-        Raises:
-            TimeoutError: If the entire result iterator could not be generated
-                before the given timeout.
-            Exception: If fn(*args) raises for any values.
+        Args:
+            wait: If True then shutdown will not return until all running
+                futures have finished executing and the resources used by the
+                executor have been reclaimed.
         """
-        calls = [functools.partial(func, *args) for args in zip(*iterables)]
-        return self.run_to_results(calls, timeout)
-
-    def shutdown(self):
-        """Clean-up. No other methods can be called afterwards."""
-        raise NotImplementedError()
+        pass
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.shutdown()
+        self.shutdown(wait=True)
         return False