[Python-checkins] cpython: Issue 18984: Remove ._stopped Event from Thread internals.

tim.peters python-checkins at python.org
Mon Sep 9 01:46:45 CEST 2013


http://hg.python.org/cpython/rev/aff959a3ba13
changeset:   85631:aff959a3ba13
parent:      85629:9dc5bdab157e
user:        Tim Peters <tim at python.org>
date:        Sun Sep 08 18:44:40 2013 -0500
summary:
  Issue 18984:  Remove ._stopped Event from Thread internals.

The fix for issue 18808 left us checking two things to be sure a Thread
was done:  an Event (._stopped) and a mutex (._tstate_lock).  Clumsy &
brittle.  This patch removes the Event, leaving just a happy lock :-)

The bulk of the patch removes two excruciating tests, which were
verifying sanity of the internals of the ._stopped Event after a fork.
Thanks to Antoine Pitrou for verifying that's the only real value
these tests had.

One consequence of moving from an Event to a mutex:  waiters (threads
calling Thread.join()) used to block each on their own unique mutex
(internal to the ._stopped event), but now all contend on the same
mutex (._tstate_lock).  These approaches have different performance
characteristics on different platforms.  I don't think it matters in
this context.
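
To make the single-lock scheme concrete, here is a minimal sketch.  It is
not the code in Lib/threading.py:  ToyThread and its _run() helper are
hypothetical names invented for illustration, and the release that
CPython's C code performs at thread exit is simulated here in Python.
Every caller of join() blocks on the same lock, and whichever waiter gets
it first releases it again so the rest can follow.

```python
import threading


class ToyThread:
    """Hypothetical stand-in for threading.Thread, for illustration only."""

    def __init__(self, target):
        self._target = target
        self._tstate_lock = None   # created by start()
        self._is_stopped = False

    def start(self):
        lock = threading.Lock()
        lock.acquire()             # held for the worker's whole life
        self._tstate_lock = lock
        threading.Thread(target=self._run, args=(lock,), daemon=True).start()

    def _run(self, lock):
        try:
            self._target()
        finally:
            lock.release()         # assumption: stands in for the C-level release

    def _wait_for_tstate_lock(self, block=True, timeout=-1):
        lock = self._tstate_lock
        if lock is None:           # never started, or already seen to be done
            return
        if lock.acquire(block, timeout):
            lock.release()         # let the next waiter through
            self._is_stopped = True
            self._tstate_lock = None

    def join(self, timeout=None):
        if timeout is None:
            self._wait_for_tstate_lock()
        else:
            self._wait_for_tstate_lock(timeout=timeout)

    def is_alive(self):
        if self._is_stopped or self._tstate_lock is None:
            return False
        self._wait_for_tstate_lock(False)   # non-blocking probe
        return not self._is_stopped
```

A join() with a timeout that expires leaves ._is_stopped False, so a later
is_alive() still reports True.  Once the lock has been released, any
number of waiters pass through it one after another.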

files:
  Lib/test/test_threading.py |  138 +------------------------
  Lib/threading.py           |   59 +++++-----
  2 files changed, 30 insertions(+), 167 deletions(-)


diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -647,144 +647,8 @@
             """
         self._run_and_join(script)
 
-    def assertScriptHasOutput(self, script, expected_output):
-        rc, out, err = assert_python_ok("-c", script)
-        data = out.decode().replace('\r', '')
-        self.assertEqual(data, expected_output)
-
-    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
     @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
-    def test_4_joining_across_fork_in_worker_thread(self):
-        # There used to be a possible deadlock when forking from a child
-        # thread.  See http://bugs.python.org/issue6643.
-
-        # The script takes the following steps:
-        # - The main thread in the parent process starts a new thread and then
-        #   tries to join it.
-        # - The join operation acquires the Lock inside the thread's _block
-        #   Condition.  (See threading.py:Thread.join().)
-        # - We stub out the acquire method on the condition to force it to wait
-        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
-        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
-        #   HERE)
-        # - The main thread of the parent process enters Condition.wait(),
-        #   which releases the lock on the child thread.
-        # - The child process returns.  Without the necessary fix, when the
-        #   main thread of the child process (which used to be the child thread
-        #   in the parent process) attempts to exit, it will try to acquire the
-        #   lock in the Thread._block Condition object and hang, because the
-        #   lock was held across the fork.
-
-        script = """if 1:
-            import os, time, threading
-
-            finish_join = False
-            start_fork = False
-
-            def worker():
-                # Wait until this thread's lock is acquired before forking to
-                # create the deadlock.
-                global finish_join
-                while not start_fork:
-                    time.sleep(0.01)
-                # LOCK HELD: Main thread holds lock across this call.
-                childpid = os.fork()
-                finish_join = True
-                if childpid != 0:
-                    # Parent process just waits for child.
-                    os.waitpid(childpid, 0)
-                # Child process should just return.
-
-            w = threading.Thread(target=worker)
-
-            # Stub out the private condition variable's lock acquire method.
-            # This acquires the lock and then waits until the child has forked
-            # before returning, which will release the lock soon after.  If
-            # someone else tries to fix this test case by acquiring this lock
-            # before forking instead of resetting it, the test case will
-            # deadlock when it shouldn't.
-            condition = w._stopped._cond
-            orig_acquire = condition.acquire
-            call_count_lock = threading.Lock()
-            call_count = 0
-            def my_acquire():
-                global call_count
-                global start_fork
-                orig_acquire()  # LOCK ACQUIRED HERE
-                start_fork = True
-                if call_count == 0:
-                    while not finish_join:
-                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
-                with call_count_lock:
-                    call_count += 1
-            condition.acquire = my_acquire
-
-            w.start()
-            w.join()
-            print('end of main')
-            """
-        self.assertScriptHasOutput(script, "end of main\n")
-
-    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
-    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
-    def test_5_clear_waiter_locks_to_avoid_crash(self):
-        # Check that a spawned thread that forks doesn't segfault on certain
-        # platforms, namely OS X.  This used to happen if there was a waiter
-        # lock in the thread's condition variable's waiters list.  Even though
-        # we know the lock will be held across the fork, it is not safe to
-        # release locks held across forks on all platforms, so releasing the
-        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
-        # OS X are (as of this writing) implemented with a mutex + condition
-        # variable instead of a semaphore, while we know that the Python-level
-        # lock will be acquired, we can't know if the internal mutex will be
-        # acquired at the time of the fork.
-
-        script = """if True:
-            import os, time, threading
-
-            start_fork = False
-
-            def worker():
-                # Wait until the main thread has attempted to join this thread
-                # before continuing.
-                while not start_fork:
-                    time.sleep(0.01)
-                childpid = os.fork()
-                if childpid != 0:
-                    # Parent process just waits for child.
-                    (cpid, rc) = os.waitpid(childpid, 0)
-                    assert cpid == childpid
-                    assert rc == 0
-                    print('end of worker thread')
-                else:
-                    # Child process should just return.
-                    pass
-
-            w = threading.Thread(target=worker)
-
-            # Stub out the private condition variable's _release_save method.
-            # This releases the condition's lock and flips the global that
-            # causes the worker to fork.  At this point, the problematic waiter
-            # lock has been acquired once by the waiter and has been put onto
-            # the waiters list.
-            condition = w._stopped._cond
-            orig_release_save = condition._release_save
-            def my_release_save():
-                global start_fork
-                orig_release_save()
-                # Waiter lock held here, condition lock released.
-                start_fork = True
-            condition._release_save = my_release_save
-
-            w.start()
-            w.join()
-            print('end of main thread')
-            """
-        output = "end of worker thread\nend of main thread\n"
-        self.assertScriptHasOutput(script, output)
-
-    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
-    def test_6_daemon_threads(self):
+    def test_4_daemon_threads(self):
         # Check that a daemon thread cannot crash the interpreter on shutdown
         # by manipulating internal structures that are being disposed of in
         # the main thread.
diff --git a/Lib/threading.py b/Lib/threading.py
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -549,7 +549,7 @@
         self._ident = None
         self._tstate_lock = None
         self._started = Event()
-        self._stopped = Event()
+        self._is_stopped = False
         self._initialized = True
         # sys.stderr is not stored in the class like
         # sys.exc_info since it can be changed between instances
@@ -561,7 +561,6 @@
         # private!  Called by _after_fork() to reset our internal locks as
         # they may be in an invalid state leading to a deadlock or crash.
         self._started._reset_internal_locks()
-        self._stopped._reset_internal_locks()
         if is_alive:
             self._set_tstate_lock()
         else:
@@ -574,7 +573,7 @@
         status = "initial"
         if self._started.is_set():
             status = "started"
-        if self._stopped.is_set():
+        if self._is_stopped:
             status = "stopped"
         if self._daemonic:
             status += " daemon"
@@ -696,7 +695,6 @@
                 pass
         finally:
             with _active_limbo_lock:
-                self._stop()
                 try:
                     # We don't call self._delete() because it also
                     # grabs _active_limbo_lock.
@@ -705,7 +703,8 @@
                     pass
 
     def _stop(self):
-        self._stopped.set()
+        self._is_stopped = True
+        self._tstate_lock = None
 
     def _delete(self):
         "Remove current thread from the dict of currently running threads."
@@ -749,29 +748,24 @@
             raise RuntimeError("cannot join thread before it is started")
         if self is current_thread():
             raise RuntimeError("cannot join current thread")
-        if not self.is_alive():
-            return
-        self._stopped.wait(timeout)
-        if self._stopped.is_set():
-            self._wait_for_tstate_lock(timeout is None)
+        if timeout is None:
+            self._wait_for_tstate_lock()
+        else:
+            self._wait_for_tstate_lock(timeout=timeout)
 
-    def _wait_for_tstate_lock(self, block):
+    def _wait_for_tstate_lock(self, block=True, timeout=-1):
         # Issue #18808: wait for the thread state to be gone.
-        # When self._stopped is set, the Python part of the thread is done,
-        # but the thread's tstate has not yet been destroyed.  The C code
-        # releases self._tstate_lock when the C part of the thread is done
-        # (the code at the end of the thread's life to remove all knowledge
-        # of the thread from the C data structures).
-        # This method waits to acquire _tstate_lock if `block` is True, or
-        # sees whether it can be acquired immediately if `block` is False.
-        # If it does acquire the lock, the C code is done, and _tstate_lock
-        # is set to None.
+        # At the end of the thread's life, after all knowledge of the thread
+        # is removed from C data structures, C code releases our _tstate_lock.
+        # This method passes its arguments to _tstate_lock.acquire().
+        # If the lock is acquired, the C code is done, and self._stop() is
+        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
         lock = self._tstate_lock
-        if lock is None:
-            return  # already determined that the C code is done
-        if lock.acquire(block):
+        if lock is None:  # already determined that the C code is done
+            assert self._is_stopped
+        elif lock.acquire(block, timeout):
             lock.release()
-            self._tstate_lock = None
+            self._stop()
 
     @property
     def name(self):
@@ -790,14 +784,10 @@
 
     def is_alive(self):
         assert self._initialized, "Thread.__init__() not called"
-        if not self._started.is_set():
+        if self._is_stopped or not self._started.is_set():
             return False
-        if not self._stopped.is_set():
-            return True
-        # The Python part of the thread is done, but the C part may still be
-        # waiting to run.
         self._wait_for_tstate_lock(False)
-        return self._tstate_lock is not None
+        return not self._is_stopped
 
     isAlive = is_alive
 
@@ -861,6 +851,7 @@
 
     def __init__(self):
         Thread.__init__(self, name="MainThread", daemon=False)
+        self._set_tstate_lock()
         self._started.set()
         self._set_ident()
         with _active_limbo_lock:
@@ -925,6 +916,14 @@
 _main_thread = _MainThread()
 
 def _shutdown():
+    # Obscure:  other threads may be waiting to join _main_thread.  That's
+    # dubious, but some code does it.  We can't wait for C code to release
+    # the main thread's tstate_lock - that won't happen until the interpreter
+    # is nearly dead.  So we release it here.  Note that just calling _stop()
+    # isn't enough:  other threads may already be waiting on _tstate_lock.
+    assert _main_thread._tstate_lock is not None
+    assert _main_thread._tstate_lock.locked()
+    _main_thread._tstate_lock.release()
     _main_thread._stop()
     t = _pickSomeNonDaemonThread()
     while t:

-- 
Repository URL: http://hg.python.org/cpython

