[Python-checkins] r61705 - python/trunk/Lib/test/test_threading.py

jeffrey.yasskin python-checkins at python.org
Fri Mar 21 19:48:05 CET 2008


Author: jeffrey.yasskin
Date: Fri Mar 21 19:48:04 2008
New Revision: 61705

Modified:
   python/trunk/Lib/test/test_threading.py
Log:
Speed test_threading up from 14s to 0.5s, and avoid a deadlock on certain
failures. The test for enumerate-after-join is now a little less rigorous, but
the bug it references says the error happened in the first couple of
iterations, so 100 iterations should still be enough.

cProfile was useful for identifying the slow tests here.


Modified: python/trunk/Lib/test/test_threading.py
==============================================================================
--- python/trunk/Lib/test/test_threading.py	(original)
+++ python/trunk/Lib/test/test_threading.py	Fri Mar 21 19:48:04 2008
@@ -30,32 +30,28 @@
         self.nrunning = nrunning
 
     def run(self):
-        delay = random.random() * 2
+        delay = random.random() / 10000.0
         if verbose:
-            print 'task', self.getName(), 'will run for', delay, 'sec'
+            print 'task %s will run for %.1f usec' % (
+                self.getName(), delay * 1e6)
 
-        self.sema.acquire()
+        with self.sema:
+            with self.mutex:
+                self.nrunning.inc()
+                if verbose:
+                    print self.nrunning.get(), 'tasks are running'
+                self.testcase.assert_(self.nrunning.get() <= 3)
 
-        self.mutex.acquire()
-        self.nrunning.inc()
-        if verbose:
-            print self.nrunning.get(), 'tasks are running'
-        self.testcase.assert_(self.nrunning.get() <= 3)
-        self.mutex.release()
-
-        time.sleep(delay)
-        if verbose:
-            print 'task', self.getName(), 'done'
-
-        self.mutex.acquire()
-        self.nrunning.dec()
-        self.testcase.assert_(self.nrunning.get() >= 0)
-        if verbose:
-            print self.getName(), 'is finished.', self.nrunning.get(), \
-                  'tasks are running'
-        self.mutex.release()
+            time.sleep(delay)
+            if verbose:
+                print 'task', self.getName(), 'done'
 
-        self.sema.release()
+            with self.mutex:
+                self.nrunning.dec()
+                self.testcase.assert_(self.nrunning.get() >= 0)
+                if verbose:
+                    print '%s is finished. %d tasks are running' % (
+                        self.getName(), self.nrunning.get())
 
 class ThreadTests(unittest.TestCase):
 
@@ -218,6 +214,10 @@
         rc = subprocess.call([sys.executable, "-c", """if 1:
             import ctypes, sys, time, thread
 
+            # This lock is used as a simple event variable.
+            ready = thread.allocate_lock()
+            ready.acquire()
+
             # Module globals are cleared before __del__ is run
             # So we save the functions in class dict
             class C:
@@ -229,10 +229,11 @@
 
             def waitingThread():
                 x = C()
+                ready.release()
                 time.sleep(100)
 
             thread.start_new_thread(waitingThread, ())
-            time.sleep(1) # be sure the other thread is waiting
+            ready.acquire()  # Be sure the other thread is waiting.
             sys.exit(42)
             """])
         self.assertEqual(rc, 42)
@@ -242,9 +243,11 @@
         # threading.enumerate() after it has been join()ed.
         enum = threading.enumerate
         old_interval = sys.getcheckinterval()
-        sys.setcheckinterval(1)
         try:
-            for i in xrange(1, 1000):
+            for i in xrange(1, 100):
+                # Try a couple times at each thread-switching interval
+                # to get more interleavings.
+                sys.setcheckinterval(i // 5)
                 t = threading.Thread(target=lambda: None)
                 t.start()
                 t.join()


More information about the Python-checkins mailing list