From cbe1756e3ecefc0e24a5d0a4b8663db9b6d0cc52 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 3 Nov 2017 14:31:38 +0100 Subject: bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (#3247) * bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed * Avoid mucking with process state in test. Add a warning if the semaphore process died, as semaphores may then be leaked. * Add NEWS entry --- Lib/test/_test_multiprocessing.py | 43 +++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) (limited to 'Lib/test/_test_multiprocessing.py') diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 799146d8a3..d4e8a8a7e1 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4,6 +4,7 @@ import unittest import queue as pyqueue +import contextlib import time import io import itertools @@ -4344,14 +4345,14 @@ class TestStartMethod(unittest.TestCase): self.fail("failed spawning forkserver or grandchild") -# -# Check that killing process does not leak named semaphores -# - @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") class TestSemaphoreTracker(unittest.TestCase): + def test_semaphore_tracker(self): + # + # Check that killing process does not leak named semaphores + # import subprocess cmd = '''if 1: import multiprocessing as mp, time, os @@ -4385,6 +4386,40 @@ class TestSemaphoreTracker(unittest.TestCase): self.assertRegex(err, expected) self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) + def check_semaphore_tracker_death(self, signum, should_die): + # bpo-31310: if the semaphore tracker process has died, it should + # be restarted implicitly. + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + os.kill(pid, signum) + time.sleep(1.0) # give it time to die + + ctx = multiprocessing.get_context("spawn") + with contextlib.ExitStack() as stack: + if should_die: + stack.enter_context(self.assertWarnsRegex( + UserWarning, + "semaphore_tracker: process died")) + sem = ctx.Semaphore() + sem.acquire() + sem.release() + wr = weakref.ref(sem) + # ensure `sem` gets collected, which triggers communication with + # the semaphore tracker + del sem + gc.collect() + self.assertIsNone(wr()) + + def test_semaphore_tracker_sigint(self): + # Catchable signal (ignored by semaphore tracker) + self.check_semaphore_tracker_death(signal.SIGINT, False) + + def test_semaphore_tracker_sigkill(self): + # Uncatchable signal. + self.check_semaphore_tracker_death(signal.SIGKILL, True) + + class TestSimpleQueue(unittest.TestCase): @classmethod -- cgit v1.2.1