summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_threading.py110
1 files changed, 73 insertions, 37 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index cbf6202b5b..8b17460f8a 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1,55 +1,91 @@
# Very rudimentary test of threading module
-# Create a bunch of threads, let each do some work, wait until all are done
-
+import test.test_support
from test.test_support import verbose
import random
import threading
import time
+import unittest
-# This takes about n/3 seconds to run (about n/3 clumps of tasks, times
-# about 1 second per clump).
-numtasks = 10
-
-# no more than 3 of the 10 can run at once
-sema = threading.BoundedSemaphore(value=3)
-mutex = threading.RLock()
-running = 0
+# A trivial mutable counter.
+class Counter(object):
+ def __init__(self):
+ self.value = 0
+ def inc(self):
+ self.value += 1
+ def dec(self):
+ self.value -= 1
+ def get(self):
+ return self.value
class TestThread(threading.Thread):
+ def __init__(self, name, testcase, sema, mutex, nrunning):
+ threading.Thread.__init__(self, name=name)
+ self.testcase = testcase
+ self.sema = sema
+ self.mutex = mutex
+ self.nrunning = nrunning
+
def run(self):
- global running
delay = random.random() * 2
if verbose:
print 'task', self.getName(), 'will run for', delay, 'sec'
- sema.acquire()
- mutex.acquire()
- running = running + 1
+
+ self.sema.acquire()
+
+ self.mutex.acquire()
+ self.nrunning.inc()
if verbose:
- print running, 'tasks are running'
- mutex.release()
+ print self.nrunning.get(), 'tasks are running'
+ self.testcase.assert_(self.nrunning.get() <= 3)
+ self.mutex.release()
+
time.sleep(delay)
if verbose:
print 'task', self.getName(), 'done'
- mutex.acquire()
- running = running - 1
+
+ self.mutex.acquire()
+ self.nrunning.dec()
+ self.testcase.assert_(self.nrunning.get() >= 0)
if verbose:
- print self.getName(), 'is finished.', running, 'tasks are running'
- mutex.release()
- sema.release()
-
-threads = []
-def starttasks():
- for i in range(numtasks):
- t = TestThread(name="<thread %d>"%i)
- threads.append(t)
- t.start()
-
-starttasks()
-
-if verbose:
- print 'waiting for all tasks to complete'
-for t in threads:
- t.join()
-if verbose:
- print 'all tasks done'
+ print self.getName(), 'is finished.', self.nrunning.get(), \
+ 'tasks are running'
+ self.mutex.release()
+
+ self.sema.release()
+
+class ThreadTests(unittest.TestCase):
+
+ # Create a bunch of threads, let each do some work, wait until all are
+ # done.
+ def test_various_ops(self):
+ # This takes about n/3 seconds to run (about n/3 clumps of tasks,
+ # times about 1 second per clump).
+ NUMTASKS = 10
+
+ # no more than 3 of the 10 can run at once
+ sema = threading.BoundedSemaphore(value=3)
+ mutex = threading.RLock()
+ numrunning = Counter()
+
+ threads = []
+
+ for i in range(NUMTASKS):
+ t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
+ threads.append(t)
+ t.start()
+
+ if verbose:
+ print 'waiting for all tasks to complete'
+ for t in threads:
+ t.join()
+ if verbose:
+ print 'all tasks done'
+ self.assertEqual(numrunning.get(), 0)
+
+
+def test_main():
+ test.test_support.run_unittest(ThreadTests)
+
+if __name__ == "__main__":
+ test_main()