From b9b69003d91c6ea94b890ce24ed25686d30f1428 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 14 Sep 2017 14:40:56 -0700 Subject: bpo-31234: Add support.join_thread() helper (#3587) join_thread() joins a thread but raises an AssertionError if the thread is still alive after timeout seconds. --- Lib/test/_test_multiprocessing.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'Lib/test/_test_multiprocessing.py') diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d6fe7d6267..bddcdadfee 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -21,6 +21,7 @@ import operator import weakref import test.support import test.support.script_helper +from test import support # Skip tests if _multiprocessing wasn't built. @@ -72,6 +73,12 @@ def close_queue(queue): queue.join_thread() +def join_process(process, timeout): + # Since multiprocessing.Process has the same API than threading.Thread + # (join() and is_alive(), the support function can be reused + support.join_thread(process, timeout) + + # # Constants # @@ -477,7 +484,7 @@ class _TestProcess(BaseTestCase): for p in procs: p.start() for p in procs: - p.join(timeout=10) + join_process(p, timeout=10) for p in procs: self.assertEqual(p.exitcode, 0) @@ -489,7 +496,7 @@ class _TestProcess(BaseTestCase): for p in procs: p.terminate() for p in procs: - p.join(timeout=10) + join_process(p, timeout=10) if os.name != 'nt': for p in procs: self.assertEqual(p.exitcode, -signal.SIGTERM) @@ -652,7 +659,7 @@ class _TestSubclassingProcess(BaseTestCase): p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) p.daemon = True p.start() - p.join(5) + join_process(p, timeout=5) self.assertEqual(p.exitcode, 1) with open(testfn, 'r') as f: @@ -665,7 +672,7 @@ class _TestSubclassingProcess(BaseTestCase): p = self.Process(target=sys.exit, args=(reason,)) p.daemon = True p.start() - p.join(5) + join_process(p, timeout=5) self.assertEqual(p.exitcode, reason) # @@ -1254,8 +1261,7 @@ class _TestCondition(BaseTestCase): state.value += 1 cond.notify() - p.join(5) - self.assertFalse(p.is_alive()) + join_process(p, timeout=5) self.assertEqual(p.exitcode, 0) @classmethod @@ -1291,7 +1297,7 @@ class _TestCondition(BaseTestCase): state.value += 1 cond.notify() - p.join(5) + join_process(p, timeout=5) self.assertTrue(success.value) @classmethod @@ -4005,7 +4011,7 @@ class TestTimeouts(unittest.TestCase): self.assertEqual(conn.recv(), 456) conn.close() l.close() - p.join(10) + join_process(p, timeout=10) finally: socket.setdefaulttimeout(old_timeout) @@ -4041,7 +4047,7 @@ class TestForkAwareThreadLock(unittest.TestCase): p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) p.start() conn.close() - p.join(timeout=5) + join_process(p, timeout=5) else: conn.send(len(util._afterfork_registry)) conn.close() @@ -4054,7 +4060,7 @@ class TestForkAwareThreadLock(unittest.TestCase): p.start() w.close() new_size = r.recv() - p.join(timeout=5) + join_process(p, timeout=5) self.assertLessEqual(new_size, old_size) # @@ -4109,7 +4115,7 @@ class TestCloseFds(unittest.TestCase): p.start() writer.close() e = reader.recv() - p.join(timeout=5) + join_process(p, timeout=5) finally: self.close(fd) writer.close() -- cgit v1.2.1