summaryrefslogtreecommitdiff
path: root/python3/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'python3/futures/_base.py')
-rw-r--r--python3/futures/_base.py200
1 files changed, 130 insertions, 70 deletions
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 21bae32..ad338ae 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -7,19 +7,21 @@ import logging
import threading
import time
-FIRST_COMPLETED = 0
-FIRST_EXCEPTION = 1
-ALL_COMPLETED = 2
-RETURN_IMMEDIATELY = 3
-
-# Possible future states
-PENDING = 0
-RUNNING = 1
-CANCELLED = 2 # The future was cancelled...
-CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called.
-FINISHED = 4
-
-FUTURE_STATES = [
+FIRST_COMPLETED = 'FIRST_COMPLETED'
+FIRST_EXCEPTION = 'FIRST_EXCEPTION'
+ALL_COMPLETED = 'ALL_COMPLETED'
+RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
+
+# Possible future states (for internal use by the futures package).
+PENDING = 'PENDING'
+RUNNING = 'RUNNING'
+# The future was cancelled by the user...
+CANCELLED = 'CANCELLED'
+# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
+FINISHED = 'FINISHED'
+
+_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
@@ -35,12 +37,24 @@ _STATE_TO_DESCRIPTION_MAP = {
FINISHED: "finished"
}
+# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
_handler = logging.StreamHandler()
LOGGER.addHandler(_handler)
del _handler
def set_future_exception(future, event_sink, exception):
+ """Sets a future as having terminated with an exception.
+
+ This function should only be used within the futures package.
+
+ Args:
+ future: The Future that finished with an exception.
+ event_sink: The ThreadEventSink accociated with the Future's FutureList.
+ The event_sink will be notified of the Future's completion, which
+ may unblock some clients that have called FutureList.wait().
+ exception: The expection that executing the Future raised.
+ """
with future._condition:
future._exception = exception
with event_sink._condition:
@@ -49,6 +63,17 @@ def set_future_exception(future, event_sink, exception):
future._condition.notify_all()
def set_future_result(future, event_sink, result):
+ """Sets a future as having terminated without exception.
+
+ This function should only be used within the futures package.
+
+ Args:
+ future: The Future that completed.
+ event_sink: The ThreadEventSink accociated with the Future's FutureList.
+ The event_sink will be notified of the Future's completion, which
+ may unblock some clients that have called FutureList.wait().
+ result: The value returned by the Future.
+ """
with future._condition:
future._result = result
with event_sink._condition:
@@ -65,9 +90,98 @@ class CancelledError(Error):
class TimeoutError(Error):
pass
+class _WaitTracker(object):
+ """Provides the event that FutureList.wait(...) blocks on.
+
+ """
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ raise NotImplementedError()
+
+ def add_exception(self):
+ raise NotImplementedError()
+
+ def add_cancelled(self):
+ raise NotImplementedError()
+
+class _FirstCompletedWaitTracker(_WaitTracker):
+ """Used by wait(return_when=FIRST_COMPLETED)."""
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(_WaitTracker):
+ """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
+
+ def __init__(self, num_pending_calls, stop_on_exception):
+ self.num_pending_calls = num_pending_calls
+ self.stop_on_exception = stop_on_exception
+ super().__init__()
+
+ def add_result(self):
+ self.num_pending_calls -= 1
+ if not self.num_pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ if self.stop_on_exception:
+ self.event.set()
+ else:
+ self.add_result()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class ThreadEventSink(object):
+ """Forwards events to many _WaitTrackers.
+
+ Each FutureList has a ThreadEventSink and each call to FutureList.wait()
+ causes a new _WaitTracker to be added to the ThreadEventSink. This design
+ allows many threads to call FutureList.wait() on the same FutureList with
+ different arguments.
+
+ This class should not be used by clients.
+ """
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def remove(self, e):
+ self._waiters.remove(e)
+
+ def add_result(self):
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
class Future(object):
"""Represents the result of an asynchronous computation."""
+ # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink
+ # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race
+ # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink.
+ # Other state transitions need only have the Future._condition held.
+ # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired
+ # first.
+
def __init__(self, index):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
@@ -193,61 +307,6 @@ class Future(object):
else:
raise TimeoutError()
-class _FirstCompletedWaitTracker(object):
- def __init__(self):
- self.event = threading.Event()
-
- def add_result(self):
- self.event.set()
-
- def add_exception(self):
- self.event.set()
-
- def add_cancelled(self):
- self.event.set()
-
-class _AllCompletedWaitTracker(object):
- def __init__(self, pending_calls, stop_on_exception):
- self.pending_calls = pending_calls
- self.stop_on_exception = stop_on_exception
- self.event = threading.Event()
-
- def add_result(self):
- self.pending_calls -= 1
- if not self.pending_calls:
- self.event.set()
-
- def add_exception(self):
- if self.stop_on_exception:
- self.event.set()
- else:
- self.add_result()
-
- def add_cancelled(self):
- self.add_result()
-
-class ThreadEventSink(object):
- def __init__(self):
- self._condition = threading.Lock()
- self._waiters = []
-
- def add(self, e):
- self._waiters.append(e)
-
- def remove(self, e):
- self._waiters.remove(e)
-
- def add_result(self):
- for waiter in self._waiters:
- waiter.add_result()
-
- def add_exception(self):
- for waiter in self._waiters:
- waiter.add_exception()
-
- def add_cancelled(self):
- for waiter in self._waiters:
- waiter.add_cancelled()
class FutureList(object):
def __init__(self, futures, event_sink):
@@ -282,11 +341,12 @@ class FutureList(object):
if return_when == RETURN_IMMEDIATELY:
return
+ # Futures cannot change state without this condition being held.
with self._event_sink._condition:
# Make a quick exit if every future is already done. This check is
# necessary because, if every future is in the
# CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
- # never receive any
+ # never receive any events.
if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
for f in self):
return
@@ -390,7 +450,7 @@ class FutureList(object):
return future in self._futures
def __repr__(self):
- states = {state: 0 for state in FUTURE_STATES}
+ states = {state: 0 for state in _FUTURE_STATES}
for f in self:
states[f._state] += 1