From 94459fd7dc25ce19096f2080eb7339497d319eb0 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 5 Jan 2018 11:15:54 +0100 Subject: bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895) Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors. This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return. --- Lib/test/_test_multiprocessing.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) (limited to 'Lib/test/_test_multiprocessing.py') diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7575c5d368..05166b91ba 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1029,6 +1029,43 @@ class _TestQueue(BaseTestCase): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): + # bpo-30006: verify feeder handles exceptions using the + # _on_queue_feeder_error hook. + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + """Mock unserializable object""" + def __init__(self): + self.reduce_was_called = False + self.on_queue_feeder_error_was_called = False + + def __reduce__(self): + self.reduce_was_called = True + raise AttributeError + + class SafeQueue(multiprocessing.queues.Queue): + """Queue with overloaded _on_queue_feeder_error hook""" + @staticmethod + def _on_queue_feeder_error(e, obj): + if (isinstance(e, AttributeError) and + isinstance(obj, NotSerializable)): + obj.on_queue_feeder_error_was_called = True + + not_serializable_obj = NotSerializable() + # The captured_stderr reduces the noise in the test report + with test.support.captured_stderr(): + q = SafeQueue(ctx=multiprocessing.get_context()) + q.put(not_serializable_obj) + + # Verify that q is still functionning correctly + q.put(True) + self.assertTrue(q.get(timeout=1.0)) + + # Assert that the serialization and the hook have been called correctly + self.assertTrue(not_serializable_obj.reduce_was_called) + self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) # # # -- cgit v1.2.1