diff options
Diffstat (limited to 'python3/futures/_base.py')
-rw-r--r-- | python3/futures/_base.py | 482 |
1 files changed, 482 insertions, 0 deletions
diff --git a/python3/futures/_base.py b/python3/futures/_base.py new file mode 100644 index 0000000..19cabe8 --- /dev/null +++ b/python3/futures/_base.py @@ -0,0 +1,482 @@ +import functools +import logging +import threading +import time + +FIRST_COMPLETED = 0 +FIRST_EXCEPTION = 1 +ALL_COMPLETED = 2 +RETURN_IMMEDIATELY = 3 + +# Possible future states +PENDING = 0 +RUNNING = 1 +CANCELLED = 2 # The future was cancelled... +CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. +FINISHED = 4 + +FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +LOGGER = logging.getLogger("futures") +_handler = logging.StreamHandler() +LOGGER.addHandler(_handler) +del _handler + +def set_future_exception(future, event_sink, exception): + with future._condition: + future._exception = exception + with event_sink._condition: + future._state = FINISHED + event_sink.add_exception() + future._condition.notify_all() + +def set_future_result(future, event_sink, result): + with future._condition: + future._result = result + with event_sink._condition: + future._state = FINISHED + event_sink.add_result() + future._condition.notify_all() + +class Error(Exception): + pass + +class CancelledError(Error): + pass + +class TimeoutError(Error): + pass + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self, index): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._index = index + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<Future state=%s raised %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<Future state=%s returned %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] + + @property + def index(self): + """The index of the future in its FutureList.""" + return self._index + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: + self._state = CANCELLED + self._condition.notify_all() + return True + + def cancelled(self): + """Return True if the future has cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + +class _FirstCompletedWaitTracker(object): + def __init__(self): + self.event = threading.Event() + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(object): + def __init__(self, pending_calls, stop_on_exception): + self.pending_calls = pending_calls + self.stop_on_exception = stop_on_exception + self.event = threading.Event() + + def add_result(self): + self.pending_calls -= 1 + if not self.pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + +class FutureList(object): + def __init__(self, futures, event_sink): + """Initializes the FutureList. Should not be called by clients.""" + self._futures = futures + self._event_sink = event_sink + + def wait(self, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the list to complete. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method. + + Raises: + TimeoutError: If the wait condition wasn't satisfied before the + given timeout. + """ + if return_when == RETURN_IMMEDIATELY: + return + + with self._event_sink._condition: + # Make a quick exit if every future is already done. This check is + # necessary because, if every future is in the + # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will + # never receive any + if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + for f in self): + return + + if return_when == FIRST_COMPLETED: + completed_tracker = _FirstCompletedWaitTracker() + else: + # Calculate how many events are expected before every future + # is complete. This can be done without holding the futures' + # locks because a future cannot transition itself into either + # of the states being looked for. + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] + for f in self) + + if return_when == FIRST_EXCEPTION: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=False) + + self._event_sink.add(completed_tracker) + + try: + completed_tracker.event.wait(timeout) + finally: + self._event_sink.remove(completed_tracker) + + def cancel(self, timeout=None): + """Cancel the futures in the list. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Raises: + TimeoutError: If all the futures were not finished before the + given timeout. + """ + for f in self: + f.cancel() + self.wait(timeout=timeout, return_when=ALL_COMPLETED) + if any(not f.done() for f in self): + raise TimeoutError() + + def has_running_futures(self): + return any(self.running_futures()) + + def has_cancelled_futures(self): + return any(self.cancelled_futures()) + + def has_done_futures(self): + return any(self.done_futures()) + + def has_successful_futures(self): + return any(self.successful_futures()) + + def has_exception_futures(self): + return any(self.exception_futures()) + + def cancelled_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) + + def done_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) + + def successful_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is None) + + def exception_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is not None) + + def running_futures(self): + return (f for f in self if f._state == RUNNING) + + def __len__(self): + return len(self._futures) + + def __getitem__(self, i): + return self._futures[i] + + def __iter__(self): + return iter(self._futures) + + def __contains__(self, future): + return future in self._futures + + def __repr__(self): + states = {state: 0 for state in FUTURE_STATES} + for f in self: + states[f._state] += 1 + + return ('<FutureList #futures=%d ' + '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % ( + len(self), + states[PENDING], + states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], + states[RUNNING], + states[FINISHED])) + +class Executor(object): + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + """Return a list of futures representing the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be bound to Futures and returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting. + + Returns: + A FuturesList containing futures for the given calls. + """ + raise NotImplementedError() + + def run_to_results(self, calls, timeout=None): + """Returns a iterator of the results of the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be called and their results returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator over the results of the given calls. Equivalent to: + (call() for call in calls) but the calls may be evaluated + out-of-order. + + Raises: + TimeoutError: If all the given calls were not completed before the + given timeout. + Exception: If any call() raises. + """ + if timeout is not None: + end_time = timeout + time.time() + + fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + try: + fs.cancel(timeout=0) + except TimeoutError: + pass + + def map(self, func, *iterables, timeout=None): + """Returns a iterator equivalent to map(fn, iter). + + Args: + func: A callable that will take take as many arguments as there + are passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + calls = [functools.partial(func, *args) for args in zip(*iterables)] + return self.run_to_results(calls, timeout) + + def shutdown(self): + """Clean-up. No other methods can be called afterwards.""" + raise NotImplementedError() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + return False |