[Python-checkins] cpython (3.4): subprocess's Popen.wait() is now thread safe so that multiple threads

gregory.p.smith python-checkins at python.org
Wed Apr 23 09:38:33 CEST 2014


http://hg.python.org/cpython/rev/5d745d97b7da
changeset:   90437:5d745d97b7da
branch:      3.4
parent:      90435:660d53bfb332
user:        Gregory P. Smith <greg at krypto.org>
date:        Wed Apr 23 00:27:17 2014 -0700
summary:
  subprocess's Popen.wait() is now thread safe so that multiple threads
may be calling wait() or poll() on a Popen instance at the same time
without losing the Popen.returncode value.  Fixes issue #21291.

files:
  Lib/subprocess.py           |  50 ++++++++++++++++++++-----
  Lib/test/test_subprocess.py |  48 ++++++++++++++++++++++++
  Misc/NEWS                   |   4 ++
  3 files changed, 92 insertions(+), 10 deletions(-)


diff --git a/Lib/subprocess.py b/Lib/subprocess.py
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -405,6 +405,10 @@
     import _posixsubprocess
     import select
     import selectors
+    try:
+        import threading
+    except ImportError:
+        import dummy_threading as threading
 
     # When select or poll has indicated that the file is writable,
     # we can write up to _PIPE_BUF bytes without risk of blocking.
@@ -748,6 +752,12 @@
                  pass_fds=()):
         """Create new Popen instance."""
         _cleanup()
+        # Held while anything is calling waitpid before returncode has been
+        # updated to prevent clobbering returncode if wait() or poll() are
+        # called from multiple threads at once.  After acquiring the lock,
+        # code must re-check self.returncode to see if another thread just
+        # finished a waitpid() call.
+        self._waitpid_lock = threading.Lock()
 
         self._input = None
         self._communication_started = False
@@ -1450,6 +1460,7 @@
         def _handle_exitstatus(self, sts, _WIFSIGNALED=os.WIFSIGNALED,
                 _WTERMSIG=os.WTERMSIG, _WIFEXITED=os.WIFEXITED,
                 _WEXITSTATUS=os.WEXITSTATUS):
+            """All callers to this function MUST hold self._waitpid_lock."""
             # This method is called (indirectly) by __del__, so it cannot
             # refer to anything outside of its local scope.
             if _WIFSIGNALED(sts):
@@ -1471,7 +1482,13 @@
 
             """
             if self.returncode is None:
+                if not self._waitpid_lock.acquire(False):
+                    # Something else is busy calling waitpid.  Don't allow two
+                    # at once.  We know nothing yet.
+                    return None
                 try:
+                    if self.returncode is not None:
+                        return self.returncode  # Another thread waited.
                     pid, sts = _waitpid(self.pid, _WNOHANG)
                     if pid == self.pid:
                         self._handle_exitstatus(sts)
@@ -1485,10 +1502,13 @@
                         # can't get the status.
                         # http://bugs.python.org/issue15756
                         self.returncode = 0
+                finally:
+                    self._waitpid_lock.release()
             return self.returncode
 
 
         def _try_wait(self, wait_flags):
+            """All callers to this function MUST hold self._waitpid_lock."""
             try:
                 (pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags)
             except OSError as e:
@@ -1521,11 +1541,17 @@
                 # cribbed from Lib/threading.py in Thread.wait() at r71065.
                 delay = 0.0005 # 500 us -> initial delay of 1 ms
                 while True:
-                    (pid, sts) = self._try_wait(os.WNOHANG)
-                    assert pid == self.pid or pid == 0
-                    if pid == self.pid:
-                        self._handle_exitstatus(sts)
-                        break
+                    if self._waitpid_lock.acquire(False):
+                        try:
+                            if self.returncode is not None:
+                                break  # Another thread waited.
+                            (pid, sts) = self._try_wait(os.WNOHANG)
+                            assert pid == self.pid or pid == 0
+                            if pid == self.pid:
+                                self._handle_exitstatus(sts)
+                                break
+                        finally:
+                            self._waitpid_lock.release()
                     remaining = self._remaining_time(endtime)
                     if remaining <= 0:
                         raise TimeoutExpired(self.args, timeout)
@@ -1533,11 +1559,15 @@
                     time.sleep(delay)
             else:
                 while self.returncode is None:
-                    (pid, sts) = self._try_wait(0)
-                    # Check the pid and loop as waitpid has been known to return
-                    # 0 even without WNOHANG in odd situations.  issue14396.
-                    if pid == self.pid:
-                        self._handle_exitstatus(sts)
+                    with self._waitpid_lock:
+                        if self.returncode is not None:
+                            break  # Another thread waited.
+                        (pid, sts) = self._try_wait(0)
+                        # Check the pid and loop as waitpid has been known to
+                        # return 0 even without WNOHANG in odd situations.
+                        # http://bugs.python.org/issue14396.
+                        if pid == self.pid:
+                            self._handle_exitstatus(sts)
             return self.returncode
 
 
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -1052,6 +1052,54 @@
             if exc is not None:
                 raise exc
 
+    @unittest.skipIf(threading is None, "threading required")
+    def test_threadsafe_wait(self):
+        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
+        proc = subprocess.Popen([sys.executable, '-c',
+                                 'import time; time.sleep(12)'])
+        self.assertEqual(proc.returncode, None)
+        results = []
+
+        def kill_proc_timer_thread():
+            results.append(('thread-start-poll-result', proc.poll()))
+            # terminate it from the thread and wait for the result.
+            proc.kill()
+            proc.wait()
+            results.append(('thread-after-kill-and-wait', proc.returncode))
+            # this wait should be a no-op given the above.
+            proc.wait()
+            results.append(('thread-after-second-wait', proc.returncode))
+
+        # This is a timing-sensitive test; the failure mode is
+        # triggered when both the main thread and this thread are in
+        # the wait() call at once.  The delay here is to allow the
+        # main thread to most likely be blocked in its wait() call.
+        t = threading.Timer(0.2, kill_proc_timer_thread)
+        t.start()
+
+        # Wait for the process to finish; the thread should kill it
+        # long before it finishes on its own.  Supplying a timeout
+        # triggers a different code path for better coverage.
+        proc.wait(timeout=20)
+        # Should be -9 because of the proc.kill() from the thread.
+        self.assertEqual(proc.returncode, -9,
+                         msg="unexpected result in wait from main thread")
+
+        # This should be a no-op with no change in returncode.
+        proc.wait()
+        self.assertEqual(proc.returncode, -9,
+                         msg="unexpected result in second main wait.")
+
+        t.join()
+        # Ensure that all of the thread results are as expected.
+        # When a race condition occurs in wait(), the returncode could
+        # be set by the wrong thread that doesn't actually have it,
+        # leading to an incorrect value.
+        self.assertEqual([('thread-start-poll-result', None),
+                          ('thread-after-kill-and-wait', -9),
+                          ('thread-after-second-wait', -9)],
+                         results)
+
     def test_issue8780(self):
         # Ensure that stdout is inherited from the parent
         # if stdout=PIPE is not used
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -39,6 +39,10 @@
 Library
 -------
 
+- Issue #21291: subprocess's Popen.wait() is now thread safe so that
+  multiple threads may be calling wait() or poll() on a Popen instance
+  at the same time without losing the Popen.returncode value.
+
 - Issue #21127: Path objects can now be instantiated from str subclass
   instances (such as numpy.str_).
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list