summaryrefslogtreecommitdiff
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@haypocalc.com>2011-06-20 23:28:09 +0200
committerVictor Stinner <victor.stinner@haypocalc.com>2011-06-20 23:28:09 +0200
commitd628496323bf594679b3d5b3e10d85da8a1c8996 (patch)
tree0b8b093b87725762bcbd819a01505166694a3c30 /Lib
parent395dc58ee784087547115b684b2b39f40655ac47 (diff)
downloadcpython-git-d628496323bf594679b3d5b3e10d85da8a1c8996.tar.gz
Close #12363: fix a race condition in siginterrupt() tests
The previous tests used time.sleep() to synchronize two processes. If the host was too slow, the test could fail. The new tests only use one process, but they use a subprocess to: - have only one thread - have a timeout on the blocking read (select cannot be used in the test, select always fail with EINTR, the kernel doesn't restart it) - not touch signal handling of the parent process
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_signal.py124
1 files changed, 50 insertions, 74 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index effdbef127..e613f5bc63 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -9,7 +9,7 @@ import struct
import subprocess
import traceback
import sys, os, time, errno
-from test.script_helper import assert_python_ok
+from test.script_helper import assert_python_ok, spawn_python
try:
import threading
except ImportError:
@@ -314,100 +314,76 @@ class WakeupSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
- def setUp(self):
- """Install a no-op signal handler that can be set to allow
- interrupts or not, and arrange for the original signal handler to be
- re-installed when the test is finished.
- """
- self.signum = signal.SIGUSR1
- oldhandler = signal.signal(self.signum, lambda x,y: None)
- self.addCleanup(signal.signal, self.signum, oldhandler)
-
- def readpipe_interrupted(self):
+ def readpipe_interrupted(self, interrupt):
"""Perform a read during which a signal will arrive. Return True if the
read is interrupted by the signal and raises an exception. Return False
if it returns normally.
"""
- # Create a pipe that can be used for the read. Also clean it up
- # when the test is over, since nothing else will (but see below for
- # the write end).
- r, w = os.pipe()
- self.addCleanup(os.close, r)
-
- # Create another process which can send a signal to this one to try
- # to interrupt the read.
- ppid = os.getpid()
- pid = os.fork()
-
- if pid == 0:
- # Child code: sleep to give the parent enough time to enter the
- # read() call (there's a race here, but it's really tricky to
- # eliminate it); then signal the parent process. Also, sleep
- # again to make it likely that the signal is delivered to the
- # parent process before the child exits. If the child exits
- # first, the write end of the pipe will be closed and the test
- # is invalid.
- try:
- time.sleep(0.2)
- os.kill(ppid, self.signum)
- time.sleep(0.2)
- finally:
- # No matter what, just exit as fast as possible now.
- exit_subprocess()
- else:
- # Parent code.
- # Make sure the child is eventually reaped, else it'll be a
- # zombie for the rest of the test suite run.
- self.addCleanup(os.waitpid, pid, 0)
-
- # Close the write end of the pipe. The child has a copy, so
- # it's not really closed until the child exits. We need it to
- # close when the child exits so that in the non-interrupt case
- # the read eventually completes, otherwise we could just close
- # it *after* the test.
- os.close(w)
-
- # Try the read and report whether it is interrupted or not to
- # the caller.
+ # use a subprocess to have only one thread, to have a timeout on the
+ # blocking read and to not touch signal handling in this process
+ code = """if 1:
+ import errno
+ import os
+ import signal
+ import sys
+
+ interrupt = %r
+ r, w = os.pipe()
+
+ def handler(signum, frame):
+ pass
+
+ signal.signal(signal.SIGALRM, handler)
+ if interrupt is not None:
+ signal.siginterrupt(signal.SIGALRM, interrupt)
+
+ # run the test twice
+ for loop in range(2):
+ # send a SIGALRM in a second (during the read)
+ signal.alarm(1)
+ try:
+ # blocking call: read from a pipe without data
+ os.read(r, 1)
+ except OSError as err:
+ if err.errno != errno.EINTR:
+ raise
+ else:
+ sys.exit(2)
+ sys.exit(3)
+ """ % (interrupt,)
+ with spawn_python('-c', code) as process:
try:
- d = os.read(r, 1)
+ stdout, stderr = process.communicate(timeout=3.0)
+ except subprocess.TimeoutExpired:
+ process.kill()
return False
- except OSError as err:
- if err.errno != errno.EINTR:
- raise
- return True
+ else:
+ exitcode = process.wait()
+ if exitcode not in (2, 3):
+ raise Exception("Child error (exit code %s): %s"
+ % (exitcode, stdout))
+ return (exitcode == 3)
def test_without_siginterrupt(self):
# If a signal handler is installed and siginterrupt is not called
# at all, when that signal arrives, it interrupts a syscall that's in
# progress.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
+ interrupted = self.readpipe_interrupted(None)
+ self.assertTrue(interrupted)
def test_siginterrupt_on(self):
# If a signal handler is installed and siginterrupt is called with
# a true value for the second argument, when that signal arrives, it
# interrupts a syscall that's in progress.
- signal.siginterrupt(self.signum, 1)
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
+ interrupted = self.readpipe_interrupted(True)
+ self.assertTrue(interrupted)
def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
# does not interrupt a syscall that's in progress.
- signal.siginterrupt(self.signum, 0)
- i = self.readpipe_interrupted()
- self.assertFalse(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertFalse(i)
+ interrupted = self.readpipe_interrupted(False)
+ self.assertFalse(interrupted)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")