from test import support import unittest import dummy_threading as _threading import time class DummyThreadingTestCase(unittest.TestCase): class TestThread(_threading.Thread): def run(self): global running global sema global mutex # Uncomment if testing another module, such as the real 'threading' # module. #delay = random.random() * 2 delay = 0 if support.verbose: print('task', self.name, 'will run for', delay, 'sec') sema.acquire() mutex.acquire() running += 1 if support.verbose: print(running, 'tasks are running') mutex.release() time.sleep(delay) if support.verbose: print('task', self.name, 'done') mutex.acquire() running -= 1 if support.verbose: print(self.name, 'is finished.', running, 'tasks are running') mutex.release() sema.release() def setUp(self): self.numtasks = 10 global sema sema = _threading.BoundedSemaphore(value=3) global mutex mutex = _threading.RLock() global running running = 0 self.threads = [] def test_tasks(self): for i in range(self.numtasks): t = self.TestThread(name=""%i) self.threads.append(t) t.start() if support.verbose: print('waiting for all tasks to complete') for t in self.threads: t.join() if support.verbose: print('all tasks done') if __name__ == '__main__': unittest.main()