diff options
Diffstat (limited to 'python2/futures/process.py')
-rw-r--r-- | python2/futures/process.py | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/python2/futures/process.py b/python2/futures/process.py index b96203b..e4c3ad3 100644 --- a/python2/futures/process.py +++ b/python2/futures/process.py @@ -64,6 +64,8 @@ def _add_call_item_to_queue(pending_work_items, work_ids, call_queue): while True: + if call_queue.full(): + return try: work_id = work_ids.get(block=False) except Queue.Empty: @@ -77,16 +79,16 @@ def _add_call_item_to_queue(pending_work_items, work_item.future._condition.release() work_item.completion_tracker.add_cancelled() + del pending_work_items[work_id] continue else: work_item.future._condition.acquire() work_item.future._state = RUNNING work_item.future._condition.release() call_queue.put(_CallItem(work_id, work_item.call), block=True) - if call_queue.full(): - return def _result(executor_reference, + processes, pending_work_items, work_ids_queue, call_queue, @@ -100,9 +102,22 @@ def _result(executor_reference, result_item = result_queue.get(block=True, timeout=0.1) except Queue.Empty: executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. if _shutdown or executor is None or executor._shutdown_thread: - shutdown_process_event.set() - return + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_process_event.set() + + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + return del executor else: work_item = pending_work_items[result_item.work_id] @@ -147,6 +162,7 @@ class ProcessPoolExecutor(Executor): self._queue_management_thread = threading.Thread( target=_result, args=(weakref.ref(self), + self._processes, self._pending_work_items, self._work_ids, self._call_queue, |