From b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 18 Sep 2017 22:04:20 +0200 Subject: Restore dummy_threading and _dummy_thread, but deprecate them (bpo-31370) (#3648) --- Lib/test/test_dummy_threading.py | 60 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 Lib/test/test_dummy_threading.py (limited to 'Lib/test/test_dummy_threading.py') diff --git a/Lib/test/test_dummy_threading.py b/Lib/test/test_dummy_threading.py new file mode 100644 index 0000000000..a0c2972a60 --- /dev/null +++ b/Lib/test/test_dummy_threading.py @@ -0,0 +1,60 @@ +from test import support +import unittest +import dummy_threading as _threading +import time + +class DummyThreadingTestCase(unittest.TestCase): + + class TestThread(_threading.Thread): + + def run(self): + global running + global sema + global mutex + # Uncomment if testing another module, such as the real 'threading' + # module. + #delay = random.random() * 2 + delay = 0 + if support.verbose: + print('task', self.name, 'will run for', delay, 'sec') + sema.acquire() + mutex.acquire() + running += 1 + if support.verbose: + print(running, 'tasks are running') + mutex.release() + time.sleep(delay) + if support.verbose: + print('task', self.name, 'done') + mutex.acquire() + running -= 1 + if support.verbose: + print(self.name, 'is finished.', running, 'tasks are running') + mutex.release() + sema.release() + + def setUp(self): + self.numtasks = 10 + global sema + sema = _threading.BoundedSemaphore(value=3) + global mutex + mutex = _threading.RLock() + global running + running = 0 + self.threads = [] + + def test_tasks(self): + for i in range(self.numtasks): + t = self.TestThread(name=""%i) + self.threads.append(t) + t.start() + + if support.verbose: + print('waiting for all tasks to complete') + for t in self.threads: + t.join() + if support.verbose: + print('all tasks done') + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.1