diff options
author | brian.quinlan <devnull@localhost> | 2009-10-04 23:10:28 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2009-10-04 23:10:28 +0000 |
commit | 8fc3d853c2f2c0b160561be3ef55cd1d763d9a74 (patch) | |
tree | 67d08cf0ecb6736524c51699b95059adb47391b4 | |
parent | 35a66ee20837957a6247602fbc653e0857c0b531 (diff) | |
download | futures-8fc3d853c2f2c0b160561be3ef55cd1d763d9a74.tar.gz |
Large module documentation cleanup
-rw-r--r-- | python2/futures/_base.py | 194 | ||||
-rw-r--r-- | python2/futures/thread.py | 15 | ||||
-rw-r--r-- | python3/futures/_base.py | 200 | ||||
-rw-r--r-- | python3/futures/thread.py | 10 |
4 files changed, 275 insertions, 144 deletions
diff --git a/python2/futures/_base.py b/python2/futures/_base.py index e90c288..4c50064 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py @@ -38,19 +38,21 @@ except NameError: return False return True -FIRST_COMPLETED = 0 -FIRST_EXCEPTION = 1 -ALL_COMPLETED = 2 -RETURN_IMMEDIATELY = 3 - -# Possible future states -PENDING = 0 -RUNNING = 1 -CANCELLED = 2 # The future was cancelled... -CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. -FINISHED = 4 - -FUTURE_STATES = [ +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and ThreadEventSink.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ PENDING, RUNNING, CANCELLED, @@ -66,12 +68,24 @@ _STATE_TO_DESCRIPTION_MAP = { FINISHED: "finished" } +# Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") _handler = logging.StreamHandler() LOGGER.addHandler(_handler) del _handler def set_future_exception(future, event_sink, exception): + """Sets a future as having terminated with an exception. + + This function should only be used within the futures package. + + Args: + future: The Future that finished with an exception. + event_sink: The ThreadEventSink accociated with the Future's FutureList. + The event_sink will be notified of the Future's completion, which + may unblock some clients that have called FutureList.wait(). + exception: The expection that executing the Future raised. + """ future._condition.acquire() try: future._exception = exception @@ -87,6 +101,17 @@ def set_future_exception(future, event_sink, exception): future._condition.release() def set_future_result(future, event_sink, result): + """Sets a future as having terminated without exception. + + This function should only be used within the futures package. + + Args: + future: The Future that completed. + event_sink: The ThreadEventSink accociated with the Future's FutureList. + The event_sink will be notified of the Future's completion, which + may unblock some clients that have called FutureList.wait(). + result: The value returned by the Future. + """ future._condition.acquire() try: future._result = result @@ -110,6 +135,88 @@ class CancelledError(Error): class TimeoutError(Error): pass +class _WaitTracker(object): + """Provides the event that FutureList.wait(...) blocks on. + + """ + def __init__(self): + self.event = threading.Event() + + def add_result(self): + raise NotImplementedError() + + def add_exception(self): + raise NotImplementedError() + + def add_cancelled(self): + raise NotImplementedError() + +class _FirstCompletedWaitTracker(_WaitTracker): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(_WaitTracker): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + _WaitTracker.__init__(self) + + def add_result(self): + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + """Forwards events to many _WaitTrackers. + + Each FutureList has a ThreadEventSink and each call to FutureList.wait() + causes a new _WaitTracker to be added to the ThreadEventSink. This design + allows many threads to call FutureList.wait() on the same FutureList with + different arguments. + + This class should not be used by clients. + """ + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + class Future(object): """Represents the result of an asynchronous computation.""" @@ -259,62 +366,6 @@ class Future(object): finally: self._condition.release() -class _FirstCompletedWaitTracker(object): - def __init__(self): - self.event = threading.Event() - - def add_result(self): - self.event.set() - - def add_exception(self): - self.event.set() - - def add_cancelled(self): - self.event.set() - -class _AllCompletedWaitTracker(object): - def __init__(self, pending_calls, stop_on_exception): - self.pending_calls = pending_calls - self.stop_on_exception = stop_on_exception - self.event = threading.Event() - - def add_result(self): - self.pending_calls -= 1 - if not self.pending_calls: - self.event.set() - - def add_exception(self): - if self.stop_on_exception: - self.event.set() - else: - self.add_result() - - def add_cancelled(self): - self.add_result() - -class ThreadEventSink(object): - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] - - def add(self, e): - self._waiters.append(e) - - def remove(self, e): - self._waiters.remove(e) - - def add_result(self): - for waiter in self._waiters: - waiter.add_result() - - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() - - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() - class FutureList(object): def __init__(self, futures, event_sink): """Initializes the FutureList. Should not be called by clients.""" @@ -348,12 +399,13 @@ class FutureList(object): if return_when == RETURN_IMMEDIATELY: return + # Futures cannot change state without this condition being held. self._event_sink._condition.acquire() try: # Make a quick exit if every future is already done. This check is # necessary because, if every future is in the # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any + # never receive any events. if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] for f in self): return @@ -459,7 +511,7 @@ class FutureList(object): return future in self._futures def __repr__(self): - states = dict([(state, 0) for state in FUTURE_STATES]) + states = dict([(state, 0) for state in _FUTURE_STATES]) for f in self: states[f._state] += 1 diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 77a75d0..4928342 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -26,8 +26,8 @@ import weakref # writing to a file. # # To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. +# workers to exit when their work queues are empty and then waits until they +# finish. _thread_references = set() # Weakrefs to every active worker thread. _shutdown = False # Indicates that the interpreter is shutting down. @@ -43,7 +43,8 @@ def _python_exit(): def _remove_dead_thread_references(): """Remove inactive threads from _thread_references. - Should be called periodically to prevent memory leaks in scenarios such as: + Should be called periodically to prevent thread objects from accumulating in + scenarios such as: >>> while True: >>> ... t = ThreadPoolExecutor(max_threads=5) >>> ... t.map(int, ['1', '2', '3', '4', '5']) @@ -109,6 +110,12 @@ def _worker(executor_reference, work_queue): class ThreadPoolExecutor(Executor): def __init__(self, max_threads): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_threads: The maximum number of threads that can be used to + execute the given calls. + """ _remove_dead_thread_references() self._max_threads = max_threads @@ -147,6 +154,7 @@ class ThreadPoolExecutor(Executor): return fl finally: self._shutdown_lock.release() + run_to_futures.__doc__ = Executor.run_to_futures.__doc__ def shutdown(self): self._shutdown_lock.acquire() @@ -154,3 +162,4 @@ class ThreadPoolExecutor(Executor): self._shutdown = True finally: self._shutdown_lock.release() + shutdown.__doc__ = Executor.shutdown.__doc__ diff --git a/python3/futures/_base.py b/python3/futures/_base.py index 21bae32..ad338ae 100644 --- a/python3/futures/_base.py +++ b/python3/futures/_base.py @@ -7,19 +7,21 @@ import logging import threading import time -FIRST_COMPLETED = 0 -FIRST_EXCEPTION = 1 -ALL_COMPLETED = 2 -RETURN_IMMEDIATELY = 3 - -# Possible future states -PENDING = 0 -RUNNING = 1 -CANCELLED = 2 # The future was cancelled... -CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. -FINISHED = 4 - -FUTURE_STATES = [ +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and ThreadEventSink.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ PENDING, RUNNING, CANCELLED, @@ -35,12 +37,24 @@ _STATE_TO_DESCRIPTION_MAP = { FINISHED: "finished" } +# Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") _handler = logging.StreamHandler() LOGGER.addHandler(_handler) del _handler def set_future_exception(future, event_sink, exception): + """Sets a future as having terminated with an exception. + + This function should only be used within the futures package. + + Args: + future: The Future that finished with an exception. + event_sink: The ThreadEventSink accociated with the Future's FutureList. + The event_sink will be notified of the Future's completion, which + may unblock some clients that have called FutureList.wait(). + exception: The expection that executing the Future raised. + """ with future._condition: future._exception = exception with event_sink._condition: @@ -49,6 +63,17 @@ def set_future_exception(future, event_sink, exception): future._condition.notify_all() def set_future_result(future, event_sink, result): + """Sets a future as having terminated without exception. + + This function should only be used within the futures package. + + Args: + future: The Future that completed. + event_sink: The ThreadEventSink accociated with the Future's FutureList. + The event_sink will be notified of the Future's completion, which + may unblock some clients that have called FutureList.wait(). + result: The value returned by the Future. + """ with future._condition: future._result = result with event_sink._condition: @@ -65,9 +90,98 @@ class CancelledError(Error): class TimeoutError(Error): pass +class _WaitTracker(object): + """Provides the event that FutureList.wait(...) blocks on. + + """ + def __init__(self): + self.event = threading.Event() + + def add_result(self): + raise NotImplementedError() + + def add_exception(self): + raise NotImplementedError() + + def add_cancelled(self): + raise NotImplementedError() + +class _FirstCompletedWaitTracker(_WaitTracker): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(_WaitTracker): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + super().__init__() + + def add_result(self): + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + """Forwards events to many _WaitTrackers. + + Each FutureList has a ThreadEventSink and each call to FutureList.wait() + causes a new _WaitTracker to be added to the ThreadEventSink. This design + allows many threads to call FutureList.wait() on the same FutureList with + different arguments. + + This class should not be used by clients. + """ + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + class Future(object): """Represents the result of an asynchronous computation.""" + # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink + # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race + # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink. + # Other state transitions need only have the Future._condition held. + # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired + # first. + def __init__(self, index): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() @@ -193,61 +307,6 @@ class Future(object): else: raise TimeoutError() -class _FirstCompletedWaitTracker(object): - def __init__(self): - self.event = threading.Event() - - def add_result(self): - self.event.set() - - def add_exception(self): - self.event.set() - - def add_cancelled(self): - self.event.set() - -class _AllCompletedWaitTracker(object): - def __init__(self, pending_calls, stop_on_exception): - self.pending_calls = pending_calls - self.stop_on_exception = stop_on_exception - self.event = threading.Event() - - def add_result(self): - self.pending_calls -= 1 - if not self.pending_calls: - self.event.set() - - def add_exception(self): - if self.stop_on_exception: - self.event.set() - else: - self.add_result() - - def add_cancelled(self): - self.add_result() - -class ThreadEventSink(object): - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] - - def add(self, e): - self._waiters.append(e) - - def remove(self, e): - self._waiters.remove(e) - - def add_result(self): - for waiter in self._waiters: - waiter.add_result() - - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() - - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() class FutureList(object): def __init__(self, futures, event_sink): @@ -282,11 +341,12 @@ class FutureList(object): if return_when == RETURN_IMMEDIATELY: return + # Futures cannot change state without this condition being held. with self._event_sink._condition: # Make a quick exit if every future is already done. This check is # necessary because, if every future is in the # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any + # never receive any events. if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] for f in self): return @@ -390,7 +450,7 @@ class FutureList(object): return future in self._futures def __repr__(self): - states = {state: 0 for state in FUTURE_STATES} + states = {state: 0 for state in _FUTURE_STATES} for f in self: states[f._state] += 1 diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 9292863..286d08b 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -103,6 +103,14 @@ def _worker(executor_reference, work_queue): class ThreadPoolExecutor(Executor): def __init__(self, max_threads): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_threads: The maximum number of threads that can be used to + execute the given calls. + """ + _remove_dead_thread_references() + self._max_threads = max_threads self._work_queue = queue.Queue() self._threads = set() @@ -136,7 +144,9 @@ class ThreadPoolExecutor(Executor): fl = FutureList(futures, event_sink) fl.wait(timeout=timeout, return_when=return_when) return fl + run_to_futures.__doc__ = Executor.run_to_futures.__doc__ def shutdown(self): with self._shutdown_lock: self._shutdown = True + shutdown.__doc__ = Executor.shutdown.__doc__ |