summaryrefslogtreecommitdiff
path: root/Lib/test/test_threadsignals.py
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@redhat.com>2018-06-01 15:51:02 +0200
committerGitHub <noreply@github.com>2018-06-01 15:51:02 +0200
commit5dbb48aaac0ff74648b355ebdde222856004b1ef (patch)
treec47bd93d6333eb31b1461d75c5463ef058d666b2 /Lib/test/test_threadsignals.py
parent95681c7a7ddd436ba7d6c10d1202c33dd6bd648b (diff)
downloadcpython-git-5dbb48aaac0ff74648b355ebdde222856004b1ef.tar.gz
[3.6] bpo-31234: Add test.support.wait_threads_exit() (GH-3578) (GH-7315)
* bpo-31234: Add test.support.wait_threads_exit() (GH-3578) Use _thread.count() to wait until threads exit. The new context manager prevents the "dangling thread" warning. (cherry picked from commit ff40ecda73178dfcad24e26240d684356ef20793) * bpo-31234: Try to fix lock_tests warning (#3557) Try to fix the "Warning -- threading_cleanup() failed to cleanup 1 threads" warning in test.lock_tests: wait a little bit longer to give time to the threads to complete. Warning seen on test_thread and test_importlib. (cherry picked from commit 096ae3373abac2c8b3a26a3fe33cc8bd4cbccd4e)
Diffstat (limited to 'Lib/test/test_threadsignals.py')
-rw-r--r--Lib/test/test_threadsignals.py92
1 files changed, 50 insertions, 42 deletions
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index 7d4d8c4106..99b60cd9e6 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -4,8 +4,8 @@ import unittest
import signal
import os
import sys
-from test.support import run_unittest, import_module
-thread = import_module('_thread')
+from test import support
+thread = support.import_module('_thread')
import time
if (sys.platform[:3] == 'win'):
@@ -39,13 +39,15 @@ def send_signals():
class ThreadSignals(unittest.TestCase):
def test_signals(self):
- # Test signal handling semantics of threads.
- # We spawn a thread, have the thread send two signals, and
- # wait for it to finish. Check that we got both signals
- # and that they were run by the main thread.
- signalled_all.acquire()
- self.spawnSignallingThread()
- signalled_all.acquire()
+ with support.wait_threads_exit():
+ # Test signal handling semantics of threads.
+ # We spawn a thread, have the thread send two signals, and
+ # wait for it to finish. Check that we got both signals
+ # and that they were run by the main thread.
+ signalled_all.acquire()
+ self.spawnSignallingThread()
+ signalled_all.acquire()
+
# the signals that we asked the kernel to send
# will come back, but we don't know when.
# (it might even be after the thread exits
@@ -118,17 +120,19 @@ class ThreadSignals(unittest.TestCase):
# thread.
def other_thread():
rlock.acquire()
- thread.start_new_thread(other_thread, ())
- # Wait until we can't acquire it without blocking...
- while rlock.acquire(blocking=False):
- rlock.release()
- time.sleep(0.01)
- signal.alarm(1)
- t1 = time.time()
- self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
- dt = time.time() - t1
- # See rationale above in test_lock_acquire_interruption
- self.assertLess(dt, 3.0)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while rlock.acquire(blocking=False):
+ rlock.release()
+ time.sleep(0.01)
+ signal.alarm(1)
+ t1 = time.time()
+ self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
+ dt = time.time() - t1
+ # See rationale above in test_lock_acquire_interruption
+ self.assertLess(dt, 3.0)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, oldalrm)
@@ -137,6 +141,7 @@ class ThreadSignals(unittest.TestCase):
self.sig_recvd = False
def my_handler(signal, frame):
self.sig_recvd = True
+
old_handler = signal.signal(signal.SIGUSR1, my_handler)
try:
def other_thread():
@@ -151,14 +156,16 @@ class ThreadSignals(unittest.TestCase):
# the lock acquisition. Then we'll let it run.
time.sleep(0.5)
lock.release()
- thread.start_new_thread(other_thread, ())
- # Wait until we can't acquire it without blocking...
- while lock.acquire(blocking=False):
- lock.release()
- time.sleep(0.01)
- result = lock.acquire() # Block while we receive a signal.
- self.assertTrue(self.sig_recvd)
- self.assertTrue(result)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while lock.acquire(blocking=False):
+ lock.release()
+ time.sleep(0.01)
+ result = lock.acquire() # Block while we receive a signal.
+ self.assertTrue(self.sig_recvd)
+ self.assertTrue(result)
finally:
signal.signal(signal.SIGUSR1, old_handler)
@@ -197,19 +204,20 @@ class ThreadSignals(unittest.TestCase):
os.kill(process_pid, signal.SIGUSR1)
done.release()
- # Send the signals from the non-main thread, since the main thread
- # is the only one that can process signals.
- thread.start_new_thread(send_signals, ())
- timed_acquire()
- # Wait for thread to finish
- done.acquire()
- # This allows for some timing and scheduling imprecision
- self.assertLess(self.end - self.start, 2.0)
- self.assertGreater(self.end - self.start, 0.3)
- # If the signal is received several times before PyErr_CheckSignals()
- # is called, the handler will get called less than 40 times. Just
- # check it's been called at least once.
- self.assertGreater(self.sigs_recvd, 0)
+ with support.wait_threads_exit():
+ # Send the signals from the non-main thread, since the main thread
+ # is the only one that can process signals.
+ thread.start_new_thread(send_signals, ())
+ timed_acquire()
+ # Wait for thread to finish
+ done.acquire()
+ # This allows for some timing and scheduling imprecision
+ self.assertLess(self.end - self.start, 2.0)
+ self.assertGreater(self.end - self.start, 0.3)
+ # If the signal is received several times before PyErr_CheckSignals()
+ # is called, the handler will get called less than 40 times. Just
+ # check it's been called at least once.
+ self.assertGreater(self.sigs_recvd, 0)
finally:
signal.signal(signal.SIGUSR1, old_handler)
@@ -223,7 +231,7 @@ def test_main():
oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
try:
- run_unittest(ThreadSignals)
+ support.run_unittest(ThreadSignals)
finally:
registerSignals(*oldsigs)