summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Lib/concurrent/futures/process.py47
-rw-r--r--Lib/test/test_concurrent_futures.py74
-rw-r--r--Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst2
3 files changed, 100 insertions, 23 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index fd9f572b6c..d77322831a 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -80,18 +80,23 @@ _global_shutdown = False
class _ThreadWakeup:
def __init__(self):
+ self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
- self._writer.close()
- self._reader.close()
+ if not self._closed:
+ self._closed = True
+ self._writer.close()
+ self._reader.close()
def wakeup(self):
- self._writer.send_bytes(b"")
+ if not self._closed:
+ self._writer.send_bytes(b"")
def clear(self):
- while self._reader.poll():
- self._reader.recv_bytes()
+ if not self._closed:
+ while self._reader.poll():
+ self._reader.recv_bytes()
def _python_exit():
@@ -160,8 +165,9 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
- def __init__(self, max_size=0, *, ctx, pending_work_items):
+ def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
+ self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
@@ -169,6 +175,7 @@ class _SafeQueue(Queue):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
+ self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this case,
# the queue_manager_thread fails all work_items with BrokenProcessPool
if work_item is not None:
@@ -339,6 +346,8 @@ def _queue_management_worker(executor_reference,
# Release the queue's resources as soon as possible.
call_queue.close()
+ call_queue.join_thread()
+ thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
@@ -566,6 +575,14 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items = {}
self._cancel_pending_futures = False
+ # _ThreadWakeup is a communication channel used to interrupt the wait
+ # of the main loop of queue_manager_thread from another thread (e.g.
+ # when calling executor.submit or executor.shutdown). We do not use the
+ # _result_queue to send the wakeup signal to the queue_manager_thread
+ # as it could result in a deadlock if a worker process dies with the
+ # _result_queue write lock still acquired.
+ self._queue_management_thread_wakeup = _ThreadWakeup()
+
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
@@ -573,7 +590,8 @@ class ProcessPoolExecutor(_base.Executor):
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
- pending_work_items=self._pending_work_items)
+ pending_work_items=self._pending_work_items,
+ thread_wakeup=self._queue_management_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
@@ -581,14 +599,6 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
- # _ThreadWakeup is a communication channel used to interrupt the wait
- # of the main loop of queue_manager_thread from another thread (e.g.
- # when calling executor.submit or executor.shutdown). We do not use the
- # _result_queue to send the wakeup signal to the queue_manager_thread
- # as it could result in a deadlock if a worker process dies with the
- # _result_queue write lock still acquired.
- self._queue_management_thread_wakeup = _ThreadWakeup()
-
def _start_queue_management_thread(self):
if self._queue_management_thread is None:
# When the executor gets garbarge collected, the weakref callback
@@ -692,16 +702,11 @@ class ProcessPoolExecutor(_base.Executor):
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
- if self._call_queue is not None:
- self._call_queue.close()
- if wait:
- self._call_queue.join_thread()
- self._call_queue = None
+ self._call_queue = None
self._result_queue = None
self._processes = None
if self._queue_management_thread_wakeup:
- self._queue_management_thread_wakeup.close()
self._queue_management_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index af77f81341..a7381f9d13 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -415,13 +415,32 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
- executor.map(abs, range(-5, 5))
+ res = executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
t.join()
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+ def test_shutdown_no_wait(self):
+ # Ensure that the executor cleans up the threads when calling
+ # shutdown with wait=False
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ res = executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ executor.shutdown(wait=False)
+ for t in threads:
+ t.join()
+
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
@@ -488,7 +507,7 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
- list(executor.map(abs, range(-5, 5)))
+ res = executor.map(abs, range(-5, 5))
queue_management_thread = executor._queue_management_thread
processes = executor._processes
call_queue = executor._call_queue
@@ -502,6 +521,31 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
p.join()
call_queue.join_thread()
+ # Make sure the results were all computed before the
+ # executor got shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+ def test_shutdown_no_wait(self):
+ # Ensure that the executor cleans up the processes when calling
+ # shutdown with wait=False
+ executor = futures.ProcessPoolExecutor(max_workers=5)
+ res = executor.map(abs, range(-5, 5))
+ processes = executor._processes
+ call_queue = executor._call_queue
+ queue_management_thread = executor._queue_management_thread
+ executor.shutdown(wait=False)
+
+ # Make sure that all the executor resources were properly cleaned by
+ # the shutdown process
+ queue_management_thread.join()
+ for p in processes.values():
+ p.join()
+ call_queue.join_thread()
+
+ # Make sure the results were all computed before the executor got
+ # shutdown.
+ assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
create_executor_tests(ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
@@ -1086,6 +1130,32 @@ class ExecutorDeadlockTest:
with self.assertRaises(BrokenProcessPool):
f.result()
+ def test_shutdown_deadlock_pickle(self):
+ # Test that the pool calling shutdown with wait=False does not cause
+ # a deadlock if a task fails at pickle after the shutdown call.
+ # Reported in bpo-39104.
+ self.executor.shutdown(wait=True)
+ with self.executor_type(max_workers=2,
+ mp_context=get_context(self.ctx)) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+
+ # Start the executor and get the queue_management_thread to collect
+ # the threads and avoid dangling thread that should be cleaned up
+ # asynchronously.
+ executor.submit(id, 42).result()
+ queue_manager = executor._queue_management_thread
+
+ # Submit a task that fails at pickle and shutdown the executor
+ # without waiting
+ f = executor.submit(id, ErrorAtPickle())
+ executor.shutdown(wait=False)
+ with self.assertRaises(PicklingError):
+ f.result()
+
+ # Make sure the executor is eventually shutdown and do not leave
+ # dangling threads
+ queue_manager.join()
+
create_executor_tests(ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst b/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst
new file mode 100644
index 0000000000..52779bf098
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-02-16-18-49-16.bpo-39104.cI5MJY.rst
@@ -0,0 +1,2 @@
+Fix hanging ProcessPoolExcutor on ``shutdown(wait=False)`` when a task has
+failed pickling.