summaryrefslogtreecommitdiff
path: root/python3/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'python3/futures/_base.py')
-rw-r--r--python3/futures/_base.py482
1 files changed, 482 insertions, 0 deletions
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
new file mode 100644
index 0000000..19cabe8
--- /dev/null
+++ b/python3/futures/_base.py
@@ -0,0 +1,482 @@
+import functools
+import logging
+import threading
+import time
+
+FIRST_COMPLETED = 0
+FIRST_EXCEPTION = 1
+ALL_COMPLETED = 2
+RETURN_IMMEDIATELY = 3
+
+# Possible future states
+PENDING = 0
+RUNNING = 1
+CANCELLED = 2 # The future was cancelled...
+CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called.
+FINISHED = 4
+
+FUTURE_STATES = [
+ PENDING,
+ RUNNING,
+ CANCELLED,
+ CANCELLED_AND_NOTIFIED,
+ FINISHED
+]
+
+_STATE_TO_DESCRIPTION_MAP = {
+ PENDING: "pending",
+ RUNNING: "running",
+ CANCELLED: "cancelled",
+ CANCELLED_AND_NOTIFIED: "cancelled",
+ FINISHED: "finished"
+}
+
+LOGGER = logging.getLogger("futures")
+_handler = logging.StreamHandler()
+LOGGER.addHandler(_handler)
+del _handler
+
+def set_future_exception(future, event_sink, exception):
+ with future._condition:
+ future._exception = exception
+ with event_sink._condition:
+ future._state = FINISHED
+ event_sink.add_exception()
+ future._condition.notify_all()
+
+def set_future_result(future, event_sink, result):
+ with future._condition:
+ future._result = result
+ with event_sink._condition:
+ future._state = FINISHED
+ event_sink.add_result()
+ future._condition.notify_all()
+
+class Error(Exception):
+ pass
+
+class CancelledError(Error):
+ pass
+
+class TimeoutError(Error):
+ pass
+
+class Future(object):
+ """Represents the result of an asynchronous computation."""
+
+ def __init__(self, index):
+ """Initializes the future. Should not be called by clients."""
+ self._condition = threading.Condition()
+ self._state = PENDING
+ self._result = None
+ self._exception = None
+ self._index = index
+
+ def __repr__(self):
+ with self._condition:
+ if self._state == FINISHED:
+ if self._exception:
+ return '<Future state=%s raised %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._exception.__class__.__name__)
+ else:
+ return '<Future state=%s returned %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._result.__class__.__name__)
+ return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
+
+ @property
+ def index(self):
+ """The index of the future in its FutureList."""
+ return self._index
+
+ def cancel(self):
+ """Cancel the future if possible.
+
+ Returns True if the future was cancelled, False otherwise. A future
+ cannot be cancelled if it is running or has already completed.
+ """
+ with self._condition:
+ if self._state in [RUNNING, FINISHED]:
+ return False
+
+ if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ self._state = CANCELLED
+ self._condition.notify_all()
+ return True
+
+ def cancelled(self):
+ """Return True if the future has cancelled."""
+ with self._condition:
+ return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
+
+ def running(self):
+ with self._condition:
+ return self._state == RUNNING
+
+ def done(self):
+ """Return True of the future was cancelled or finished executing."""
+ with self._condition:
+ return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+
+ def __get_result(self):
+ if self._exception:
+ raise self._exception
+ else:
+ return self._result
+
+ def result(self, timeout=None):
+ """Return the result of the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the result if the future
+ isn't done. If None, then there is no limit on the wait time.
+
+ Returns:
+ The result of the call that the future represents.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ Exception: If the call raised then that exception will be raised.
+ """
+ with self._condition:
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self.__get_result()
+
+ self._condition.wait(timeout)
+
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self.__get_result()
+ else:
+ raise TimeoutError()
+
+ def exception(self, timeout=None):
+ """Return the exception raised by the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the exception if the
+ future isn't done. If None, then there is no limit on the wait
+ time.
+
+ Returns:
+ The exception raised by the call that the future represents or None
+ if the call completed without raising.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ """
+
+ with self._condition:
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self._exception
+
+ self._condition.wait(timeout)
+
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self._exception
+ else:
+ raise TimeoutError()
+
+class _FirstCompletedWaitTracker(object):
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(object):
+ def __init__(self, pending_calls, stop_on_exception):
+ self.pending_calls = pending_calls
+ self.stop_on_exception = stop_on_exception
+ self.event = threading.Event()
+
+ def add_result(self):
+ self.pending_calls -= 1
+ if not self.pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ if self.stop_on_exception:
+ self.event.set()
+ else:
+ self.add_result()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class ThreadEventSink(object):
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def remove(self, e):
+ self._waiters.remove(e)
+
+ def add_result(self):
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
+class FutureList(object):
+ def __init__(self, futures, event_sink):
+ """Initializes the FutureList. Should not be called by clients."""
+ self._futures = futures
+ self._event_sink = event_sink
+
+ def wait(self, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the futures in the list to complete.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting (this is not likely
+ to be a useful option but it is there to
+ be symmetrical with the
+ executor.run_to_futures() method.
+
+ Raises:
+ TimeoutError: If the wait condition wasn't satisfied before the
+ given timeout.
+ """
+ if return_when == RETURN_IMMEDIATELY:
+ return
+
+ with self._event_sink._condition:
+ # Make a quick exit if every future is already done. This check is
+ # necessary because, if every future is in the
+ # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
+ # never receive any
+ if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+ for f in self):
+ return
+
+ if return_when == FIRST_COMPLETED:
+ completed_tracker = _FirstCompletedWaitTracker()
+ else:
+ # Calculate how many events are expected before every future
+ # is complete. This can be done without holding the futures'
+ # locks because a future cannot transition itself into either
+ # of the states being looked for.
+ pending_count = sum(
+ f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
+ for f in self)
+
+ if return_when == FIRST_EXCEPTION:
+ completed_tracker = _AllCompletedWaitTracker(
+ pending_count, stop_on_exception=True)
+ elif return_when == ALL_COMPLETED:
+ completed_tracker = _AllCompletedWaitTracker(
+ pending_count, stop_on_exception=False)
+
+ self._event_sink.add(completed_tracker)
+
+ try:
+ completed_tracker.event.wait(timeout)
+ finally:
+ self._event_sink.remove(completed_tracker)
+
+ def cancel(self, timeout=None):
+ """Cancel the futures in the list.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Raises:
+ TimeoutError: If all the futures were not finished before the
+ given timeout.
+ """
+ for f in self:
+ f.cancel()
+ self.wait(timeout=timeout, return_when=ALL_COMPLETED)
+ if any(not f.done() for f in self):
+ raise TimeoutError()
+
+ def has_running_futures(self):
+ return any(self.running_futures())
+
+ def has_cancelled_futures(self):
+ return any(self.cancelled_futures())
+
+ def has_done_futures(self):
+ return any(self.done_futures())
+
+ def has_successful_futures(self):
+ return any(self.successful_futures())
+
+ def has_exception_futures(self):
+ return any(self.exception_futures())
+
+ def cancelled_futures(self):
+ return (f for f in self
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
+
+ def done_futures(self):
+ return (f for f in self
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
+
+ def successful_futures(self):
+ return (f for f in self
+ if f._state == FINISHED and f._exception is None)
+
+ def exception_futures(self):
+ return (f for f in self
+ if f._state == FINISHED and f._exception is not None)
+
+ def running_futures(self):
+ return (f for f in self if f._state == RUNNING)
+
+ def __len__(self):
+ return len(self._futures)
+
+ def __getitem__(self, i):
+ return self._futures[i]
+
+ def __iter__(self):
+ return iter(self._futures)
+
+ def __contains__(self, future):
+ return future in self._futures
+
+ def __repr__(self):
+ states = {state: 0 for state in FUTURE_STATES}
+ for f in self:
+ states[f._state] += 1
+
+ return ('<FutureList #futures=%d '
+ '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
+ len(self),
+ states[PENDING],
+ states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
+ states[RUNNING],
+ states[FINISHED]))
+
+class Executor(object):
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ """Return a list of futures representing the given calls.
+
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be bound to Futures and returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting.
+
+ Returns:
+ A FuturesList containing futures for the given calls.
+ """
+ raise NotImplementedError()
+
+ def run_to_results(self, calls, timeout=None):
+ """Returns a iterator of the results of the given calls.
+
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be called and their results returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator over the results of the given calls. Equivalent to:
+ (call() for call in calls) but the calls may be evaluated
+ out-of-order.
+
+ Raises:
+ TimeoutError: If all the given calls were not completed before the
+ given timeout.
+ Exception: If any call() raises.
+ """
+ if timeout is not None:
+ end_time = timeout + time.time()
+
+ fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
+
+ try:
+ for future in fs:
+ if timeout is None:
+ yield future.result()
+ else:
+ yield future.result(end_time - time.time())
+ finally:
+ try:
+ fs.cancel(timeout=0)
+ except TimeoutError:
+ pass
+
+ def map(self, func, *iterables, timeout=None):
+ """Returns a iterator equivalent to map(fn, iter).
+
+ Args:
+ func: A callable that will take take as many arguments as there
+ are passed iterables.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator equivalent to: map(func, *iterables) but the calls may
+ be evaluated out-of-order.
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(*args) raises for any values.
+ """
+ calls = [functools.partial(func, *args) for args in zip(*iterables)]
+ return self.run_to_results(calls, timeout)
+
+ def shutdown(self):
+ """Clean-up. No other methods can be called afterwards."""
+ raise NotImplementedError()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+ return False