diff options
Diffstat (limited to 'python3/futures/process.py')
-rw-r--r-- | python3/futures/process.py | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/python3/futures/process.py b/python3/futures/process.py index 71dd602..75ee83c 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -65,6 +65,8 @@ def _add_call_item_to_queue(pending_work_items, work_ids, call_queue): while True: + if call_queue.full(): + return try: work_id = work_ids.get(block=False) except queue.Empty: @@ -76,17 +78,16 @@ def _add_call_item_to_queue(pending_work_items, with work_item.future._condition: work_item.future._condition.notify_all() work_item.completion_tracker.add_cancelled() + del pending_work_items[work_id] continue else: with work_item.future._condition: work_item.future._state = RUNNING - call_queue.put(_CallItem(work_id, work_item.call), block=True) - if call_queue.full(): - return def _result(executor_reference, + processes, pending_work_items, work_ids_queue, call_queue, @@ -101,9 +102,22 @@ def _result(executor_reference, result_item = result_queue.get(block=True, timeout=0.1) except queue.Empty: executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. if _shutdown or executor is None or executor._shutdown_thread: - shutdown_process_event.set() - return + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_process_event.set() + + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + return del executor else: work_item = pending_work_items[result_item.work_id] @@ -147,6 +161,7 @@ class ProcessPoolExecutor(Executor): self._queue_management_thread = threading.Thread( target=_result, args=(weakref.ref(self), + self._processes, self._pending_work_items, self._work_ids, self._call_queue, |