summaryrefslogtreecommitdiff
path: root/Lib/test/_test_multiprocessing.py
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-11-03 14:31:38 +0100
committerGitHub <noreply@github.com>2017-11-03 14:31:38 +0100
commitcbe1756e3ecefc0e24a5d0a4b8663db9b6d0cc52 (patch)
treeb8b7f15266c062c7e49adbbb3eb4bdd84b778199 /Lib/test/_test_multiprocessing.py
parentfc6b348b12ad401cab0261b7b71a65c60a08c0a8 (diff)
downloadcpython-git-cbe1756e3ecefc0e24a5d0a4b8663db9b6d0cc52.tar.gz
bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (#3247)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed * Avoid mucking with process state in test. Add a warning if the semaphore process died, as semaphores may then be leaked. * Add NEWS entry
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r--Lib/test/_test_multiprocessing.py43
1 files changed, 39 insertions, 4 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 799146d8a3..d4e8a8a7e1 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4,6 +4,7 @@
import unittest
import queue as pyqueue
+import contextlib
import time
import io
import itertools
@@ -4344,14 +4345,14 @@ class TestStartMethod(unittest.TestCase):
self.fail("failed spawning forkserver or grandchild")
-#
-# Check that killing process does not leak named semaphores
-#
-
@unittest.skipIf(sys.platform == "win32",
"test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
+
def test_semaphore_tracker(self):
+ #
+ # Check that killing process does not leak named semaphores
+ #
import subprocess
cmd = '''if 1:
import multiprocessing as mp, time, os
@@ -4385,6 +4386,40 @@ class TestSemaphoreTracker(unittest.TestCase):
self.assertRegex(err, expected)
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
+ def check_semaphore_tracker_death(self, signum, should_die):
+ # bpo-31310: if the semaphore tracker process has died, it should
+ # be restarted implicitly.
+ from multiprocessing.semaphore_tracker import _semaphore_tracker
+ _semaphore_tracker.ensure_running()
+ pid = _semaphore_tracker._pid
+ os.kill(pid, signum)
+ time.sleep(1.0) # give it time to die
+
+ ctx = multiprocessing.get_context("spawn")
+ with contextlib.ExitStack() as stack:
+ if should_die:
+ stack.enter_context(self.assertWarnsRegex(
+ UserWarning,
+ "semaphore_tracker: process died"))
+ sem = ctx.Semaphore()
+ sem.acquire()
+ sem.release()
+ wr = weakref.ref(sem)
+ # ensure `sem` gets collected, which triggers communication with
+ # the semaphore tracker
+ del sem
+ gc.collect()
+ self.assertIsNone(wr())
+
+ def test_semaphore_tracker_sigint(self):
+ # Catchable signal (ignored by semaphore tracker)
+ self.check_semaphore_tracker_death(signal.SIGINT, False)
+
+ def test_semaphore_tracker_sigkill(self):
+ # Uncatchable signal.
+ self.check_semaphore_tracker_death(signal.SIGKILL, True)
+
+
class TestSimpleQueue(unittest.TestCase):
@classmethod