summaryrefslogtreecommitdiff
path: root/Lib/test/test_dummy_threading.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2002-12-30 22:30:22 +0000
committerGuido van Rossum <guido@python.org>2002-12-30 22:30:22 +0000
commitad50ca91a99e9a16a583fb13799c79f23067ef30 (patch)
tree31e4e18a5980f1c50029b117b3c6231cb0e59458 /Lib/test/test_dummy_threading.py
parentc91ed400e053dc9f11dd30c84e2bb611999dce50 (diff)
downloadcpython-git-ad50ca91a99e9a16a583fb13799c79f23067ef30.tar.gz
Brett Cannon's dummy_thread and dummy_threading modules (SF patch
622537), with some nitpicking editorial changes.
Diffstat (limited to 'Lib/test/test_dummy_threading.py')
-rw-r--r--Lib/test/test_dummy_threading.py70
1 files changed, 70 insertions, 0 deletions
diff --git a/Lib/test/test_dummy_threading.py b/Lib/test/test_dummy_threading.py
new file mode 100644
index 0000000000..cc8c0e3adc
--- /dev/null
+++ b/Lib/test/test_dummy_threading.py
@@ -0,0 +1,70 @@
+# Very rudimentary test of threading module
+
+# Create a bunch of threads, let each do some work, wait until all are done
+
+from test.test_support import verbose
+import random
+import dummy_threading as _threading
+import time
+
+
+class TestThread(_threading.Thread):
+
+ def run(self):
+ global running
+ delay = random.random() * 2
+ if verbose:
+ print 'task', self.getName(), 'will run for', delay, 'sec'
+ sema.acquire()
+ mutex.acquire()
+ running = running + 1
+ if verbose:
+ print running, 'tasks are running'
+ mutex.release()
+ time.sleep(delay)
+ if verbose:
+ print 'task', self.getName(), 'done'
+ mutex.acquire()
+ running = running - 1
+ if verbose:
+ print self.getName(), 'is finished.', running, 'tasks are running'
+ mutex.release()
+ sema.release()
+
+def starttasks():
+ for i in range(numtasks):
+ t = TestThread(name="<thread %d>"%i)
+ threads.append(t)
+ t.start()
+
+
+def test_main():
+ # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
+ # about 1 second per clump).
+ global numtasks
+ numtasks = 10
+
+ # no more than 3 of the 10 can run at once
+ global sema
+ sema = _threading.BoundedSemaphore(value=3)
+ global mutex
+ mutex = _threading.RLock()
+ global running
+ running = 0
+
+ global threads
+ threads = []
+
+ starttasks()
+
+ if verbose:
+ print 'waiting for all tasks to complete'
+ for t in threads:
+ t.join()
+ if verbose:
+ print 'all tasks done'
+
+
+
+if __name__ == '__main__':
+ test_main()