diff options
Diffstat (limited to 'futures/_base.py')
-rw-r--r-- | futures/_base.py | 156 |
1 files changed, 140 insertions, 16 deletions
diff --git a/futures/_base.py b/futures/_base.py index 1debfbb..01dafd3 100644 --- a/futures/_base.py +++ b/futures/_base.py @@ -52,14 +52,20 @@ def set_future_result(future, event_sink, result): event_sink.add_result() future._condition.notify_all() -class CancelledError(Exception): +class Error(Exception): pass -class TimeoutError(Exception): +class CancelledError(Error): + pass + +class TimeoutError(Error): pass class Future(object): + """Represents the result of an asynchronous computation.""" + def __init__(self): + """Initializes the Future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None @@ -79,6 +85,11 @@ class Future(object): return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ with self._condition: if self._state in [RUNNING, FINISHED]: return False @@ -89,10 +100,12 @@ class Future(object): return True def cancelled(self): + """Return True if the future has cancelled.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] def done(self): + """Return True of the future was cancelled or finished executing.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] @@ -103,6 +116,21 @@ class Future(object): return self._result def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() @@ -119,6 +147,23 @@ class Future(object): raise TimeoutError() def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() @@ -192,10 +237,34 @@ class ThreadEventSink(object): class FutureList(object): def __init__(self, futures, event_sink): + """Initializes the FutureList. Should not be called by clients.""" self._futures = futures self._event_sink = event_sink def wait(self, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the list to complete. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method. + + Raises: + TimeoutError: If the wait condition wasn't satisfied before the + given timeout. + """ if return_when == RETURN_IMMEDIATELY: return @@ -234,6 +303,16 @@ class FutureList(object): self._event_sink.remove(completed_tracker) def cancel(self, timeout=None): + """Cancel the futures in the list. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Raises: + TimeoutError: If all the futures were not finished before the + given timeout. + """ for f in self: f.cancel() self.wait(timeout=timeout, return_when=ALL_COMPLETED) @@ -303,28 +382,56 @@ class FutureList(object): states[FINISHED])) class Executor(object): - def run(self, calls, timeout=None, return_when=ALL_COMPLETED): + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + """Return a list of futures representing the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be bound to Futures and returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method. + + Returns: + A FuturesList containing futures for the given calls. + """ raise NotImplementedError() - def runXXX(self, calls, timeout=None): - """Execute the given calls and + def run_to_results(self, calls, timeout=None): + """Returns a iterator of the results of the given calls. - Arguments: - calls: A sequence of functions that will be called without arguments - and whose results with be returned. - timeout: The maximum number of seconds to wait for the complete results. - None indicates that there is no timeout. + Args: + calls: A sequence of callables that take no arguments. These will + be called and their results returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. - Yields: - The results of the given calls in the order that they are given. + Returns: + An iterator over the results of the given calls. Equivalent to: + (call() for call in calls) but the calls may be evaluated + out-of-order. - Exceptions: - TimeoutError: if it takes more than timeout + Raises: + TimeoutError: If all the given calls were not completed before the + given timeout. + Exception: If any call() raises. """ if timeout is not None: end_time = timeout + time.time() - fs = self.run(calls, return_when=RETURN_IMMEDIATELY) + fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) try: for future in fs: @@ -339,10 +446,27 @@ class Executor(object): pass def map(self, fn, iter, timeout=None): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable taking a single argument. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(fn, iter) but the calls may be + evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(x) raises for any x in iter. + """ calls = [functools.partial(fn, a) for a in iter] - return self.runXXX(calls, timeout) + return self.run_to_results(calls, timeout) def shutdown(self): + """Clean-up. No other methods can be called afterwards.""" raise NotImplementedError() def __enter__(self): |