[Python-checkins] bpo-38456: Use /bin/true in test_subprocess (GH-16736)

Gregory P. Smith webhook-mailer at python.org
Sat Oct 12 19:35:57 EDT 2019


https://github.com/python/cpython/commit/67b93f80c764bca01c81c989d74a99df208bea4d
commit: 67b93f80c764bca01c81c989d74a99df208bea4d
branch: master
author: Gregory P. Smith <greg at krypto.org>
committer: GitHub <noreply at github.com>
date: 2019-10-12T16:35:53-07:00
summary:

bpo-38456: Use /bin/true in test_subprocess (GH-16736)

* bpo-38456: Use /bin/true in test_subprocess.

Instead of sys.executable, "-c", "pass" or "import sys; sys.exit(0)"
use /bin/true when it is available.  On a reasonable machine this
shaves up to two seconds wall time off the otherwise ~40sec execution
on a --with-pydebug build.  It should be more notable on many
buildbots or overloaded slower I/O systems (CI, etc).

files:
M Lib/test/test_subprocess.py

diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 5cc324b87894c..2231ff49235c8 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -54,6 +54,16 @@
 # Ignore errors that indicate the command was not found
 NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)
 
+ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
+
+
+def setUpModule():
+    shell_true = shutil.which('true')
+    if (os.access(shell_true, os.X_OK) and
+        subprocess.run([shell_true]).returncode == 0):
+        global ZERO_RETURN_CMD
+        ZERO_RETURN_CMD = (shell_true,)  # Faster than Python startup.
+
 
 class BaseTestCase(unittest.TestCase):
     def setUp(self):
@@ -98,7 +108,7 @@ def _execute_child(self, *args, **kwargs):
 class ProcessTestCase(BaseTestCase):
 
     def test_io_buffered_by_default(self):
-        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+        p = subprocess.Popen(ZERO_RETURN_CMD,
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
         try:
@@ -112,7 +122,7 @@ def test_io_buffered_by_default(self):
             p.wait()
 
     def test_io_unbuffered_works(self):
-        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+        p = subprocess.Popen(ZERO_RETURN_CMD,
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, bufsize=0)
         try:
@@ -142,8 +152,7 @@ def test_call_timeout(self):
 
     def test_check_call_zero(self):
         # check_call() function with zero return code
-        rc = subprocess.check_call([sys.executable, "-c",
-                                    "import sys; sys.exit(0)"])
+        rc = subprocess.check_call(ZERO_RETURN_CMD)
         self.assertEqual(rc, 0)
 
     def test_check_call_nonzero(self):
@@ -709,19 +718,19 @@ def test_invalid_env(self):
         newenv = os.environ.copy()
         newenv["FRUIT\0VEGETABLE"] = "cabbage"
         with self.assertRaises(ValueError):
-            subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
+            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
 
         # null character in the environment variable value
         newenv = os.environ.copy()
         newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
         with self.assertRaises(ValueError):
-            subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
+            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
 
         # equal character in the environment variable name
         newenv = os.environ.copy()
         newenv["FRUIT=ORANGE"] = "lemon"
         with self.assertRaises(ValueError):
-            subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
+            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
 
         # equal character in the environment variable value
         newenv = os.environ.copy()
@@ -822,7 +831,7 @@ def test_communicate_pipe_fd_leak(self):
                         options['stderr'] = subprocess.PIPE
                     if not options:
                         continue
-                    p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
+                    p = subprocess.Popen(ZERO_RETURN_CMD, **options)
                     p.communicate()
                     if p.stdin is not None:
                         self.assertTrue(p.stdin.closed)
@@ -961,7 +970,7 @@ def test_universal_newlines_communicate_input_none(self):
         #
         # We set stdout to PIPE because, as of this writing, a different
         # code path is tested when the number of pipes is zero or one.
-        p = subprocess.Popen([sys.executable, "-c", "pass"],
+        p = subprocess.Popen(ZERO_RETURN_CMD,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              universal_newlines=True)
@@ -1109,7 +1118,7 @@ def test_poll(self):
         self.assertEqual(p.poll(), 0)
 
     def test_wait(self):
-        p = subprocess.Popen([sys.executable, "-c", "pass"])
+        p = subprocess.Popen(ZERO_RETURN_CMD)
         self.assertEqual(p.wait(), 0)
         # Subsequent invocations should just return the returncode
         self.assertEqual(p.wait(), 0)
@@ -1128,14 +1137,14 @@ def test_invalid_bufsize(self):
         # an invalid type of the bufsize argument should raise
         # TypeError.
         with self.assertRaises(TypeError):
-            subprocess.Popen([sys.executable, "-c", "pass"], "orange")
+            subprocess.Popen(ZERO_RETURN_CMD, "orange")
 
     def test_bufsize_is_none(self):
         # bufsize=None should be the same as bufsize=0.
-        p = subprocess.Popen([sys.executable, "-c", "pass"], None)
+        p = subprocess.Popen(ZERO_RETURN_CMD, None)
         self.assertEqual(p.wait(), 0)
         # Again with keyword arg
-        p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
+        p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
         self.assertEqual(p.wait(), 0)
 
     def _test_bufsize_equal_one(self, line, expected, universal_newlines):
@@ -1340,7 +1349,7 @@ def test_handles_closed_on_exception(self):
 
     def test_communicate_epipe(self):
         # Issue 10963: communicate() should hide EPIPE
-        p = subprocess.Popen([sys.executable, "-c", 'pass'],
+        p = subprocess.Popen(ZERO_RETURN_CMD,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
@@ -1351,7 +1360,7 @@ def test_communicate_epipe(self):
 
     def test_communicate_epipe_only_stdin(self):
         # Issue 10963: communicate() should hide EPIPE
-        p = subprocess.Popen([sys.executable, "-c", 'pass'],
+        p = subprocess.Popen(ZERO_RETURN_CMD,
                              stdin=subprocess.PIPE)
         self.addCleanup(p.stdin.close)
         p.wait()
@@ -1390,7 +1399,7 @@ def test_failed_child_execute_fd_leak(self):
         fds_before_popen = os.listdir(fd_directory)
         with self.assertRaises(PopenTestException):
             PopenExecuteChildRaises(
-                    [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
+                    ZERO_RETURN_CMD, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
         # NOTE: This test doesn't verify that the real _execute_child
@@ -1433,7 +1442,7 @@ def test_check(self):
 
     def test_check_zero(self):
         # check_returncode shouldn't raise when returncode is zero
-        cp = self.run_python("import sys; sys.exit(0)", check=True)
+        cp = subprocess.run(ZERO_RETURN_CMD, check=True)
         self.assertEqual(cp.returncode, 0)
 
     def test_timeout(self):
@@ -1791,16 +1800,16 @@ def test_user(self):
                         self.assertEqual(child_user, user_uid)
 
         with self.assertRaises(ValueError):
-            subprocess.check_call([sys.executable, "-c", "pass"], user=-1)
+            subprocess.check_call(ZERO_RETURN_CMD, user=-1)
 
         if pwd is None:
             with self.assertRaises(ValueError):
-                subprocess.check_call([sys.executable, "-c", "pass"], user=name_uid)
+                subprocess.check_call(ZERO_RETURN_CMD, user=name_uid)
 
     @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
     def test_user_error(self):
         with self.assertRaises(ValueError):
-            subprocess.check_call([sys.executable, "-c", "pass"], user=65535)
+            subprocess.check_call(ZERO_RETURN_CMD, user=65535)
 
     @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
     def test_group(self):
@@ -1834,16 +1843,16 @@ def test_group(self):
 
         # make sure we bomb on negative values
         with self.assertRaises(ValueError):
-            subprocess.check_call([sys.executable, "-c", "pass"], group=-1)
+            subprocess.check_call(ZERO_RETURN_CMD, group=-1)
 
         if grp is None:
             with self.assertRaises(ValueError):
-                subprocess.check_call([sys.executable, "-c", "pass"], group=name_group)
+                subprocess.check_call(ZERO_RETURN_CMD, group=name_group)
 
     @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
     def test_group_error(self):
         with self.assertRaises(ValueError):
-            subprocess.check_call([sys.executable, "-c", "pass"], group=65535)
+            subprocess.check_call(ZERO_RETURN_CMD, group=65535)
 
     @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
     def test_extra_groups(self):
@@ -1882,17 +1891,17 @@ def test_extra_groups(self):
 
         # make sure we bomb on negative values
         with self.assertRaises(ValueError):
-            subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[-1])
+            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])
 
         if grp is None:
             with self.assertRaises(ValueError):
-                subprocess.check_call([sys.executable, "-c", "pass"],
+                subprocess.check_call(ZERO_RETURN_CMD,
                                       extra_groups=[name_group])
 
     @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
     def test_extra_groups_error(self):
         with self.assertRaises(ValueError):
-            subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[])
+            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])
 
     @unittest.skipIf(mswindows or not hasattr(os, 'umask'),
                      'POSIX umask() is not available.')
@@ -1904,7 +1913,7 @@ def test_umask(self):
             # We set an unusual umask in the child so as a unique mode
             # for us to test the child's touched file for.
             subprocess.check_call(
-                    [sys.executable, "-c", f"open({name!r}, 'w')"],  # touch
+                    [sys.executable, "-c", f"open({name!r}, 'w').close()"],
                     umask=0o053)
             # Ignore execute permissions entirely in our test,
             # filesystems could be mounted to ignore or force that.
@@ -2007,7 +2016,7 @@ def raise_it():
 
         with self.assertRaises(subprocess.SubprocessError):
             self._TestExecuteChildPopen(
-                        self, [sys.executable, "-c", "pass"],
+                        self, ZERO_RETURN_CMD,
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, preexec_fn=raise_it)
 
@@ -2464,7 +2473,7 @@ def prepare():
 
         try:
             subprocess.call(
-                [sys.executable, "-c", "pass"],
+                ZERO_RETURN_CMD,
                 preexec_fn=prepare)
         except ValueError as err:
             # Pure Python implementations keeps the message
@@ -2507,29 +2516,30 @@ def test_undecodable_env(self):
             self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
 
     def test_bytes_program(self):
-        abs_program = os.fsencode(sys.executable)
-        path, program = os.path.split(sys.executable)
+        abs_program = os.fsencode(ZERO_RETURN_CMD[0])
+        args = list(ZERO_RETURN_CMD[1:])
+        path, program = os.path.split(ZERO_RETURN_CMD[0])
         program = os.fsencode(program)
 
         # absolute bytes path
-        exitcode = subprocess.call([abs_program, "-c", "pass"])
+        exitcode = subprocess.call([abs_program]+args)
         self.assertEqual(exitcode, 0)
 
         # absolute bytes path as a string
-        cmd = b"'" + abs_program + b"' -c pass"
+        cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
         exitcode = subprocess.call(cmd, shell=True)
         self.assertEqual(exitcode, 0)
 
         # bytes program, unicode PATH
         env = os.environ.copy()
         env["PATH"] = path
-        exitcode = subprocess.call([program, "-c", "pass"], env=env)
+        exitcode = subprocess.call([program]+args, env=env)
         self.assertEqual(exitcode, 0)
 
         # bytes program, bytes PATH
         envb = os.environb.copy()
         envb[b"PATH"] = os.fsencode(path)
-        exitcode = subprocess.call([program, "-c", "pass"], env=envb)
+        exitcode = subprocess.call([program]+args, env=envb)
         self.assertEqual(exitcode, 0)
 
     def test_pipe_cloexec(self):
@@ -2757,7 +2767,7 @@ def test_pass_fds(self):
             # pass_fds overrides close_fds with a warning.
             with self.assertWarns(RuntimeWarning) as context:
                 self.assertFalse(subprocess.call(
-                        [sys.executable, "-c", "import sys; sys.exit(0)"],
+                        ZERO_RETURN_CMD,
                         close_fds=False, pass_fds=(fd, )))
             self.assertIn('overriding close_fds', str(context.warning))
 
@@ -2819,19 +2829,19 @@ def test_pass_fds_redirected(self):
 
     def test_stdout_stdin_are_single_inout_fd(self):
         with io.open(os.devnull, "r+") as inout:
-            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+            p = subprocess.Popen(ZERO_RETURN_CMD,
                                  stdout=inout, stdin=inout)
             p.wait()
 
     def test_stdout_stderr_are_single_inout_fd(self):
         with io.open(os.devnull, "r+") as inout:
-            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+            p = subprocess.Popen(ZERO_RETURN_CMD,
                                  stdout=inout, stderr=inout)
             p.wait()
 
     def test_stderr_stdin_are_single_inout_fd(self):
         with io.open(os.devnull, "r+") as inout:
-            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
+            p = subprocess.Popen(ZERO_RETURN_CMD,
                                  stderr=inout, stdin=inout)
             p.wait()
 
@@ -3031,7 +3041,7 @@ def __int__(self):
     def test_communicate_BrokenPipeError_stdin_close(self):
         # By not setting stdout or stderr or a timeout we force the fast path
         # that just calls _stdin_write() internally due to our mock.
-        proc = subprocess.Popen([sys.executable, '-c', 'pass'])
+        proc = subprocess.Popen(ZERO_RETURN_CMD)
         with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
             mock_proc_stdin.close.side_effect = BrokenPipeError
             proc.communicate()  # Should swallow BrokenPipeError from close.
@@ -3040,7 +3050,7 @@ def test_communicate_BrokenPipeError_stdin_close(self):
     def test_communicate_BrokenPipeError_stdin_write(self):
         # By not setting stdout or stderr or a timeout we force the fast path
         # that just calls _stdin_write() internally due to our mock.
-        proc = subprocess.Popen([sys.executable, '-c', 'pass'])
+        proc = subprocess.Popen(ZERO_RETURN_CMD)
         with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
             mock_proc_stdin.write.side_effect = BrokenPipeError
             proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
@@ -3079,7 +3089,7 @@ def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
                          'need _testcapi.W_STOPCODE')
     def test_stopped(self):
         """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
-        args = [sys.executable, '-c', 'pass']
+        args = ZERO_RETURN_CMD
         proc = subprocess.Popen(args)
 
         # Wait until the real process completes to avoid zombie process
@@ -3109,7 +3119,7 @@ def test_startupinfo(self):
         # Since Python is a console process, it won't be affected
         # by wShowWindow, but the argument should be silently
         # ignored
-        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+        subprocess.call(ZERO_RETURN_CMD,
                         startupinfo=startupinfo)
 
     def test_startupinfo_keywords(self):
@@ -3125,7 +3135,7 @@ def test_startupinfo_keywords(self):
         # Since Python is a console process, it won't be affected
         # by wShowWindow, but the argument should be silently
         # ignored
-        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+        subprocess.call(ZERO_RETURN_CMD,
                         startupinfo=startupinfo)
 
     def test_startupinfo_copy(self):
@@ -3137,7 +3147,7 @@ def test_startupinfo_copy(self):
         # Call Popen() twice with the same startupinfo object to make sure
         # that it's not modified
         for _ in range(2):
-            cmd = [sys.executable, "-c", "pass"]
+            cmd = ZERO_RETURN_CMD
             with open(os.devnull, 'w') as null:
                 proc = subprocess.Popen(cmd,
                                         stdout=null,
@@ -3177,7 +3187,7 @@ def test_issue31471(self):
         class BadEnv(dict):
             keys = None
         with self.assertRaises(TypeError):
-            subprocess.Popen([sys.executable, "-c", "pass"], env=BadEnv())
+            subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
 
     def test_close_fds(self):
         # close file descriptors
@@ -3238,13 +3248,13 @@ def test_close_fds_with_stdio(self):
     def test_empty_attribute_list(self):
         startupinfo = subprocess.STARTUPINFO()
         startupinfo.lpAttributeList = {}
-        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+        subprocess.call(ZERO_RETURN_CMD,
                         startupinfo=startupinfo)
 
     def test_empty_handle_list(self):
         startupinfo = subprocess.STARTUPINFO()
         startupinfo.lpAttributeList = {"handle_list": []}
-        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
+        subprocess.call(ZERO_RETURN_CMD,
                         startupinfo=startupinfo)
 
     def test_shell_sequence(self):
@@ -3543,7 +3553,7 @@ def test_invalid_args(self):
 
     def test_broken_pipe_cleanup(self):
         """Broken pipe error should not prevent wait() (Issue 21619)"""
-        proc = subprocess.Popen([sys.executable, '-c', 'pass'],
+        proc = subprocess.Popen(ZERO_RETURN_CMD,
                                 stdin=subprocess.PIPE,
                                 bufsize=support.PIPE_MAX_SIZE*2)
         proc = proc.__enter__()



More information about the Python-checkins mailing list