diff options
Diffstat (limited to 'python2/futures/_base.py')
-rw-r--r-- | python2/futures/_base.py | 194 |
1 files changed, 123 insertions, 71 deletions
diff --git a/python2/futures/_base.py b/python2/futures/_base.py index e90c288..4c50064 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py @@ -38,19 +38,21 @@ except NameError: return False return True -FIRST_COMPLETED = 0 -FIRST_EXCEPTION = 1 -ALL_COMPLETED = 2 -RETURN_IMMEDIATELY = 3 - -# Possible future states -PENDING = 0 -RUNNING = 1 -CANCELLED = 2 # The future was cancelled... -CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. -FINISHED = 4 - -FUTURE_STATES = [ +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and ThreadEventSink.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ PENDING, RUNNING, CANCELLED, @@ -66,12 +68,24 @@ _STATE_TO_DESCRIPTION_MAP = { FINISHED: "finished" } +# Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") _handler = logging.StreamHandler() LOGGER.addHandler(_handler) del _handler def set_future_exception(future, event_sink, exception): + """Sets a future as having terminated with an exception. + + This function should only be used within the futures package. + + Args: + future: The Future that finished with an exception. + event_sink: The ThreadEventSink accociated with the Future's FutureList. + The event_sink will be notified of the Future's completion, which + may unblock some clients that have called FutureList.wait(). + exception: The expection that executing the Future raised. + """ future._condition.acquire() try: future._exception = exception @@ -87,6 +101,17 @@ def set_future_exception(future, event_sink, exception): future._condition.release() def set_future_result(future, event_sink, result): + """Sets a future as having terminated without exception. + + This function should only be used within the futures package. + + Args: + future: The Future that completed. + event_sink: The ThreadEventSink accociated with the Future's FutureList. + The event_sink will be notified of the Future's completion, which + may unblock some clients that have called FutureList.wait(). + result: The value returned by the Future. + """ future._condition.acquire() try: future._result = result @@ -110,6 +135,88 @@ class CancelledError(Error): class TimeoutError(Error): pass +class _WaitTracker(object): + """Provides the event that FutureList.wait(...) blocks on. + + """ + def __init__(self): + self.event = threading.Event() + + def add_result(self): + raise NotImplementedError() + + def add_exception(self): + raise NotImplementedError() + + def add_cancelled(self): + raise NotImplementedError() + +class _FirstCompletedWaitTracker(_WaitTracker): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(_WaitTracker): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + _WaitTracker.__init__(self) + + def add_result(self): + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + """Forwards events to many _WaitTrackers. + + Each FutureList has a ThreadEventSink and each call to FutureList.wait() + causes a new _WaitTracker to be added to the ThreadEventSink. This design + allows many threads to call FutureList.wait() on the same FutureList with + different arguments. + + This class should not be used by clients. + """ + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + class Future(object): """Represents the result of an asynchronous computation.""" @@ -259,62 +366,6 @@ class Future(object): finally: self._condition.release() -class _FirstCompletedWaitTracker(object): - def __init__(self): - self.event = threading.Event() - - def add_result(self): - self.event.set() - - def add_exception(self): - self.event.set() - - def add_cancelled(self): - self.event.set() - -class _AllCompletedWaitTracker(object): - def __init__(self, pending_calls, stop_on_exception): - self.pending_calls = pending_calls - self.stop_on_exception = stop_on_exception - self.event = threading.Event() - - def add_result(self): - self.pending_calls -= 1 - if not self.pending_calls: - self.event.set() - - def add_exception(self): - if self.stop_on_exception: - self.event.set() - else: - self.add_result() - - def add_cancelled(self): - self.add_result() - -class ThreadEventSink(object): - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] - - def add(self, e): - self._waiters.append(e) - - def remove(self, e): - self._waiters.remove(e) - - def add_result(self): - for waiter in self._waiters: - waiter.add_result() - - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() - - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() - class FutureList(object): def __init__(self, futures, event_sink): """Initializes the FutureList. Should not be called by clients.""" @@ -348,12 +399,13 @@ class FutureList(object): if return_when == RETURN_IMMEDIATELY: return + # Futures cannot change state without this condition being held. self._event_sink._condition.acquire() try: # Make a quick exit if every future is already done. This check is # necessary because, if every future is in the # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any + # never receive any events. if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] for f in self): return @@ -459,7 +511,7 @@ class FutureList(object): return future in self._futures def __repr__(self): - states = dict([(state, 0) for state in FUTURE_STATES]) + states = dict([(state, 0) for state in _FUTURE_STATES]) for f in self: states[f._state] += 1 |