summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2009-11-06 22:41:14 +0000
committerAntoine Pitrou <solipsis@pitrou.net>2009-11-06 22:41:14 +0000
commit557934f5a9a23908603365e483af3434f3c98284 (patch)
treeecb14756332276c5bf294b9362e3da62fcdbedcc
parent1766b9d10e2b0d657bbadde664aeeab5ae9b3966 (diff)
downloadcpython-git-557934f5a9a23908603365e483af3434f3c98284.tar.gz
Merged revisions 76137 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk ........ r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines Issue #7270: Add some dedicated unit tests for multi-thread synchronization primitives such as Lock, RLock, Condition, Event and Semaphore. ........
-rw-r--r--Lib/test/lock_tests.py533
-rw-r--r--Lib/test/test_thread.py7
-rw-r--r--Lib/test/test_threading.py52
-rw-r--r--Misc/NEWS3
4 files changed, 574 insertions, 21 deletions
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
new file mode 100644
index 0000000000..f9c2259914
--- /dev/null
+++ b/Lib/test/lock_tests.py
@@ -0,0 +1,533 @@
+"""
+Various tests for synchronization primitives.
+"""
+
+import sys
+import time
+from _thread import start_new_thread, get_ident
+import threading
+import unittest
+
+from test import support
+
+
+def _wait():
+ # A crude wait/yield function not relying on synchronization primitives.
+ time.sleep(0.01)
+
+class Bunch(object):
+ """
+ A bunch of threads.
+ """
+ def __init__(self, f, n, wait_before_exit=False):
+ """
+ Construct a bunch of `n` threads running the same function `f`.
+ If `wait_before_exit` is True, the threads won't terminate until
+ do_finish() is called.
+ """
+ self.f = f
+ self.n = n
+ self.started = []
+ self.finished = []
+ self._can_exit = not wait_before_exit
+ def task():
+ tid = get_ident()
+ self.started.append(tid)
+ try:
+ f()
+ finally:
+ self.finished.append(tid)
+ while not self._can_exit:
+ _wait()
+ for i in range(n):
+ start_new_thread(task, ())
+
+ def wait_for_started(self):
+ while len(self.started) < self.n:
+ _wait()
+
+ def wait_for_finished(self):
+ while len(self.finished) < self.n:
+ _wait()
+
+ def do_finish(self):
+ self._can_exit = True
+
+
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = support.threading_setup()
+
+ def tearDown(self):
+ support.threading_cleanup(*self._threads)
+ support.reap_children()
+
+
+class BaseLockTests(BaseTestCase):
+ """
+ Tests for both recursive and non-recursive locks.
+ """
+
+ def test_constructor(self):
+ lock = self.locktype()
+ del lock
+
+ def test_acquire_destroy(self):
+ lock = self.locktype()
+ lock.acquire()
+ del lock
+
+ def test_acquire_release(self):
+ lock = self.locktype()
+ lock.acquire()
+ lock.release()
+ del lock
+
+ def test_try_acquire(self):
+ lock = self.locktype()
+ self.assertTrue(lock.acquire(False))
+ lock.release()
+
+ def test_try_acquire_contended(self):
+ lock = self.locktype()
+ lock.acquire()
+ result = []
+ def f():
+ result.append(lock.acquire(False))
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(result[0])
+ lock.release()
+
+ def test_acquire_contended(self):
+ lock = self.locktype()
+ lock.acquire()
+ N = 5
+ def f():
+ lock.acquire()
+ lock.release()
+
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(len(b.finished), 0)
+ lock.release()
+ b.wait_for_finished()
+ self.assertEqual(len(b.finished), N)
+
+ def test_with(self):
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ lock.release()
+ def _with(err=None):
+ with lock:
+ if err is not None:
+ raise err
+ _with()
+ # Check the lock is unacquired
+ Bunch(f, 1).wait_for_finished()
+ self.assertRaises(TypeError, _with, TypeError)
+ # Check the lock is unacquired
+ Bunch(f, 1).wait_for_finished()
+
+
+class LockTests(BaseLockTests):
+ """
+ Tests for non-recursive, weak locks
+ (which can be acquired and released from different threads).
+ """
+ def test_reacquire(self):
+ # Lock needs to be released before re-acquiring.
+ lock = self.locktype()
+ phase = []
+ def f():
+ lock.acquire()
+ phase.append(None)
+ lock.acquire()
+ phase.append(None)
+ start_new_thread(f, ())
+ while len(phase) == 0:
+ _wait()
+ _wait()
+ self.assertEqual(len(phase), 1)
+ lock.release()
+ while len(phase) == 1:
+ _wait()
+ self.assertEqual(len(phase), 2)
+
+ def test_different_thread(self):
+ # Lock can be released from a different thread.
+ lock = self.locktype()
+ lock.acquire()
+ def f():
+ lock.release()
+ b = Bunch(f, 1)
+ b.wait_for_finished()
+ lock.acquire()
+ lock.release()
+
+
+class RLockTests(BaseLockTests):
+ """
+ Tests for recursive locks.
+ """
+ def test_reacquire(self):
+ lock = self.locktype()
+ lock.acquire()
+ lock.acquire()
+ lock.release()
+ lock.acquire()
+ lock.release()
+ lock.release()
+
+ def test_release_unacquired(self):
+ # Cannot release an unacquired lock
+ lock = self.locktype()
+ self.assertRaises(RuntimeError, lock.release)
+ lock.acquire()
+ lock.acquire()
+ lock.release()
+ lock.acquire()
+ lock.release()
+ lock.release()
+ self.assertRaises(RuntimeError, lock.release)
+
+ def test_different_thread(self):
+ # Cannot release from a different thread
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ b = Bunch(f, 1, True)
+ try:
+ self.assertRaises(RuntimeError, lock.release)
+ finally:
+ b.do_finish()
+
+ def test__is_owned(self):
+ lock = self.locktype()
+ self.assertFalse(lock._is_owned())
+ lock.acquire()
+ self.assertTrue(lock._is_owned())
+ lock.acquire()
+ self.assertTrue(lock._is_owned())
+ result = []
+ def f():
+ result.append(lock._is_owned())
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(result[0])
+ lock.release()
+ self.assertTrue(lock._is_owned())
+ lock.release()
+ self.assertFalse(lock._is_owned())
+
+
+class EventTests(BaseTestCase):
+ """
+ Tests for Event objects.
+ """
+
+ def test_is_set(self):
+ evt = self.eventtype()
+ self.assertFalse(evt.is_set())
+ evt.set()
+ self.assertTrue(evt.is_set())
+ evt.set()
+ self.assertTrue(evt.is_set())
+ evt.clear()
+ self.assertFalse(evt.is_set())
+ evt.clear()
+ self.assertFalse(evt.is_set())
+
+ def _check_notify(self, evt):
+ # All threads get notified
+ N = 5
+ results1 = []
+ results2 = []
+ def f():
+ results1.append(evt.wait())
+ results2.append(evt.wait())
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(len(results1), 0)
+ evt.set()
+ b.wait_for_finished()
+ self.assertEqual(results1, [True] * N)
+ self.assertEqual(results2, [True] * N)
+
+ def test_notify(self):
+ evt = self.eventtype()
+ self._check_notify(evt)
+ # Another time, after an explicit clear()
+ evt.set()
+ evt.clear()
+ self._check_notify(evt)
+
+ def test_timeout(self):
+ evt = self.eventtype()
+ results1 = []
+ results2 = []
+ N = 5
+ def f():
+ results1.append(evt.wait(0.0))
+ t1 = time.time()
+ r = evt.wait(0.2)
+ t2 = time.time()
+ results2.append((r, t2 - t1))
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(results1, [False] * N)
+ for r, dt in results2:
+ self.assertFalse(r)
+ self.assertTrue(dt >= 0.2, dt)
+ # The event is set
+ results1 = []
+ results2 = []
+ evt.set()
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(results1, [True] * N)
+ for r, dt in results2:
+ self.assertTrue(r)
+
+
+class ConditionTests(BaseTestCase):
+ """
+ Tests for condition variables.
+ """
+
+ def test_acquire(self):
+ cond = self.condtype()
+ # Be default we have an RLock: the condition can be acquired multiple
+ # times.
+ cond.acquire()
+ cond.acquire()
+ cond.release()
+ cond.release()
+ lock = threading.Lock()
+ cond = self.condtype(lock)
+ cond.acquire()
+ self.assertFalse(lock.acquire(False))
+ cond.release()
+ self.assertTrue(lock.acquire(False))
+ self.assertFalse(cond.acquire(False))
+ lock.release()
+ with cond:
+ self.assertFalse(lock.acquire(False))
+
+ def test_unacquired_wait(self):
+ cond = self.condtype()
+ self.assertRaises(RuntimeError, cond.wait)
+
+ def test_unacquired_notify(self):
+ cond = self.condtype()
+ self.assertRaises(RuntimeError, cond.notify)
+
+ def _check_notify(self, cond):
+ N = 5
+ results1 = []
+ results2 = []
+ phase_num = 0
+ def f():
+ cond.acquire()
+ cond.wait()
+ cond.release()
+ results1.append(phase_num)
+ cond.acquire()
+ cond.wait()
+ cond.release()
+ results2.append(phase_num)
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(results1, [])
+ # Notify 3 threads at first
+ cond.acquire()
+ cond.notify(3)
+ _wait()
+ phase_num = 1
+ cond.release()
+ while len(results1) < 3:
+ _wait()
+ self.assertEqual(results1, [1] * 3)
+ self.assertEqual(results2, [])
+ # Notify 5 threads: they might be in their first or second wait
+ cond.acquire()
+ cond.notify(5)
+ _wait()
+ phase_num = 2
+ cond.release()
+ while len(results1) + len(results2) < 8:
+ _wait()
+ self.assertEqual(results1, [1] * 3 + [2] * 2)
+ self.assertEqual(results2, [2] * 3)
+ # Notify all threads: they are all in their second wait
+ cond.acquire()
+ cond.notify_all()
+ _wait()
+ phase_num = 3
+ cond.release()
+ while len(results2) < 5:
+ _wait()
+ self.assertEqual(results1, [1] * 3 + [2] * 2)
+ self.assertEqual(results2, [2] * 3 + [3] * 2)
+ b.wait_for_finished()
+
+ def test_notify(self):
+ cond = self.condtype()
+ self._check_notify(cond)
+ # A second time, to check internal state is still ok.
+ self._check_notify(cond)
+
+ def test_timeout(self):
+ cond = self.condtype()
+ results = []
+ N = 5
+ def f():
+ cond.acquire()
+ t1 = time.time()
+ cond.wait(0.2)
+ t2 = time.time()
+ cond.release()
+ results.append(t2 - t1)
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(len(results), 5)
+ for dt in results:
+ self.assertTrue(dt >= 0.2, dt)
+
+
+class BaseSemaphoreTests(BaseTestCase):
+ """
+ Common tests for {bounded, unbounded} semaphore objects.
+ """
+
+ def test_constructor(self):
+ self.assertRaises(ValueError, self.semtype, value = -1)
+ self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
+
+ def test_acquire(self):
+ sem = self.semtype(1)
+ sem.acquire()
+ sem.release()
+ sem = self.semtype(2)
+ sem.acquire()
+ sem.acquire()
+ sem.release()
+ sem.release()
+
+ def test_acquire_destroy(self):
+ sem = self.semtype()
+ sem.acquire()
+ del sem
+
+ def test_acquire_contended(self):
+ sem = self.semtype(7)
+ sem.acquire()
+ N = 10
+ results1 = []
+ results2 = []
+ phase_num = 0
+ def f():
+ sem.acquire()
+ results1.append(phase_num)
+ sem.acquire()
+ results2.append(phase_num)
+ b = Bunch(f, 10)
+ b.wait_for_started()
+ while len(results1) + len(results2) < 6:
+ _wait()
+ self.assertEqual(results1 + results2, [0] * 6)
+ phase_num = 1
+ for i in range(7):
+ sem.release()
+ while len(results1) + len(results2) < 13:
+ _wait()
+ self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+ phase_num = 2
+ for i in range(6):
+ sem.release()
+ while len(results1) + len(results2) < 19:
+ _wait()
+ self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+ # The semaphore is still locked
+ self.assertFalse(sem.acquire(False))
+ # Final release, to let the last thread finish
+ sem.release()
+ b.wait_for_finished()
+
+ def test_try_acquire(self):
+ sem = self.semtype(2)
+ self.assertTrue(sem.acquire(False))
+ self.assertTrue(sem.acquire(False))
+ self.assertFalse(sem.acquire(False))
+ sem.release()
+ self.assertTrue(sem.acquire(False))
+
+ def test_try_acquire_contended(self):
+ sem = self.semtype(4)
+ sem.acquire()
+ results = []
+ def f():
+ results.append(sem.acquire(False))
+ results.append(sem.acquire(False))
+ Bunch(f, 5).wait_for_finished()
+ # There can be a thread switch between acquiring the semaphore and
+ # appending the result, therefore results will not necessarily be
+ # ordered.
+ self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
+
+ def test_default_value(self):
+ # The default initial value is 1.
+ sem = self.semtype()
+ sem.acquire()
+ def f():
+ sem.acquire()
+ sem.release()
+ b = Bunch(f, 1)
+ b.wait_for_started()
+ _wait()
+ self.assertFalse(b.finished)
+ sem.release()
+ b.wait_for_finished()
+
+ def test_with(self):
+ sem = self.semtype(2)
+ def _with(err=None):
+ with sem:
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+ with sem:
+ self.assertFalse(sem.acquire(False))
+ if err:
+ raise err
+ _with()
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+ self.assertRaises(TypeError, _with, TypeError)
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+
+class SemaphoreTests(BaseSemaphoreTests):
+ """
+ Tests for unbounded semaphores.
+ """
+
+ def test_release_unacquired(self):
+ # Unbounded releases are allowed and increment the semaphore's value
+ sem = self.semtype(1)
+ sem.release()
+ sem.acquire()
+ sem.acquire()
+ sem.release()
+
+
+class BoundedSemaphoreTests(BaseSemaphoreTests):
+ """
+ Tests for bounded semaphores.
+ """
+
+ def test_release_unacquired(self):
+ # Cannot go past the initial value
+ sem = self.semtype()
+ self.assertRaises(ValueError, sem.release)
+ sem.acquire()
+ sem.release()
+ self.assertRaises(ValueError, sem.release)
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index c25668d781..70d89fe41d 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -6,6 +6,7 @@ import _thread as thread
import time
import weakref
+from test import lock_tests
NUMTASKS = 10
NUMTRIPS = 3
@@ -188,8 +189,12 @@ class BarrierTest(BasicThreadTest):
if finished:
self.done_mutex.release()
+class LockTests(lock_tests.LockTests):
+ locktype = thread.allocate_lock
+
+
def test_main():
- support.run_unittest(ThreadRunningTests, BarrierTest)
+ support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index b5b05c359c..8f7c676ee4 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -12,6 +12,8 @@ import unittest
import weakref
import os
+from test import lock_tests
+
# A trivial mutable counter.
class Counter(object):
def __init__(self):
@@ -487,22 +489,6 @@ class ThreadingExceptionTests(BaseTestCase):
thread.start()
self.assertRaises(RuntimeError, thread.start)
- def test_releasing_unacquired_rlock(self):
- rlock = threading.RLock()
- self.assertRaises(RuntimeError, rlock.release)
-
- def test_waiting_on_unacquired_condition(self):
- cond = threading.Condition()
- self.assertRaises(RuntimeError, cond.wait)
-
- def test_notify_on_unacquired_condition(self):
- cond = threading.Condition()
- self.assertRaises(RuntimeError, cond.notify)
-
- def test_semaphore_with_negative_value(self):
- self.assertRaises(ValueError, threading.Semaphore, value = -1)
- self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxsize)
-
def test_joining_current_thread(self):
current_thread = threading.current_thread()
self.assertRaises(RuntimeError, current_thread.join);
@@ -517,11 +503,37 @@ class ThreadingExceptionTests(BaseTestCase):
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
+class LockTests(lock_tests.LockTests):
+ locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+ eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+ # An Condition uses an RLock by default and exports its API.
+ locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+ condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+ semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+ semtype = staticmethod(threading.BoundedSemaphore)
+
+
def test_main():
- test.support.run_unittest(ThreadTests,
- ThreadJoinOnShutdown,
- ThreadingExceptionTests,
- )
+ test.support.run_unittest(LockTests, RLockTests, EventTests,
+ ConditionAsRLockTests, ConditionTests,
+ SemaphoreTests, BoundedSemaphoreTests,
+ ThreadTests,
+ ThreadJoinOnShutdown,
+ ThreadingExceptionTests,
+ )
if __name__ == "__main__":
test_main()
diff --git a/Misc/NEWS b/Misc/NEWS
index 1e93aaa7d6..0b98ccbf8e 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -357,6 +357,9 @@ Documentation
Tests
-----
+- Issue #7270: Add some dedicated unit tests for multi-thread synchronization
+ primitives such as Lock, RLock, Condition, Event and Semaphore.
+
- Issue #7248 (part 2): Use a unique temporary directory for importlib source
tests instead of tempfile.tempdir. This prevents the tests from sharing state
between concurrent executions on the same system.