# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. __author__ = 'Brian Quinlan (brian@sweetapp.com)' import functools import logging import threading import time FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' RUNNING = 'RUNNING' # The future was cancelled by the user... CANCELLED = 'CANCELLED' # ...and ThreadEventSink.add_cancelled() was called by a worker. CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' FINISHED = 'FINISHED' _FUTURE_STATES = [ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED ] _STATE_TO_DESCRIPTION_MAP = { PENDING: "pending", RUNNING: "running", CANCELLED: "cancelled", CANCELLED_AND_NOTIFIED: "cancelled", FINISHED: "finished" } # Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") _handler = logging.StreamHandler() LOGGER.addHandler(_handler) del _handler def set_future_exception(future, event_sink, exception): """Sets a future as having terminated with an exception. This function should only be used within the futures package. Args: future: The Future that finished with an exception. event_sink: The ThreadEventSink accociated with the Future's FutureList. The event_sink will be notified of the Future's completion, which may unblock some clients that have called FutureList.wait(). exception: The expection that executing the Future raised. """ with future._condition: future._exception = exception with event_sink._condition: future._state = FINISHED event_sink.add_exception() future._condition.notify_all() def set_future_result(future, event_sink, result): """Sets a future as having terminated without exception. This function should only be used within the futures package. Args: future: The Future that completed. event_sink: The ThreadEventSink accociated with the Future's FutureList. The event_sink will be notified of the Future's completion, which may unblock some clients that have called FutureList.wait(). result: The value returned by the Future. """ with future._condition: future._result = result with event_sink._condition: future._state = FINISHED event_sink.add_result() future._condition.notify_all() class Error(Exception): pass class CancelledError(Error): pass class TimeoutError(Error): pass class _WaitTracker(object): """Provides the event that FutureList.wait(...) blocks on. """ def __init__(self): self.event = threading.Event() def add_result(self): raise NotImplementedError() def add_exception(self): raise NotImplementedError() def add_cancelled(self): raise NotImplementedError() class _FirstCompletedWaitTracker(_WaitTracker): """Used by wait(return_when=FIRST_COMPLETED).""" def add_result(self): self.event.set() def add_exception(self): self.event.set() def add_cancelled(self): self.event.set() class _AllCompletedWaitTracker(_WaitTracker): """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" def __init__(self, num_pending_calls, stop_on_exception): self.num_pending_calls = num_pending_calls self.stop_on_exception = stop_on_exception super().__init__() def add_result(self): self.num_pending_calls -= 1 if not self.num_pending_calls: self.event.set() def add_exception(self): if self.stop_on_exception: self.event.set() else: self.add_result() def add_cancelled(self): self.add_result() class ThreadEventSink(object): """Forwards events to many _WaitTrackers. Each FutureList has a ThreadEventSink and each call to FutureList.wait() causes a new _WaitTracker to be added to the ThreadEventSink. This design allows many threads to call FutureList.wait() on the same FutureList with different arguments. This class should not be used by clients. """ def __init__(self): self._condition = threading.Lock() self._waiters = [] def add(self, e): self._waiters.append(e) def remove(self, e): self._waiters.remove(e) def add_result(self): for waiter in self._waiters: waiter.add_result() def add_exception(self): for waiter in self._waiters: waiter.add_exception() def add_cancelled(self): for waiter in self._waiters: waiter.add_cancelled() class Future(object): """Represents the result of an asynchronous computation.""" # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink. # Other state transitions need only have the Future._condition held. # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired # first. def __init__(self, index): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None self._index = index def __repr__(self): with self._condition: if self._state == FINISHED: if self._exception: return '' % ( _STATE_TO_DESCRIPTION_MAP[self._state], self._exception.__class__.__name__) else: return '' % ( _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__) return '' % _STATE_TO_DESCRIPTION_MAP[self._state] @property def index(self): """The index of the future in its FutureList.""" return self._index def cancel(self): """Cancel the future if possible. Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """ with self._condition: if self._state in [RUNNING, FINISHED]: return False if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: self._state = CANCELLED self._condition.notify_all() return True def cancelled(self): """Return True if the future has cancelled.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] def running(self): with self._condition: return self._state == RUNNING def done(self): """Return True of the future was cancelled or finished executing.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] def __get_result(self): if self._exception: raise self._exception else: return self._result def result(self, timeout=None): """Return the result of the call that the future represents. Args: timeout: The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time. Returns: The result of the call that the future represents. Raises: CancelledError: If the future was cancelled. TimeoutError: If the future didn't finish executing before the given timeout. Exception: If the call raised then that exception will be raised. """ with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: return self.__get_result() self._condition.wait(timeout) if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: return self.__get_result() else: raise TimeoutError() def exception(self, timeout=None): """Return the exception raised by the call that the future represents. Args: timeout: The number of seconds to wait for the exception if the future isn't done. If None, then there is no limit on the wait time. Returns: The exception raised by the call that the future represents or None if the call completed without raising. Raises: CancelledError: If the future was cancelled. TimeoutError: If the future didn't finish executing before the given timeout. """ with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: return self._exception self._condition.wait(timeout) if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: return self._exception else: raise TimeoutError() class FutureList(object): def __init__(self, futures, event_sink): """Initializes the FutureList. Should not be called by clients.""" self._futures = futures self._event_sink = event_sink def wait(self, timeout=None, return_when=ALL_COMPLETED): """Wait for the futures in the list to complete. Args: timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. return_when: Indicates when the method should return. The options are: FIRST_COMPLETED - Return when any future finishes or is cancelled. FIRST_EXCEPTION - Return when any future finishes by raising an exception. If no future raises and exception then it is equivalent to ALL_COMPLETED. ALL_COMPLETED - Return when all futures finish or are cancelled. RETURN_IMMEDIATELY - Return without waiting (this is not likely to be a useful option but it is there to be symmetrical with the executor.run_to_futures() method. Raises: TimeoutError: If the wait condition wasn't satisfied before the given timeout. """ if return_when == RETURN_IMMEDIATELY: return # Futures cannot change state without this condition being held. with self._event_sink._condition: # Make a quick exit if every future is already done. This check is # necessary because, if every future is in the # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will # never receive any events. if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] for f in self): return if return_when == FIRST_COMPLETED: completed_tracker = _FirstCompletedWaitTracker() else: # Calculate how many events are expected before every future # is complete. This can be done without holding the futures' # locks because a future cannot transition itself into either # of the states being looked for. pending_count = sum( f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in self) if return_when == FIRST_EXCEPTION: completed_tracker = _AllCompletedWaitTracker( pending_count, stop_on_exception=True) elif return_when == ALL_COMPLETED: completed_tracker = _AllCompletedWaitTracker( pending_count, stop_on_exception=False) self._event_sink.add(completed_tracker) try: completed_tracker.event.wait(timeout) finally: self._event_sink.remove(completed_tracker) def cancel(self, timeout=None): """Cancel the futures in the list. Args: timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Raises: TimeoutError: If all the futures were not finished before the given timeout. """ for f in self: f.cancel() self.wait(timeout=timeout, return_when=ALL_COMPLETED) if any(not f.done() for f in self): raise TimeoutError() def has_running_futures(self): """Returns True if any futures in the list are still running.""" return any(self.running_futures()) def has_cancelled_futures(self): """Returns True if any futures in the list were cancelled.""" return any(self.cancelled_futures()) def has_done_futures(self): """Returns True if any futures in the list are finished or cancelled.""" return any(self.done_futures()) def has_successful_futures(self): """Returns True if any futures in the list finished without raising.""" return any(self.successful_futures()) def has_exception_futures(self): """Returns True if any futures in the list finished by raising.""" return any(self.exception_futures()) def cancelled_futures(self): """Returns all cancelled futures in the list.""" return (f for f in self if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) def done_futures(self): """Returns all futures in the list that are finished or cancelled.""" return (f for f in self if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) def successful_futures(self): """Returns all futures in the list that finished without raising.""" return (f for f in self if f._state == FINISHED and f._exception is None) def exception_futures(self): """Returns all futures in the list that finished by raising.""" return (f for f in self if f._state == FINISHED and f._exception is not None) def running_futures(self): """Returns all futures in the list that are still running.""" return (f for f in self if f._state == RUNNING) def __len__(self): return len(self._futures) def __getitem__(self, i): return self._futures[i] def __iter__(self): return iter(self._futures) def __contains__(self, future): return future in self._futures def __repr__(self): states = {state: 0 for state in _FUTURE_STATES} for f in self: states[f._state] += 1 return ('' % ( len(self), states[PENDING], states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], states[RUNNING], states[FINISHED])) class Executor(object): """This is an abstract base class for conrete asynchronous executors.""" def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): """Return a list of futures representing the given calls. Args: calls: A sequence of callables that take no arguments. These will be bound to Futures and returned. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. return_when: Indicates when the method should return. The options are: FIRST_COMPLETED - Return when any future finishes or is cancelled. FIRST_EXCEPTION - Return when any future finishes by raising an exception. If no future raises and exception then it is equivalent to ALL_COMPLETED. ALL_COMPLETED - Return when all futures finish or are cancelled. RETURN_IMMEDIATELY - Return without waiting. Returns: A FutureList containing Futures for the given calls. """ raise NotImplementedError() def run_to_results(self, calls, timeout=None): """Returns a iterator of the results of the given calls. Args: calls: A sequence of callables that take no arguments. These will be called and their results returned. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: An iterator over the results of the given calls. Equivalent to: (call() for call in calls) but the calls may be evaluated out-of-order. Raises: TimeoutError: If all the given calls were not completed before the given timeout. Exception: If any call() raises. """ if timeout is not None: end_time = timeout + time.time() fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) try: for future in fs: if timeout is None: yield future.result() else: yield future.result(end_time - time.time()) finally: try: fs.cancel(timeout=0) except TimeoutError: pass def map(self, func, *iterables, timeout=None): """Returns a iterator equivalent to map(fn, iter). Args: func: A callable that will take take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. """ calls = [functools.partial(func, *args) for args in zip(*iterables)] return self.run_to_results(calls, timeout) def shutdown(self): """Clean-up. No other methods can be called afterwards.""" raise NotImplementedError() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown() return False