[Python-checkins] python/dist/src/Lib/test test_threading.py, 1.3, 1.4

tim_one at users.sourceforge.net tim_one at users.sourceforge.net
Sat Jan 8 07:03:20 CET 2005


Update of /cvsroot/python/python/dist/src/Lib/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv32327/Lib/test

Modified Files:
	test_threading.py 
Log Message:
Converted to a unittest.  Added checks that the bounded semaphore actually
does what it's supposed to do.


Index: test_threading.py
===================================================================
RCS file: /cvsroot/python/python/dist/src/Lib/test/test_threading.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- test_threading.py	23 Jul 2002 19:04:06 -0000	1.3
+++ test_threading.py	8 Jan 2005 06:03:17 -0000	1.4
@@ -1,55 +1,91 @@
 # Very rudimentary test of threading module
 
-# Create a bunch of threads, let each do some work, wait until all are done
-
+import test.test_support
 from test.test_support import verbose
 import random
 import threading
 import time
+import unittest
 
-# This takes about n/3 seconds to run (about n/3 clumps of tasks, times
-# about 1 second per clump).
-numtasks = 10
-
-# no more than 3 of the 10 can run at once
-sema = threading.BoundedSemaphore(value=3)
-mutex = threading.RLock()
-running = 0
+# A trivial mutable counter.
+class Counter(object):
+    def __init__(self):
+        self.value = 0
+    def inc(self):
+        self.value += 1
+    def dec(self):
+        self.value -= 1
+    def get(self):
+        return self.value
 
 class TestThread(threading.Thread):
+    def __init__(self, name, testcase, sema, mutex, nrunning):
+        threading.Thread.__init__(self, name=name)
+        self.testcase = testcase
+        self.sema = sema
+        self.mutex = mutex
+        self.nrunning = nrunning
+
     def run(self):
-        global running
         delay = random.random() * 2
         if verbose:
             print 'task', self.getName(), 'will run for', delay, 'sec'
-        sema.acquire()
-        mutex.acquire()
-        running = running + 1
+
+        self.sema.acquire()
+
+        self.mutex.acquire()
+        self.nrunning.inc()
         if verbose:
-            print running, 'tasks are running'
-        mutex.release()
+            print self.nrunning.get(), 'tasks are running'
+        self.testcase.assert_(self.nrunning.get() <= 3)
+        self.mutex.release()
+
         time.sleep(delay)
         if verbose:
             print 'task', self.getName(), 'done'
-        mutex.acquire()
-        running = running - 1
+
+        self.mutex.acquire()
+        self.nrunning.dec()
+        self.testcase.assert_(self.nrunning.get() >= 0)
         if verbose:
-            print self.getName(), 'is finished.', running, 'tasks are running'
-        mutex.release()
-        sema.release()
+            print self.getName(), 'is finished.', self.nrunning.get(), \
+                  'tasks are running'
+        self.mutex.release()
 
-threads = []
-def starttasks():
-    for i in range(numtasks):
-        t = TestThread(name="<thread %d>"%i)
-        threads.append(t)
-        t.start()
+        self.sema.release()
 
-starttasks()
+class ThreadTests(unittest.TestCase):
 
-if verbose:
-    print 'waiting for all tasks to complete'
-for t in threads:
-    t.join()
-if verbose:
-    print 'all tasks done'
+    # Create a bunch of threads, let each do some work, wait until all are
+    # done.
+    def test_various_ops(self):
+        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
+        # times about 1 second per clump).
+        NUMTASKS = 10
+
+        # no more than 3 of the 10 can run at once
+        sema = threading.BoundedSemaphore(value=3)
+        mutex = threading.RLock()
+        numrunning = Counter()
+
+        threads = []
+
+        for i in range(NUMTASKS):
+            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
+            threads.append(t)
+            t.start()
+
+        if verbose:
+            print 'waiting for all tasks to complete'
+        for t in threads:
+            t.join()
+        if verbose:
+            print 'all tasks done'
+        self.assertEqual(numrunning.get(), 0)
+
+
+def test_main():
+    test.test_support.run_unittest(ThreadTests)
+
+if __name__ == "__main__":
+    test_main()



More information about the Python-checkins mailing list