summaryrefslogtreecommitdiff
path: root/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'futures/_base.py')
-rw-r--r--futures/_base.py156
1 files changed, 140 insertions, 16 deletions
diff --git a/futures/_base.py b/futures/_base.py
index 1debfbb..01dafd3 100644
--- a/futures/_base.py
+++ b/futures/_base.py
@@ -52,14 +52,20 @@ def set_future_result(future, event_sink, result):
event_sink.add_result()
future._condition.notify_all()
-class CancelledError(Exception):
+class Error(Exception):
pass
-class TimeoutError(Exception):
+class CancelledError(Error):
+ pass
+
+class TimeoutError(Error):
pass
class Future(object):
+ """Represents the result of an asynchronous computation."""
+
def __init__(self):
+ """Initializes the Future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
@@ -79,6 +85,11 @@ class Future(object):
return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
def cancel(self):
+ """Cancel the future if possible.
+
+ Returns True if the future was cancelled, False otherwise. A future
+ cannot be cancelled if it is running or has already completed.
+ """
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
@@ -89,10 +100,12 @@ class Future(object):
return True
def cancelled(self):
+ """Return True if the future has cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def done(self):
+ """Return True of the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
@@ -103,6 +116,21 @@ class Future(object):
return self._result
def result(self, timeout=None):
+ """Return the result of the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the result if the future
+ isn't done. If None, then there is no limit on the wait time.
+
+ Returns:
+ The result of the call that the future represents.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ Exception: If the call raised then that exception will be raised.
+ """
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
@@ -119,6 +147,23 @@ class Future(object):
raise TimeoutError()
def exception(self, timeout=None):
+ """Return the exception raised by the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the exception if the
+ future isn't done. If None, then there is no limit on the wait
+ time.
+
+ Returns:
+ The exception raised by the call that the future represents or None
+ if the call completed without raising.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ """
+
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
@@ -192,10 +237,34 @@ class ThreadEventSink(object):
class FutureList(object):
def __init__(self, futures, event_sink):
+ """Initializes the FutureList. Should not be called by clients."""
self._futures = futures
self._event_sink = event_sink
def wait(self, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the futures in the list to complete.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting (this is not likely
+ to be a useful option but it is there to
+ be symmetrical with the
+ executor.run_to_futures() method.
+
+ Raises:
+ TimeoutError: If the wait condition wasn't satisfied before the
+ given timeout.
+ """
if return_when == RETURN_IMMEDIATELY:
return
@@ -234,6 +303,16 @@ class FutureList(object):
self._event_sink.remove(completed_tracker)
def cancel(self, timeout=None):
+ """Cancel the futures in the list.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Raises:
+ TimeoutError: If all the futures were not finished before the
+ given timeout.
+ """
for f in self:
f.cancel()
self.wait(timeout=timeout, return_when=ALL_COMPLETED)
@@ -303,28 +382,56 @@ class FutureList(object):
states[FINISHED]))
class Executor(object):
- def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ """Return a list of futures representing the given calls.
+
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be bound to Futures and returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting (this is not likely
+ to be a useful option but it is there to
+ be symmetrical with the
+ executor.run_to_futures() method.
+
+ Returns:
+ A FuturesList containing futures for the given calls.
+ """
raise NotImplementedError()
- def runXXX(self, calls, timeout=None):
- """Execute the given calls and
+ def run_to_results(self, calls, timeout=None):
+ """Returns a iterator of the results of the given calls.
- Arguments:
- calls: A sequence of functions that will be called without arguments
- and whose results with be returned.
- timeout: The maximum number of seconds to wait for the complete results.
- None indicates that there is no timeout.
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be called and their results returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
- Yields:
- The results of the given calls in the order that they are given.
+ Returns:
+ An iterator over the results of the given calls. Equivalent to:
+ (call() for call in calls) but the calls may be evaluated
+ out-of-order.
- Exceptions:
- TimeoutError: if it takes more than timeout
+ Raises:
+ TimeoutError: If all the given calls were not completed before the
+ given timeout.
+ Exception: If any call() raises.
"""
if timeout is not None:
end_time = timeout + time.time()
- fs = self.run(calls, return_when=RETURN_IMMEDIATELY)
+ fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
try:
for future in fs:
@@ -339,10 +446,27 @@ class Executor(object):
pass
def map(self, fn, iter, timeout=None):
+ """Returns a iterator equivalent to map(fn, iter).
+
+ Args:
+ fn: A callable taking a single argument.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator equivalent to: map(fn, iter) but the calls may be
+ evaluated out-of-order.
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(x) raises for any x in iter.
+ """
calls = [functools.partial(fn, a) for a in iter]
- return self.runXXX(calls, timeout)
+ return self.run_to_results(calls, timeout)
def shutdown(self):
+ """Clean-up. No other methods can be called afterwards."""
raise NotImplementedError()
def __enter__(self):