diff options
Diffstat (limited to 'futures/process.py')
-rw-r--r-- | futures/process.py | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/futures/process.py b/futures/process.py index 06a8c25..c4dad74 100644 --- a/futures/process.py +++ b/futures/process.py @@ -3,9 +3,8 @@ from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, ALL_COMPLETED, - LOGGER, set_future_exception, set_future_result, - Executor, Future, FutureList,ThreadEventSink) + Executor, Future, FutureList, ThreadEventSink) import queue import multiprocessing @@ -54,18 +53,22 @@ class ProcessPoolExecutor(Executor): max_processes = 16 self._max_processes = max_processes + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from starving but to make future.cancel() + # responsive. self._call_queue = multiprocessing.Queue(self._max_processes + 1) self._result_queue = multiprocessing.Queue() self._work_ids = queue.Queue() self._queue_management_thread = None self._processes = set() - self._shutdown = False + + # Shutdown is a two-step process. + self._shutdown_thread = False self._shutdown_process_event = multiprocessing.Event() - self._lock = threading.Lock() + self._shutdown_lock = threading.Lock() self._queue_count = 0 self._pending_work_items = {} - def _add_call_item_to_queue(self): while True: try: @@ -96,7 +99,7 @@ class ProcessPoolExecutor(Executor): result_item = self._result_queue.get(block=True, timeout=0.1) except queue.Empty: - if self._shutdown and not self._pending_work_items: + if self._shutdown_thread and not self._pending_work_items: self._shutdown_process_event.set() return else: @@ -129,10 +132,10 @@ class ProcessPoolExecutor(Executor): p.start() self._processes.add(p) - def run(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._lock: - if self._shutdown: - raise RuntimeError() + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot run new futures after shutdown') futures = [] event_sink = ThreadEventSink() @@ -151,5 +154,5 @@ class ProcessPoolExecutor(Executor): return fl def shutdown(self): - with self._lock: - self._shutdown = True + with self._shutdown_lock: + self._shutdown_thread = True |