summaryrefslogtreecommitdiff
path: root/python3/futures/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'python3/futures/process.py')
-rw-r--r--python3/futures/process.py25
1 files changed, 20 insertions, 5 deletions
diff --git a/python3/futures/process.py b/python3/futures/process.py
index 71dd602..75ee83c 100644
--- a/python3/futures/process.py
+++ b/python3/futures/process.py
@@ -65,6 +65,8 @@ def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
while True:
+ if call_queue.full():
+ return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
@@ -76,17 +78,16 @@ def _add_call_item_to_queue(pending_work_items,
with work_item.future._condition:
work_item.future._condition.notify_all()
work_item.completion_tracker.add_cancelled()
+ del pending_work_items[work_id]
continue
else:
with work_item.future._condition:
work_item.future._state = RUNNING
-
call_queue.put(_CallItem(work_id, work_item.call),
block=True)
- if call_queue.full():
- return
def _result(executor_reference,
+ processes,
pending_work_items,
work_ids_queue,
call_queue,
@@ -101,9 +102,22 @@ def _result(executor_reference,
result_item = result_queue.get(block=True, timeout=0.1)
except queue.Empty:
executor = executor_reference()
+ # No more work items can be added if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns this worker has been collected OR
+ # - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
- shutdown_process_event.set()
- return
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ shutdown_process_event.set()
+
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes:
+ p.join()
+ return
del executor
else:
work_item = pending_work_items[result_item.work_id]
@@ -147,6 +161,7 @@ class ProcessPoolExecutor(Executor):
self._queue_management_thread = threading.Thread(
target=_result,
args=(weakref.ref(self),
+ self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,