summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2009-10-04 23:10:28 +0000
committerbrian.quinlan <devnull@localhost>2009-10-04 23:10:28 +0000
commit8fc3d853c2f2c0b160561be3ef55cd1d763d9a74 (patch)
tree67d08cf0ecb6736524c51699b95059adb47391b4
parent35a66ee20837957a6247602fbc653e0857c0b531 (diff)
downloadfutures-8fc3d853c2f2c0b160561be3ef55cd1d763d9a74.tar.gz
Large module documentation cleanup
-rw-r--r--python2/futures/_base.py194
-rw-r--r--python2/futures/thread.py15
-rw-r--r--python3/futures/_base.py200
-rw-r--r--python3/futures/thread.py10
4 files changed, 275 insertions, 144 deletions
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index e90c288..4c50064 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -38,19 +38,21 @@ except NameError:
return False
return True
-FIRST_COMPLETED = 0
-FIRST_EXCEPTION = 1
-ALL_COMPLETED = 2
-RETURN_IMMEDIATELY = 3
-
-# Possible future states
-PENDING = 0
-RUNNING = 1
-CANCELLED = 2 # The future was cancelled...
-CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called.
-FINISHED = 4
-
-FUTURE_STATES = [
+FIRST_COMPLETED = 'FIRST_COMPLETED'
+FIRST_EXCEPTION = 'FIRST_EXCEPTION'
+ALL_COMPLETED = 'ALL_COMPLETED'
+RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
+
+# Possible future states (for internal use by the futures package).
+PENDING = 'PENDING'
+RUNNING = 'RUNNING'
+# The future was cancelled by the user...
+CANCELLED = 'CANCELLED'
+# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
+FINISHED = 'FINISHED'
+
+_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
@@ -66,12 +68,24 @@ _STATE_TO_DESCRIPTION_MAP = {
FINISHED: "finished"
}
+# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
_handler = logging.StreamHandler()
LOGGER.addHandler(_handler)
del _handler
def set_future_exception(future, event_sink, exception):
+ """Sets a future as having terminated with an exception.
+
+ This function should only be used within the futures package.
+
+ Args:
+ future: The Future that finished with an exception.
+ event_sink: The ThreadEventSink accociated with the Future's FutureList.
+ The event_sink will be notified of the Future's completion, which
+ may unblock some clients that have called FutureList.wait().
+ exception: The expection that executing the Future raised.
+ """
future._condition.acquire()
try:
future._exception = exception
@@ -87,6 +101,17 @@ def set_future_exception(future, event_sink, exception):
future._condition.release()
def set_future_result(future, event_sink, result):
+ """Sets a future as having terminated without exception.
+
+ This function should only be used within the futures package.
+
+ Args:
+ future: The Future that completed.
+ event_sink: The ThreadEventSink accociated with the Future's FutureList.
+ The event_sink will be notified of the Future's completion, which
+ may unblock some clients that have called FutureList.wait().
+ result: The value returned by the Future.
+ """
future._condition.acquire()
try:
future._result = result
@@ -110,6 +135,88 @@ class CancelledError(Error):
class TimeoutError(Error):
pass
+class _WaitTracker(object):
+ """Provides the event that FutureList.wait(...) blocks on.
+
+ """
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ raise NotImplementedError()
+
+ def add_exception(self):
+ raise NotImplementedError()
+
+ def add_cancelled(self):
+ raise NotImplementedError()
+
+class _FirstCompletedWaitTracker(_WaitTracker):
+ """Used by wait(return_when=FIRST_COMPLETED)."""
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(_WaitTracker):
+ """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
+
+ def __init__(self, num_pending_calls, stop_on_exception):
+ self.num_pending_calls = num_pending_calls
+ self.stop_on_exception = stop_on_exception
+ _WaitTracker.__init__(self)
+
+ def add_result(self):
+ self.num_pending_calls -= 1
+ if not self.num_pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ if self.stop_on_exception:
+ self.event.set()
+ else:
+ self.add_result()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class ThreadEventSink(object):
+ """Forwards events to many _WaitTrackers.
+
+ Each FutureList has a ThreadEventSink and each call to FutureList.wait()
+ causes a new _WaitTracker to be added to the ThreadEventSink. This design
+ allows many threads to call FutureList.wait() on the same FutureList with
+ different arguments.
+
+ This class should not be used by clients.
+ """
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def remove(self, e):
+ self._waiters.remove(e)
+
+ def add_result(self):
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
class Future(object):
"""Represents the result of an asynchronous computation."""
@@ -259,62 +366,6 @@ class Future(object):
finally:
self._condition.release()
-class _FirstCompletedWaitTracker(object):
- def __init__(self):
- self.event = threading.Event()
-
- def add_result(self):
- self.event.set()
-
- def add_exception(self):
- self.event.set()
-
- def add_cancelled(self):
- self.event.set()
-
-class _AllCompletedWaitTracker(object):
- def __init__(self, pending_calls, stop_on_exception):
- self.pending_calls = pending_calls
- self.stop_on_exception = stop_on_exception
- self.event = threading.Event()
-
- def add_result(self):
- self.pending_calls -= 1
- if not self.pending_calls:
- self.event.set()
-
- def add_exception(self):
- if self.stop_on_exception:
- self.event.set()
- else:
- self.add_result()
-
- def add_cancelled(self):
- self.add_result()
-
-class ThreadEventSink(object):
- def __init__(self):
- self._condition = threading.Lock()
- self._waiters = []
-
- def add(self, e):
- self._waiters.append(e)
-
- def remove(self, e):
- self._waiters.remove(e)
-
- def add_result(self):
- for waiter in self._waiters:
- waiter.add_result()
-
- def add_exception(self):
- for waiter in self._waiters:
- waiter.add_exception()
-
- def add_cancelled(self):
- for waiter in self._waiters:
- waiter.add_cancelled()
-
class FutureList(object):
def __init__(self, futures, event_sink):
"""Initializes the FutureList. Should not be called by clients."""
@@ -348,12 +399,13 @@ class FutureList(object):
if return_when == RETURN_IMMEDIATELY:
return
+ # Futures cannot change state without this condition being held.
self._event_sink._condition.acquire()
try:
# Make a quick exit if every future is already done. This check is
# necessary because, if every future is in the
# CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
- # never receive any
+ # never receive any events.
if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
for f in self):
return
@@ -459,7 +511,7 @@ class FutureList(object):
return future in self._futures
def __repr__(self):
- states = dict([(state, 0) for state in FUTURE_STATES])
+ states = dict([(state, 0) for state in _FUTURE_STATES])
for f in self:
states[f._state] += 1
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 77a75d0..4928342 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -26,8 +26,8 @@ import weakref
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until the
-# threads finish.
+# workers to exit when their work queues are empty and then waits until they
+# finish.
_thread_references = set() # Weakrefs to every active worker thread.
_shutdown = False # Indicates that the interpreter is shutting down.
@@ -43,7 +43,8 @@ def _python_exit():
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
- Should be called periodically to prevent memory leaks in scenarios such as:
+ Should be called periodically to prevent thread objects from accumulating in
+ scenarios such as:
>>> while True:
>>> ... t = ThreadPoolExecutor(max_threads=5)
>>> ... t.map(int, ['1', '2', '3', '4', '5'])
@@ -109,6 +110,12 @@ def _worker(executor_reference, work_queue):
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
+ """Initializes a new ThreadPoolExecutor instance.
+
+ Args:
+ max_threads: The maximum number of threads that can be used to
+ execute the given calls.
+ """
_remove_dead_thread_references()
self._max_threads = max_threads
@@ -147,6 +154,7 @@ class ThreadPoolExecutor(Executor):
return fl
finally:
self._shutdown_lock.release()
+ run_to_futures.__doc__ = Executor.run_to_futures.__doc__
def shutdown(self):
self._shutdown_lock.acquire()
@@ -154,3 +162,4 @@ class ThreadPoolExecutor(Executor):
self._shutdown = True
finally:
self._shutdown_lock.release()
+ shutdown.__doc__ = Executor.shutdown.__doc__
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 21bae32..ad338ae 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -7,19 +7,21 @@ import logging
import threading
import time
-FIRST_COMPLETED = 0
-FIRST_EXCEPTION = 1
-ALL_COMPLETED = 2
-RETURN_IMMEDIATELY = 3
-
-# Possible future states
-PENDING = 0
-RUNNING = 1
-CANCELLED = 2 # The future was cancelled...
-CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called.
-FINISHED = 4
-
-FUTURE_STATES = [
+FIRST_COMPLETED = 'FIRST_COMPLETED'
+FIRST_EXCEPTION = 'FIRST_EXCEPTION'
+ALL_COMPLETED = 'ALL_COMPLETED'
+RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
+
+# Possible future states (for internal use by the futures package).
+PENDING = 'PENDING'
+RUNNING = 'RUNNING'
+# The future was cancelled by the user...
+CANCELLED = 'CANCELLED'
+# ...and ThreadEventSink.add_cancelled() was called by a worker.
+CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
+FINISHED = 'FINISHED'
+
+_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
@@ -35,12 +37,24 @@ _STATE_TO_DESCRIPTION_MAP = {
FINISHED: "finished"
}
+# Logger for internal use by the futures package.
LOGGER = logging.getLogger("futures")
_handler = logging.StreamHandler()
LOGGER.addHandler(_handler)
del _handler
def set_future_exception(future, event_sink, exception):
+ """Sets a future as having terminated with an exception.
+
+ This function should only be used within the futures package.
+
+ Args:
+ future: The Future that finished with an exception.
+ event_sink: The ThreadEventSink accociated with the Future's FutureList.
+ The event_sink will be notified of the Future's completion, which
+ may unblock some clients that have called FutureList.wait().
+ exception: The expection that executing the Future raised.
+ """
with future._condition:
future._exception = exception
with event_sink._condition:
@@ -49,6 +63,17 @@ def set_future_exception(future, event_sink, exception):
future._condition.notify_all()
def set_future_result(future, event_sink, result):
+ """Sets a future as having terminated without exception.
+
+ This function should only be used within the futures package.
+
+ Args:
+ future: The Future that completed.
+ event_sink: The ThreadEventSink accociated with the Future's FutureList.
+ The event_sink will be notified of the Future's completion, which
+ may unblock some clients that have called FutureList.wait().
+ result: The value returned by the Future.
+ """
with future._condition:
future._result = result
with event_sink._condition:
@@ -65,9 +90,98 @@ class CancelledError(Error):
class TimeoutError(Error):
pass
+class _WaitTracker(object):
+ """Provides the event that FutureList.wait(...) blocks on.
+
+ """
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ raise NotImplementedError()
+
+ def add_exception(self):
+ raise NotImplementedError()
+
+ def add_cancelled(self):
+ raise NotImplementedError()
+
+class _FirstCompletedWaitTracker(_WaitTracker):
+ """Used by wait(return_when=FIRST_COMPLETED)."""
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(_WaitTracker):
+ """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
+
+ def __init__(self, num_pending_calls, stop_on_exception):
+ self.num_pending_calls = num_pending_calls
+ self.stop_on_exception = stop_on_exception
+ super().__init__()
+
+ def add_result(self):
+ self.num_pending_calls -= 1
+ if not self.num_pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ if self.stop_on_exception:
+ self.event.set()
+ else:
+ self.add_result()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class ThreadEventSink(object):
+ """Forwards events to many _WaitTrackers.
+
+ Each FutureList has a ThreadEventSink and each call to FutureList.wait()
+ causes a new _WaitTracker to be added to the ThreadEventSink. This design
+ allows many threads to call FutureList.wait() on the same FutureList with
+ different arguments.
+
+ This class should not be used by clients.
+ """
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def remove(self, e):
+ self._waiters.remove(e)
+
+ def add_result(self):
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
class Future(object):
"""Represents the result of an asynchronous computation."""
+ # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink
+ # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race
+ # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink.
+ # Other state transitions need only have the Future._condition held.
+ # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired
+ # first.
+
def __init__(self, index):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
@@ -193,61 +307,6 @@ class Future(object):
else:
raise TimeoutError()
-class _FirstCompletedWaitTracker(object):
- def __init__(self):
- self.event = threading.Event()
-
- def add_result(self):
- self.event.set()
-
- def add_exception(self):
- self.event.set()
-
- def add_cancelled(self):
- self.event.set()
-
-class _AllCompletedWaitTracker(object):
- def __init__(self, pending_calls, stop_on_exception):
- self.pending_calls = pending_calls
- self.stop_on_exception = stop_on_exception
- self.event = threading.Event()
-
- def add_result(self):
- self.pending_calls -= 1
- if not self.pending_calls:
- self.event.set()
-
- def add_exception(self):
- if self.stop_on_exception:
- self.event.set()
- else:
- self.add_result()
-
- def add_cancelled(self):
- self.add_result()
-
-class ThreadEventSink(object):
- def __init__(self):
- self._condition = threading.Lock()
- self._waiters = []
-
- def add(self, e):
- self._waiters.append(e)
-
- def remove(self, e):
- self._waiters.remove(e)
-
- def add_result(self):
- for waiter in self._waiters:
- waiter.add_result()
-
- def add_exception(self):
- for waiter in self._waiters:
- waiter.add_exception()
-
- def add_cancelled(self):
- for waiter in self._waiters:
- waiter.add_cancelled()
class FutureList(object):
def __init__(self, futures, event_sink):
@@ -282,11 +341,12 @@ class FutureList(object):
if return_when == RETURN_IMMEDIATELY:
return
+ # Futures cannot change state without this condition being held.
with self._event_sink._condition:
# Make a quick exit if every future is already done. This check is
# necessary because, if every future is in the
# CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
- # never receive any
+ # never receive any events.
if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
for f in self):
return
@@ -390,7 +450,7 @@ class FutureList(object):
return future in self._futures
def __repr__(self):
- states = {state: 0 for state in FUTURE_STATES}
+ states = {state: 0 for state in _FUTURE_STATES}
for f in self:
states[f._state] += 1
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
index 9292863..286d08b 100644
--- a/python3/futures/thread.py
+++ b/python3/futures/thread.py
@@ -103,6 +103,14 @@ def _worker(executor_reference, work_queue):
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
+ """Initializes a new ThreadPoolExecutor instance.
+
+ Args:
+ max_threads: The maximum number of threads that can be used to
+ execute the given calls.
+ """
+ _remove_dead_thread_references()
+
self._max_threads = max_threads
self._work_queue = queue.Queue()
self._threads = set()
@@ -136,7 +144,9 @@ class ThreadPoolExecutor(Executor):
fl = FutureList(futures, event_sink)
fl.wait(timeout=timeout, return_when=return_when)
return fl
+ run_to_futures.__doc__ = Executor.run_to_futures.__doc__
def shutdown(self):
with self._shutdown_lock:
self._shutdown = True
+ shutdown.__doc__ = Executor.shutdown.__doc__