diff options
author | brian.quinlan <devnull@localhost> | 2009-05-10 11:37:27 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2009-05-10 11:37:27 +0000 |
commit | b32b0bfba4f6c8cd4f0b71cf7cfbad979c6c2695 (patch) | |
tree | 65c2c23588edbb0fd4e0fede7eaaba578600cbe1 /futures/process.py | |
parent | 0a24721b25c89aed65030b940b19ac5fc2c27aaa (diff) | |
download | futures-b32b0bfba4f6c8cd4f0b71cf7cfbad979c6c2695.tar.gz |
Same names change and doc additions.
Diffstat (limited to 'futures/process.py')
-rw-r--r-- | futures/process.py | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/futures/process.py b/futures/process.py index 06a8c25..c4dad74 100644 --- a/futures/process.py +++ b/futures/process.py @@ -3,9 +3,8 @@ from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, ALL_COMPLETED, - LOGGER, set_future_exception, set_future_result, - Executor, Future, FutureList,ThreadEventSink) + Executor, Future, FutureList, ThreadEventSink) import queue import multiprocessing @@ -54,18 +53,22 @@ class ProcessPoolExecutor(Executor): max_processes = 16 self._max_processes = max_processes + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from starving but to make future.cancel() + # responsive. self._call_queue = multiprocessing.Queue(self._max_processes + 1) self._result_queue = multiprocessing.Queue() self._work_ids = queue.Queue() self._queue_management_thread = None self._processes = set() - self._shutdown = False + + # Shutdown is a two-step process. + self._shutdown_thread = False self._shutdown_process_event = multiprocessing.Event() - self._lock = threading.Lock() + self._shutdown_lock = threading.Lock() self._queue_count = 0 self._pending_work_items = {} - def _add_call_item_to_queue(self): while True: try: @@ -96,7 +99,7 @@ class ProcessPoolExecutor(Executor): result_item = self._result_queue.get(block=True, timeout=0.1) except queue.Empty: - if self._shutdown and not self._pending_work_items: + if self._shutdown_thread and not self._pending_work_items: self._shutdown_process_event.set() return else: @@ -129,10 +132,10 @@ class ProcessPoolExecutor(Executor): p.start() self._processes.add(p) - def run(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._lock: - if self._shutdown: - raise RuntimeError() + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot run new futures after shutdown') futures = [] event_sink = ThreadEventSink() @@ -151,5 +154,5 @@ class ProcessPoolExecutor(Executor): return fl def shutdown(self): - with self._lock: - self._shutdown = True + with self._shutdown_lock: + self._shutdown_thread = True |