summaryrefslogtreecommitdiff
path: root/futures/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'futures/process.py')
-rw-r--r--futures/process.py27
1 files changed, 15 insertions, 12 deletions
diff --git a/futures/process.py b/futures/process.py
index 06a8c25..c4dad74 100644
--- a/futures/process.py
+++ b/futures/process.py
@@ -3,9 +3,8 @@
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
ALL_COMPLETED,
- LOGGER,
set_future_exception, set_future_result,
- Executor, Future, FutureList,ThreadEventSink)
+ Executor, Future, FutureList, ThreadEventSink)
import queue
import multiprocessing
@@ -54,18 +53,22 @@ class ProcessPoolExecutor(Executor):
max_processes = 16
self._max_processes = max_processes
+ # Make the call queue slightly larger than the number of processes to
+ # prevent the worker processes from starving but to make future.cancel()
+ # responsive.
self._call_queue = multiprocessing.Queue(self._max_processes + 1)
self._result_queue = multiprocessing.Queue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
self._processes = set()
- self._shutdown = False
+
+ # Shutdown is a two-step process.
+ self._shutdown_thread = False
self._shutdown_process_event = multiprocessing.Event()
- self._lock = threading.Lock()
+ self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
-
def _add_call_item_to_queue(self):
while True:
try:
@@ -96,7 +99,7 @@ class ProcessPoolExecutor(Executor):
result_item = self._result_queue.get(block=True,
timeout=0.1)
except queue.Empty:
- if self._shutdown and not self._pending_work_items:
+ if self._shutdown_thread and not self._pending_work_items:
self._shutdown_process_event.set()
return
else:
@@ -129,10 +132,10 @@ class ProcessPoolExecutor(Executor):
p.start()
self._processes.add(p)
- def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
- with self._lock:
- if self._shutdown:
- raise RuntimeError()
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ with self._shutdown_lock:
+ if self._shutdown_thread:
+ raise RuntimeError('cannot run new futures after shutdown')
futures = []
event_sink = ThreadEventSink()
@@ -151,5 +154,5 @@ class ProcessPoolExecutor(Executor):
return fl
def shutdown(self):
- with self._lock:
- self._shutdown = True
+ with self._shutdown_lock:
+ self._shutdown_thread = True