| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
 | from . import util as test_util
init = test_util.import_importlib('importlib')
import sys
import time
import unittest
import weakref
from test import support
try:
    import threading
except ImportError:
    threading = None
else:
    from test import lock_tests
if threading is not None:
    class ModuleLockAsRLockTests:
        locktype = classmethod(lambda cls: cls.LockType("some_lock"))
        # _is_owned() unsupported
        test__is_owned = None
        # acquire(blocking=False) unsupported
        test_try_acquire = None
        test_try_acquire_contended = None
        # `with` unsupported
        test_with = None
        # acquire(timeout=...) unsupported
        test_timeout = None
        # _release_save() unsupported
        test_release_save_unacquired = None
        # lock status in repr unsupported
        test_repr = None
        test_locked_repr = None
    LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
                  for kind, splitinit in init.items()}
    (Frozen_ModuleLockAsRLockTests,
     Source_ModuleLockAsRLockTests
     ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
                             LockType=LOCK_TYPES)
else:
    LOCK_TYPES = {}
    class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
        pass
    class Source_ModuleLockAsRLockTests(unittest.TestCase):
        pass
if threading is not None:
    class DeadlockAvoidanceTests:
        def setUp(self):
            try:
                self.old_switchinterval = sys.getswitchinterval()
                sys.setswitchinterval(0.000001)
            except AttributeError:
                self.old_switchinterval = None
        def tearDown(self):
            if self.old_switchinterval is not None:
                sys.setswitchinterval(self.old_switchinterval)
        def run_deadlock_avoidance_test(self, create_deadlock):
            NLOCKS = 10
            locks = [self.LockType(str(i)) for i in range(NLOCKS)]
            pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
            if create_deadlock:
                NTHREADS = NLOCKS
            else:
                NTHREADS = NLOCKS - 1
            barrier = threading.Barrier(NTHREADS)
            results = []
            def _acquire(lock):
                """Try to acquire the lock. Return True on success,
                False on deadlock."""
                try:
                    lock.acquire()
                except self.DeadlockError:
                    return False
                else:
                    return True
            def f():
                a, b = pairs.pop()
                ra = _acquire(a)
                barrier.wait()
                rb = _acquire(b)
                results.append((ra, rb))
                if rb:
                    b.release()
                if ra:
                    a.release()
            lock_tests.Bunch(f, NTHREADS).wait_for_finished()
            self.assertEqual(len(results), NTHREADS)
            return results
        def test_deadlock(self):
            results = self.run_deadlock_avoidance_test(True)
            # At least one of the threads detected a potential deadlock on its
            # second acquire() call.  It may be several of them, because the
            # deadlock avoidance mechanism is conservative.
            nb_deadlocks = results.count((True, False))
            self.assertGreaterEqual(nb_deadlocks, 1)
            self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
        def test_no_deadlock(self):
            results = self.run_deadlock_avoidance_test(False)
            self.assertEqual(results.count((True, False)), 0)
            self.assertEqual(results.count((True, True)), len(results))
    DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
                       for kind, splitinit in init.items()}
    (Frozen_DeadlockAvoidanceTests,
     Source_DeadlockAvoidanceTests
     ) = test_util.test_both(DeadlockAvoidanceTests,
                             LockType=LOCK_TYPES,
                             DeadlockError=DEADLOCK_ERRORS)
else:
    DEADLOCK_ERRORS = {}
    class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
        pass
    class Source_DeadlockAvoidanceTests(unittest.TestCase):
        pass
class LifetimeTests:
    @property
    def bootstrap(self):
        return self.init._bootstrap
    def test_lock_lifetime(self):
        name = "xyzzy"
        self.assertNotIn(name, self.bootstrap._module_locks)
        lock = self.bootstrap._get_module_lock(name)
        self.assertIn(name, self.bootstrap._module_locks)
        wr = weakref.ref(lock)
        del lock
        support.gc_collect()
        self.assertNotIn(name, self.bootstrap._module_locks)
        self.assertIsNone(wr())
    def test_all_locks(self):
        support.gc_collect()
        self.assertEqual(0, len(self.bootstrap._module_locks),
                         self.bootstrap._module_locks)
(Frozen_LifetimeTests,
 Source_LifetimeTests
 ) = test_util.test_both(LifetimeTests, init=init)
@support.reap_threads
def test_main():
    support.run_unittest(Frozen_ModuleLockAsRLockTests,
                         Source_ModuleLockAsRLockTests,
                         Frozen_DeadlockAvoidanceTests,
                         Source_DeadlockAvoidanceTests,
                         Frozen_LifetimeTests,
                         Source_LifetimeTests)
if __name__ == '__main__':
    test_main()
 |