[Python-checkins] r87727 - in python/branches/release27-maint/Lib: test/test_threading.py threading.py

gregory.p.smith python-checkins at python.org
Tue Jan 4 02:10:08 CET 2011


Author: gregory.p.smith
Date: Tue Jan  4 02:10:08 2011
New Revision: 87727

Log:
Merged revisions 87710 via svnmerge from 
svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r87710 | gregory.p.smith | 2011-01-03 13:06:12 -0800 (Mon, 03 Jan 2011) | 4 lines
  
  issue6643 - Two locks held within the threading module on each thread instance
  needed to be reinitialized after fork().  Adds tests to confirm that they are
  and that a potential deadlock and crasher bug are fixed (platform dependant).
........

This required a bit more fiddling for 2.x as __block and __started are __
private as well as the __started Event's __cond.  A new "private"
_reset_internal_locks() method is added to Thread and _Event objects to
address this.


Modified:
   python/branches/release27-maint/Lib/test/test_threading.py
   python/branches/release27-maint/Lib/threading.py

Modified: python/branches/release27-maint/Lib/test/test_threading.py
==============================================================================
--- python/branches/release27-maint/Lib/test/test_threading.py	(original)
+++ python/branches/release27-maint/Lib/test/test_threading.py	Tue Jan  4 02:10:08 2011
@@ -10,6 +10,8 @@
 import time
 import unittest
 import weakref
+import os
+import subprocess
 
 from test import lock_tests
 
@@ -275,7 +277,6 @@
                 print("test_finalize_with_runnning_thread can't import ctypes")
             return  # can't do anything
 
-        import subprocess
         rc = subprocess.call([sys.executable, "-c", """if 1:
             import ctypes, sys, time, thread
 
@@ -306,7 +307,6 @@
     def test_finalize_with_trace(self):
         # Issue1733757
         # Avoid a deadlock when sys.settrace steps into threading._shutdown
-        import subprocess
         p = subprocess.Popen([sys.executable, "-c", """if 1:
             import sys, threading
 
@@ -341,7 +341,6 @@
     def test_join_nondaemon_on_shutdown(self):
         # Issue 1722344
         # Raising SystemExit skipped threading._shutdown
-        import subprocess
         p = subprocess.Popen([sys.executable, "-c", """if 1:
                 import threading
                 from time import sleep
@@ -428,7 +427,6 @@
                 print 'end of thread'
         \n""" + script
 
-        import subprocess
         p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
         rc = p.wait()
         data = p.stdout.read().replace('\r', '')
@@ -500,6 +498,152 @@
             """
         self._run_and_join(script)
 
+    def assertScriptHasOutput(self, script, expected_output):
+        p = subprocess.Popen([sys.executable, "-c", script],
+                             stdout=subprocess.PIPE)
+        rc = p.wait()
+        data = p.stdout.read().decode().replace('\r', '')
+        self.assertEqual(rc, 0, "Unexpected error")
+        self.assertEqual(data, expected_output)
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    def test_4_joining_across_fork_in_worker_thread(self):
+        # There used to be a possible deadlock when forking from a child
+        # thread.  See http://bugs.python.org/issue6643.
+
+        # Skip platforms with known problems forking from a worker thread.
+        # See http://bugs.python.org/issue3863.
+        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
+            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
+
+        # The script takes the following steps:
+        # - The main thread in the parent process starts a new thread and then
+        #   tries to join it.
+        # - The join operation acquires the Lock inside the thread's _block
+        #   Condition.  (See threading.py:Thread.join().)
+        # - We stub out the acquire method on the condition to force it to wait
+        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
+        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
+        #   HERE)
+        # - The main thread of the parent process enters Condition.wait(),
+        #   which releases the lock on the child thread.
+        # - The child process returns.  Without the necessary fix, when the
+        #   main thread of the child process (which used to be the child thread
+        #   in the parent process) attempts to exit, it will try to acquire the
+        #   lock in the Thread._block Condition object and hang, because the
+        #   lock was held across the fork.
+
+        script = """if 1:
+            import os, time, threading
+
+            finish_join = False
+            start_fork = False
+
+            def worker():
+                # Wait until this thread's lock is acquired before forking to
+                # create the deadlock.
+                global finish_join
+                while not start_fork:
+                    time.sleep(0.01)
+                # LOCK HELD: Main thread holds lock across this call.
+                childpid = os.fork()
+                finish_join = True
+                if childpid != 0:
+                    # Parent process just waits for child.
+                    os.waitpid(childpid, 0)
+                # Child process should just return.
+
+            w = threading.Thread(target=worker)
+
+            # Stub out the private condition variable's lock acquire method.
+            # This acquires the lock and then waits until the child has forked
+            # before returning, which will release the lock soon after.  If
+            # someone else tries to fix this test case by acquiring this lock
+            # before forking instead of reseting it, the test case will
+            # deadlock when it shouldn't.
+            condition = w._block
+            orig_acquire = condition.acquire
+            call_count_lock = threading.Lock()
+            call_count = 0
+            def my_acquire():
+                global call_count
+                global start_fork
+                orig_acquire()  # LOCK ACQUIRED HERE
+                start_fork = True
+                if call_count == 0:
+                    while not finish_join:
+                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
+                with call_count_lock:
+                    call_count += 1
+            condition.acquire = my_acquire
+
+            w.start()
+            w.join()
+            print('end of main')
+            """
+        self.assertScriptHasOutput(script, "end of main\n")
+
+    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+    def test_5_clear_waiter_locks_to_avoid_crash(self):
+        # Check that a spawned thread that forks doesn't segfault on certain
+        # platforms, namely OS X.  This used to happen if there was a waiter
+        # lock in the thread's condition variable's waiters list.  Even though
+        # we know the lock will be held across the fork, it is not safe to
+        # release locks held across forks on all platforms, so releasing the
+        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
+        # OS X are (as of this writing) implemented with a mutex + condition
+        # variable instead of a semaphore, while we know that the Python-level
+        # lock will be acquired, we can't know if the internal mutex will be
+        # acquired at the time of the fork.
+
+        # Skip platforms with known problems forking from a worker thread.
+        # See http://bugs.python.org/issue3863.
+        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
+            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
+        script = """if True:
+            import os, time, threading
+
+            start_fork = False
+
+            def worker():
+                # Wait until the main thread has attempted to join this thread
+                # before continuing.
+                while not start_fork:
+                    time.sleep(0.01)
+                childpid = os.fork()
+                if childpid != 0:
+                    # Parent process just waits for child.
+                    (cpid, rc) = os.waitpid(childpid, 0)
+                    assert cpid == childpid
+                    assert rc == 0
+                    print('end of worker thread')
+                else:
+                    # Child process should just return.
+                    pass
+
+            w = threading.Thread(target=worker)
+
+            # Stub out the private condition variable's _release_save method.
+            # This releases the condition's lock and flips the global that
+            # causes the worker to fork.  At this point, the problematic waiter
+            # lock has been acquired once by the waiter and has been put onto
+            # the waiters list.
+            condition = w._block
+            orig_release_save = condition._release_save
+            def my_release_save():
+                global start_fork
+                orig_release_save()
+                # Waiter lock held here, condition lock released.
+                start_fork = True
+            condition._release_save = my_release_save
+
+            w.start()
+            w.join()
+            print('end of main thread')
+            """
+        output = "end of worker thread\nend of main thread\n"
+        self.assertScriptHasOutput(script, output)
+
 
 class ThreadingExceptionTests(BaseTestCase):
     # A RuntimeError should be raised if Thread.start() is called

Modified: python/branches/release27-maint/Lib/threading.py
==============================================================================
--- python/branches/release27-maint/Lib/threading.py	(original)
+++ python/branches/release27-maint/Lib/threading.py	Tue Jan  4 02:10:08 2011
@@ -373,6 +373,10 @@
         self.__cond = Condition(Lock())
         self.__flag = False
 
+    def _reset_internal_locks(self):
+        # private!  called by Thread._reset_internal_locks by _after_fork()
+        self.__cond.__init__()
+
     def isSet(self):
         return self.__flag
 
@@ -449,6 +453,17 @@
         # sys.exc_info since it can be changed between instances
         self.__stderr = _sys.stderr
 
+    def _reset_internal_locks(self):
+        # private!  Called by _after_fork() to reset our internal locks as
+        # they may be in an invalid state leading to a deadlock or crash.
+        self.__block.__init__()
+        self.__started._reset_internal_locks()
+
+    @property
+    def _block(self):
+        # used by a unittest
+        return self.__block
+
     def _set_daemon(self):
         # Overridden in _MainThread and _DummyThread
         return current_thread().daemon
@@ -867,6 +882,9 @@
                 # its new value since it can have changed.
                 ident = _get_ident()
                 thread._Thread__ident = ident
+                # Any condition variables hanging off of the active thread may
+                # be in an invalid state, so we reinitialize them.
+                thread._reset_internal_locks()
                 new_active[ident] = thread
             else:
                 # All the others are already stopped.


More information about the Python-checkins mailing list