summaryrefslogtreecommitdiff
path: root/Lib/test/_test_multiprocessing.py
diff options
context:
space:
mode:
authorThomas Moreau <thomas.moreau.2010@gmail.com>2018-01-05 11:15:54 +0100
committerAntoine Pitrou <pitrou@free.fr>2018-01-05 11:15:54 +0100
commit94459fd7dc25ce19096f2080eb7339497d319eb0 (patch)
tree7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/test/_test_multiprocessing.py
parent65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff)
downloadcpython-git-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.gz
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors. This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r--Lib/test/_test_multiprocessing.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 7575c5d368..05166b91ba 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1029,6 +1029,43 @@ class _TestQueue(BaseTestCase):
self.assertTrue(q.get(timeout=1.0))
close_queue(q)
+ def test_queue_feeder_on_queue_feeder_error(self):
+ # bpo-30006: verify feeder handles exceptions using the
+ # _on_queue_feeder_error hook.
+ if self.TYPE != 'processes':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ class NotSerializable(object):
+ """Mock unserializable object"""
+ def __init__(self):
+ self.reduce_was_called = False
+ self.on_queue_feeder_error_was_called = False
+
+ def __reduce__(self):
+ self.reduce_was_called = True
+ raise AttributeError
+
+ class SafeQueue(multiprocessing.queues.Queue):
+ """Queue with overloaded _on_queue_feeder_error hook"""
+ @staticmethod
+ def _on_queue_feeder_error(e, obj):
+ if (isinstance(e, AttributeError) and
+ isinstance(obj, NotSerializable)):
+ obj.on_queue_feeder_error_was_called = True
+
+ not_serializable_obj = NotSerializable()
+ # The captured_stderr reduces the noise in the test report
+ with test.support.captured_stderr():
+ q = SafeQueue(ctx=multiprocessing.get_context())
+ q.put(not_serializable_obj)
+
+ # Verify that q is still functionning correctly
+ q.put(True)
+ self.assertTrue(q.get(timeout=1.0))
+
+ # Assert that the serialization and the hook have been called correctly
+ self.assertTrue(not_serializable_obj.reduce_was_called)
+ self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
#
#
#