summaryrefslogtreecommitdiff
path: root/Lib/test/test_importlib/test_locks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_importlib/test_locks.py')
-rw-r--r--Lib/test/test_importlib/test_locks.py178
1 files changed, 99 insertions, 79 deletions
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py
index dc97ba1567..df0af12d38 100644
--- a/Lib/test/test_importlib/test_locks.py
+++ b/Lib/test/test_importlib/test_locks.py
@@ -1,7 +1,6 @@
-from . import util
-frozen_init, source_init = util.import_importlib('importlib')
-frozen_bootstrap = frozen_init._bootstrap
-source_bootstrap = source_init._bootstrap
+from . import util as test_util
+
+init = test_util.import_importlib('importlib')
import sys
import time
@@ -32,14 +31,20 @@ if threading is not None:
test_timeout = None
# _release_save() unsupported
test_release_save_unacquired = None
+ # lock status in repr unsupported
+ test_repr = None
+ test_locked_repr = None
- class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests):
- LockType = frozen_bootstrap._ModuleLock
-
- class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests):
- LockType = source_bootstrap._ModuleLock
+ LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
+ for kind, splitinit in init.items()}
+ (Frozen_ModuleLockAsRLockTests,
+ Source_ModuleLockAsRLockTests
+ ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
+ LockType=LOCK_TYPES)
else:
+ LOCK_TYPES = {}
+
class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
pass
@@ -47,78 +52,94 @@ else:
pass
-class DeadlockAvoidanceTests:
-
- def setUp(self):
- try:
- self.old_switchinterval = sys.getswitchinterval()
- sys.setswitchinterval(0.000001)
- except AttributeError:
- self.old_switchinterval = None
-
- def tearDown(self):
- if self.old_switchinterval is not None:
- sys.setswitchinterval(self.old_switchinterval)
-
- def run_deadlock_avoidance_test(self, create_deadlock):
- NLOCKS = 10
- locks = [self.LockType(str(i)) for i in range(NLOCKS)]
- pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
- if create_deadlock:
- NTHREADS = NLOCKS
- else:
- NTHREADS = NLOCKS - 1
- barrier = threading.Barrier(NTHREADS)
- results = []
- def _acquire(lock):
- """Try to acquire the lock. Return True on success, False on deadlock."""
+if threading is not None:
+ class DeadlockAvoidanceTests:
+
+ def setUp(self):
try:
- lock.acquire()
- except self.DeadlockError:
- return False
+ self.old_switchinterval = sys.getswitchinterval()
+ sys.setswitchinterval(0.000001)
+ except AttributeError:
+ self.old_switchinterval = None
+
+ def tearDown(self):
+ if self.old_switchinterval is not None:
+ sys.setswitchinterval(self.old_switchinterval)
+
+ def run_deadlock_avoidance_test(self, create_deadlock):
+ NLOCKS = 10
+ locks = [self.LockType(str(i)) for i in range(NLOCKS)]
+ pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
+ if create_deadlock:
+ NTHREADS = NLOCKS
else:
- return True
- def f():
- a, b = pairs.pop()
- ra = _acquire(a)
- barrier.wait()
- rb = _acquire(b)
- results.append((ra, rb))
- if rb:
- b.release()
- if ra:
- a.release()
- lock_tests.Bunch(f, NTHREADS).wait_for_finished()
- self.assertEqual(len(results), NTHREADS)
- return results
-
- def test_deadlock(self):
- results = self.run_deadlock_avoidance_test(True)
- # At least one of the threads detected a potential deadlock on its
- # second acquire() call. It may be several of them, because the
- # deadlock avoidance mechanism is conservative.
- nb_deadlocks = results.count((True, False))
- self.assertGreaterEqual(nb_deadlocks, 1)
- self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
-
- def test_no_deadlock(self):
- results = self.run_deadlock_avoidance_test(False)
- self.assertEqual(results.count((True, False)), 0)
- self.assertEqual(results.count((True, True)), len(results))
-
-@unittest.skipUnless(threading, "threads needed for this test")
-class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
- LockType = frozen_bootstrap._ModuleLock
- DeadlockError = frozen_bootstrap._DeadlockError
-
-@unittest.skipUnless(threading, "threads needed for this test")
-class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
- LockType = source_bootstrap._ModuleLock
- DeadlockError = source_bootstrap._DeadlockError
+ NTHREADS = NLOCKS - 1
+ barrier = threading.Barrier(NTHREADS)
+ results = []
+
+ def _acquire(lock):
+ """Try to acquire the lock. Return True on success,
+ False on deadlock."""
+ try:
+ lock.acquire()
+ except self.DeadlockError:
+ return False
+ else:
+ return True
+
+ def f():
+ a, b = pairs.pop()
+ ra = _acquire(a)
+ barrier.wait()
+ rb = _acquire(b)
+ results.append((ra, rb))
+ if rb:
+ b.release()
+ if ra:
+ a.release()
+ lock_tests.Bunch(f, NTHREADS).wait_for_finished()
+ self.assertEqual(len(results), NTHREADS)
+ return results
+
+ def test_deadlock(self):
+ results = self.run_deadlock_avoidance_test(True)
+ # At least one of the threads detected a potential deadlock on its
+ # second acquire() call. It may be several of them, because the
+ # deadlock avoidance mechanism is conservative.
+ nb_deadlocks = results.count((True, False))
+ self.assertGreaterEqual(nb_deadlocks, 1)
+ self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
+
+ def test_no_deadlock(self):
+ results = self.run_deadlock_avoidance_test(False)
+ self.assertEqual(results.count((True, False)), 0)
+ self.assertEqual(results.count((True, True)), len(results))
+
+
+ DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
+ for kind, splitinit in init.items()}
+
+ (Frozen_DeadlockAvoidanceTests,
+ Source_DeadlockAvoidanceTests
+ ) = test_util.test_both(DeadlockAvoidanceTests,
+ LockType=LOCK_TYPES,
+ DeadlockError=DEADLOCK_ERRORS)
+else:
+ DEADLOCK_ERRORS = {}
+
+ class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
+ pass
+
+ class Source_DeadlockAvoidanceTests(unittest.TestCase):
+ pass
class LifetimeTests:
+ @property
+ def bootstrap(self):
+ return self.init._bootstrap
+
def test_lock_lifetime(self):
name = "xyzzy"
self.assertNotIn(name, self.bootstrap._module_locks)
@@ -135,11 +156,10 @@ class LifetimeTests:
self.assertEqual(0, len(self.bootstrap._module_locks),
self.bootstrap._module_locks)
-class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase):
- bootstrap = frozen_bootstrap
-class Source_LifetimeTests(LifetimeTests, unittest.TestCase):
- bootstrap = source_bootstrap
+(Frozen_LifetimeTests,
+ Source_LifetimeTests
+ ) = test_util.test_both(LifetimeTests, init=init)
@support.reap_threads