From dd696496605883a44da983ad81e55a01e996a004 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 8 Jun 2011 17:21:55 +0200 Subject: Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed children and raises BrokenProcessPool in such a situation. Previously it would reliably freeze/deadlock. --- Lib/test/test_concurrent_futures.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) (limited to 'Lib/test/test_concurrent_futures.py') diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7457f39282..5968980ff1 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -19,7 +19,7 @@ import unittest from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) -import concurrent.futures.process +from concurrent.futures.process import BrokenProcessPool def create_future(state=PENDING, exception=None, result=None): @@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): processes = self.executor._processes self.executor.shutdown() - for p in processes: + for p in processes.values(): p.join() def test_context_manager_shutdown(self): @@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in processes: + for p in processes.values(): p.join() def test_del_shutdown(self): @@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): del executor queue_management_thread.join() - for p in processes: + for p in processes.values(): p.join() class WaitTests(unittest.TestCase): @@ -381,7 +381,17 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): - pass + def test_killed_child(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken". + futures = [self.executor.submit(time.sleep, 3)] + # Get one of the processes, and terminate (kill) it + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + self.assertRaises(BrokenProcessPool, fut.result) + # Submitting other jobs fails as well. + self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) class FutureTests(unittest.TestCase): -- cgit v1.2.1