diff options
author | Thomas Moreau <thomas.moreau.2010@gmail.com> | 2018-01-05 11:15:54 +0100 |
---|---|---|
committer | Antoine Pitrou <pitrou@free.fr> | 2018-01-05 11:15:54 +0100 |
commit | 94459fd7dc25ce19096f2080eb7339497d319eb0 (patch) | |
tree | 7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/concurrent | |
parent | 65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff) | |
download | cpython-git-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.gz |
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors.
This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
Diffstat (limited to 'Lib/concurrent')
-rw-r--r-- | Lib/concurrent/futures/process.py | 208 |
1 files changed, 159 insertions, 49 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 35af65d0be..aaa5151e01 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -8,10 +8,10 @@ The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | @@ -52,6 +52,7 @@ import queue from queue import Full import multiprocessing as mp from multiprocessing.connection import wait +from multiprocessing.queues import Queue import threading import weakref from functools import partial @@ -72,16 +73,31 @@ import traceback # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_queues = weakref.WeakKeyDictionary() +_threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False + +class _ThreadWakeup: + __slot__ = ["_state"] + + def __init__(self): + self._reader, self._writer = mp.Pipe(duplex=False) + + def wakeup(self): + self._writer.send_bytes(b"") + + def clear(self): + while self._reader.poll(): + self._reader.recv_bytes() + + def _python_exit(): global _global_shutdown _global_shutdown = True - items = list(_threads_queues.items()) - for t, q in items: - q.put(None) - for t, q in items: + items = list(_threads_wakeups.items()) + for _, thread_wakeup in items: + thread_wakeup.wakeup() + for t, _ in items: t.join() # Controls how many more calls than processes will be queued in the call queue. @@ -90,6 +106,7 @@ def _python_exit(): # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): @@ -132,6 +149,25 @@ class _CallItem(object): self.kwargs = kwargs +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated. In this case, + # the queue_manager_thread fails all work_items with BrokenProcessPool + if work_item is not None: + work_item.future.set_exception(e) + else: + super()._on_queue_feeder_error(e, obj) + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -152,6 +188,17 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] + +def _sendback_result(result_queue, work_id, result=None, exception=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc)) + + def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. @@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs): r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(call_item.work_id, exception=exc)) + _sendback_result(result_queue, call_item.work_id, exception=exc) else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + _sendback_result(result_queue, call_item.work_id, result=r) # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore @@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue + def _queue_management_worker(executor_reference, processes, pending_work_items, work_ids_queue, call_queue, - result_queue): + result_queue, + thread_wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. + thread_wakeup: A _ThreadWakeup to allow waking up the + queue_manager_thread from the main Thread and avoid deadlocks + caused by permanently locked queues. """ executor = None @@ -261,10 +312,21 @@ def _queue_management_worker(executor_reference, or executor._shutdown_thread) def shutdown_worker(): - # This is an upper bound - nb_children_alive = sum(p.is_alive() for p in processes.values()) - for i in range(0, nb_children_alive): - call_queue.put_nowait(None) + # This is an upper bound on the number of children alive. + n_children_alive = sum(p.is_alive() for p in processes.values()) + n_children_to_stop = n_children_alive + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: + for i in range(n_children_to_stop - n_sentinels_sent): + try: + call_queue.put_nowait(None) + n_sentinels_sent += 1 + except Full: + break + n_children_alive = sum(p.is_alive() for p in processes.values()) + # Release the queue's resources as soon as possible. call_queue.close() # If .join() is not called on the created processes then @@ -272,19 +334,37 @@ def _queue_management_worker(executor_reference, for p in processes.values(): p.join() - reader = result_queue._reader + result_reader = result_queue._reader + wakeup_reader = thread_wakeup._reader + readers = [result_reader, wakeup_reader] while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - sentinels = [p.sentinel for p in processes.values()] - assert sentinels - ready = wait([reader] + sentinels) - if reader in ready: - result_item = reader.recv() - else: + # Wait for a result to be ready in the result_queue while checking + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. + worker_sentinels = [p.sentinel for p in processes.values()] + ready = wait(readers + worker_sentinels) + + cause = None + is_broken = True + if result_reader in ready: + try: + result_item = result_reader.recv() + is_broken = False + except BaseException as e: + cause = traceback.format_exception(type(e), e, e.__traceback__) + + elif wakeup_reader in ready: + is_broken = False + result_item = None + thread_wakeup.clear() + if is_broken: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: @@ -293,14 +373,15 @@ def _queue_management_worker(executor_reference, 'usable anymore') executor._shutdown_thread = True executor = None + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception( - BrokenProcessPool( - "A process in the process pool was " - "terminated abruptly while the future was " - "running or pending." - )) + work_item.future.set_exception(bpe) # Delete references to object. See issue16284 del work_item pending_work_items.clear() @@ -329,6 +410,9 @@ def _queue_management_worker(executor_reference, work_item.future.set_result(result_item.result) # Delete references to object. See issue16284 del work_item + # Delete reference to result_item + del result_item + # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: @@ -348,8 +432,11 @@ def _queue_management_worker(executor_reference, pass executor = None + _system_limits_checked = False _system_limited = None + + def _check_system_limits(): global _system_limits_checked, _system_limited if _system_limits_checked: @@ -369,7 +456,8 @@ def _check_system_limits(): # minimum number of semaphores available # according to POSIX return - _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) raise NotImplementedError(_system_limited) @@ -415,6 +503,7 @@ class ProcessPoolExecutor(_base.Executor): raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers + if mp_context is None: mp_context = mp.get_context() self._mp_context = mp_context @@ -424,34 +513,52 @@ class ProcessPoolExecutor(_base.Executor): self._initializer = initializer self._initargs = initargs + # Management thread + self._queue_management_thread = None + + # Map of pids to processes + self._processes = {} + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._broken = False + self._queue_count = 0 + self._pending_work_items = {} + + # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. queue_size = self._max_workers + EXTRA_QUEUED_CALLS - self._call_queue = mp_context.Queue(queue_size) + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. self._call_queue._ignore_epipe = True self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - self._queue_management_thread = None - # Map of pids to processes - self._processes = {} - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._broken = False - self._queue_count = 0 - self._pending_work_items = {} + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + self._queue_management_thread_wakeup = _ThreadWakeup() def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) if self._queue_management_thread is None: + # When the executor gets garbarge collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self._queue_management_thread_wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + thread_wakeup.wakeup() # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( @@ -461,10 +568,13 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue)) + self._result_queue, + self._queue_management_thread_wakeup), + name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue + _threads_wakeups[self._queue_management_thread] = \ + self._queue_management_thread_wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -491,7 +601,7 @@ class ProcessPoolExecutor(_base.Executor): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._result_queue.put(None) + self._queue_management_thread_wakeup.wakeup() self._start_queue_management_thread() return f @@ -531,7 +641,7 @@ class ProcessPoolExecutor(_base.Executor): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._result_queue.put(None) + self._queue_management_thread_wakeup.wakeup() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to |