diff options
author | Thomas Moreau <thomas.moreau.2010@gmail.com> | 2018-01-05 11:15:54 +0100 |
---|---|---|
committer | Antoine Pitrou <pitrou@free.fr> | 2018-01-05 11:15:54 +0100 |
commit | 94459fd7dc25ce19096f2080eb7339497d319eb0 (patch) | |
tree | 7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/test/_test_multiprocessing.py | |
parent | 65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff) | |
download | cpython-git-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.gz |
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors.
This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7575c5d368..05166b91ba 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1029,6 +1029,43 @@ class _TestQueue(BaseTestCase): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): + # bpo-30006: verify feeder handles exceptions using the + # _on_queue_feeder_error hook. + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + """Mock unserializable object""" + def __init__(self): + self.reduce_was_called = False + self.on_queue_feeder_error_was_called = False + + def __reduce__(self): + self.reduce_was_called = True + raise AttributeError + + class SafeQueue(multiprocessing.queues.Queue): + """Queue with overloaded _on_queue_feeder_error hook""" + @staticmethod + def _on_queue_feeder_error(e, obj): + if (isinstance(e, AttributeError) and + isinstance(obj, NotSerializable)): + obj.on_queue_feeder_error_was_called = True + + not_serializable_obj = NotSerializable() + # The captured_stderr reduces the noise in the test report + with test.support.captured_stderr(): + q = SafeQueue(ctx=multiprocessing.get_context()) + q.put(not_serializable_obj) + + # Verify that q is still functionning correctly + q.put(True) + self.assertTrue(q.get(timeout=1.0)) + + # Assert that the serialization and the hook have been called correctly + self.assertTrue(not_serializable_obj.reduce_was_called) + self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) # # # |