[Python-checkins] cpython: Issue #25220: Create libregrtest/runtest_mp.py

victor.stinner python-checkins at python.org
Tue Sep 29 23:25:04 CEST 2015


https://hg.python.org/cpython/rev/12c666eea556
changeset:   98416:12c666eea556
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Tue Sep 29 23:15:38 2015 +0200
summary:
  Issue #25220: Create libregrtest/runtest_mp.py

Move the code to run tests in multiple processes using threading and subprocess
to a new submodule.

Move also slave_runner() (renamed to run_tests_slave()) and
run_test_in_subprocess() (renamed to run_tests_in_subprocess()) there.

files:
  Lib/test/libregrtest/main.py       |  120 +------------
  Lib/test/libregrtest/runtest.py    |   33 ---
  Lib/test/libregrtest/runtest_mp.py |  158 +++++++++++++++++
  3 files changed, 164 insertions(+), 147 deletions(-)


diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -1,5 +1,4 @@
 import faulthandler
-import json
 import os
 import platform
 import random
@@ -9,13 +8,10 @@
 import sysconfig
 import tempfile
 import textwrap
-import traceback
 import unittest
 from test.libregrtest.runtest import (
-    findtests, runtest, run_test_in_subprocess,
-    STDTESTS, NOTTESTS,
-    PASSED, FAILED, ENV_CHANGED, SKIPPED,
-    RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR)
+    findtests, runtest,
+    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED)
 from test.libregrtest.refleak import warm_caches
 from test.libregrtest.cmdline import _parse_args
 from test import support
@@ -39,23 +35,6 @@
 TEMPDIR = os.path.abspath(TEMPDIR)
 
 
-def slave_runner(slaveargs):
-    args, kwargs = json.loads(slaveargs)
-    if kwargs.get('huntrleaks'):
-        unittest.BaseTestSuite._cleanup = False
-    try:
-        result = runtest(*args, **kwargs)
-    except KeyboardInterrupt:
-        result = INTERRUPTED, ''
-    except BaseException as e:
-        traceback.print_exc()
-        result = CHILD_ERROR, str(e)
-    sys.stdout.flush()
-    print()   # Force a newline (just in case)
-    print(json.dumps(result))
-    sys.exit(0)
-
-
 def setup_python():
     # Display the Python traceback on fatal errors (e.g. segfault)
     faulthandler.enable(all_threads=True)
@@ -367,75 +346,6 @@
                     print(count(len(self.bad), 'test'), "failed again:")
                     printlist(self.bad)
 
-    def _run_tests_mp(self):
-        try:
-            from threading import Thread
-        except ImportError:
-            print("Multiprocess option requires thread support")
-            sys.exit(2)
-        from queue import Queue
-
-        debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
-        output = Queue()
-        pending = MultiprocessTests(self.tests)
-
-        def work():
-            # A worker thread.
-            try:
-                while True:
-                    try:
-                        test = next(pending)
-                    except StopIteration:
-                        output.put((None, None, None, None))
-                        return
-                    retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
-                    # Strip last refcount output line if it exists, since it
-                    # comes from the shutdown of the interpreter in the subcommand.
-                    stderr = debug_output_pat.sub("", stderr)
-                    stdout, _, result = stdout.strip().rpartition("\n")
-                    if retcode != 0:
-                        result = (CHILD_ERROR, "Exit code %s" % retcode)
-                        output.put((test, stdout.rstrip(), stderr.rstrip(), result))
-                        return
-                    if not result:
-                        output.put((None, None, None, None))
-                        return
-                    result = json.loads(result)
-                    output.put((test, stdout.rstrip(), stderr.rstrip(), result))
-            except BaseException:
-                output.put((None, None, None, None))
-                raise
-
-        workers = [Thread(target=work) for i in range(self.ns.use_mp)]
-        for worker in workers:
-            worker.start()
-        finished = 0
-        test_index = 1
-        try:
-            while finished < self.ns.use_mp:
-                test, stdout, stderr, result = output.get()
-                if test is None:
-                    finished += 1
-                    continue
-                self.accumulate_result(test, result)
-                self.display_progress(test_index, test)
-                if stdout:
-                    print(stdout)
-                if stderr:
-                    print(stderr, file=sys.stderr)
-                sys.stdout.flush()
-                sys.stderr.flush()
-                if result[0] == INTERRUPTED:
-                    raise KeyboardInterrupt
-                if result[0] == CHILD_ERROR:
-                    raise Exception("Child error on {}: {}".format(test, result[1]))
-                test_index += 1
-        except KeyboardInterrupt:
-            self.interrupted = True
-            pending.interrupted = True
-        for worker in workers:
-            worker.join()
-
     def _run_tests_sequential(self):
         save_modules = sys.modules.keys()
 
@@ -491,7 +401,8 @@
             self.test_count_width = len(self.test_count) - 1
 
         if self.ns.use_mp:
-            self._run_tests_mp()
+            from test.libregrtest.runtest_mp import run_tests_multiprocess
+            run_tests_multiprocess(self)
         else:
             self._run_tests_sequential()
 
@@ -518,7 +429,8 @@
         if self.ns.wait:
             input("Press any key to continue...")
         if self.ns.slaveargs is not None:
-            slave_runner(self.ns.slaveargs)
+            from test.libregrtest.runtest_mp import run_tests_slave
+            run_tests_slave(self.ns.slaveargs)
         self.find_tests(tests)
         self.run_tests()
         self.display_result()
@@ -526,26 +438,6 @@
         sys.exit(len(self.bad) > 0 or self.interrupted)
 
 
-# We do not use a generator so multiple threads can call next().
-class MultiprocessTests(object):
-
-    """A thread-safe iterator over tests for multiprocess mode."""
-
-    def __init__(self, tests):
-        self.interrupted = False
-        self.lock = threading.Lock()
-        self.tests = tests
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        with self.lock:
-            if self.interrupted:
-                raise StopIteration('tests interrupted')
-            return next(self.tests)
-
-
 def replace_stdout():
     """Set stdout encoder error handler to backslashreplace (as stderr error
     handler) to avoid UnicodeEncodeError when printing a traceback"""
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -1,7 +1,6 @@
 import faulthandler
 import importlib
 import io
-import json
 import os
 import sys
 import time
@@ -22,38 +21,6 @@
 CHILD_ERROR = -5   # error in a child process
 
 
-def run_test_in_subprocess(testname, ns):
-    """Run the given test in a subprocess with --slaveargs.
-
-    ns is the option Namespace parsed from command-line arguments. regrtest
-    is invoked in a subprocess with the --slaveargs argument; when the
-    subprocess exits, its return code, stdout and stderr are returned as a
-    3-tuple.
-    """
-    from subprocess import Popen, PIPE
-    base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
-                ['-X', 'faulthandler', '-m', 'test.regrtest'])
-
-    slaveargs = (
-            (testname, ns.verbose, ns.quiet),
-            dict(huntrleaks=ns.huntrleaks,
-                 use_resources=ns.use_resources,
-                 output_on_failure=ns.verbose3,
-                 timeout=ns.timeout, failfast=ns.failfast,
-                 match_tests=ns.match_tests))
-    # Running the child from the same working directory as regrtest's original
-    # invocation ensures that TEMPDIR for the child is the same when
-    # sysconfig.is_python_build() is true. See issue 15300.
-    popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
-                  stdout=PIPE, stderr=PIPE,
-                  universal_newlines=True,
-                  close_fds=(os.name != 'nt'),
-                  cwd=support.SAVEDCWD)
-    stdout, stderr = popen.communicate()
-    retcode = popen.wait()
-    return retcode, stdout, stderr
-
-
 # small set of tests to determine if we have a basically functioning interpreter
 # (i.e. if any of these fail, then anything else is likely to follow)
 STDTESTS = [
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -0,0 +1,158 @@
+import json
+import os
+import re
+import sys
+import traceback
+import unittest
+from queue import Queue
+from test import support
+try:
+    import threading
+except ImportError:
+    print("Multiprocess option requires thread support")
+    sys.exit(2)
+
+from test.libregrtest.runtest import runtest, INTERRUPTED, CHILD_ERROR
+
+
+debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
+
+
+def run_tests_in_subprocess(testname, ns):
+    """Run the given test in a subprocess with --slaveargs.
+
+    ns is the option Namespace parsed from command-line arguments. regrtest
+    is invoked in a subprocess with the --slaveargs argument; when the
+    subprocess exits, its return code, stdout and stderr are returned as a
+    3-tuple.
+    """
+    from subprocess import Popen, PIPE
+    base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
+                ['-X', 'faulthandler', '-m', 'test.regrtest'])
+
+    slaveargs = (
+            (testname, ns.verbose, ns.quiet),
+            dict(huntrleaks=ns.huntrleaks,
+                 use_resources=ns.use_resources,
+                 output_on_failure=ns.verbose3,
+                 timeout=ns.timeout, failfast=ns.failfast,
+                 match_tests=ns.match_tests))
+    # Running the child from the same working directory as regrtest's original
+    # invocation ensures that TEMPDIR for the child is the same when
+    # sysconfig.is_python_build() is true. See issue 15300.
+    popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
+                  stdout=PIPE, stderr=PIPE,
+                  universal_newlines=True,
+                  close_fds=(os.name != 'nt'),
+                  cwd=support.SAVEDCWD)
+    stdout, stderr = popen.communicate()
+    retcode = popen.wait()
+    return retcode, stdout, stderr
+
+
+def run_tests_slave(slaveargs):
+    args, kwargs = json.loads(slaveargs)
+    if kwargs.get('huntrleaks'):
+        unittest.BaseTestSuite._cleanup = False
+    try:
+        result = runtest(*args, **kwargs)
+    except KeyboardInterrupt:
+        result = INTERRUPTED, ''
+    except BaseException as e:
+        traceback.print_exc()
+        result = CHILD_ERROR, str(e)
+    sys.stdout.flush()
+    print()   # Force a newline (just in case)
+    print(json.dumps(result))
+    sys.exit(0)
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessIterator:
+
+    """A thread-safe iterator over tests for multiprocess mode."""
+
+    def __init__(self, tests):
+        self.interrupted = False
+        self.lock = threading.Lock()
+        self.tests = tests
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        with self.lock:
+            if self.interrupted:
+                raise StopIteration('tests interrupted')
+            return next(self.tests)
+
+
+class MultiprocessThread(threading.Thread):
+    def __init__(self, pending, output, ns):
+        super().__init__()
+        self.pending = pending
+        self.output = output
+        self.ns = ns
+
+    def run(self):
+        # A worker thread.
+        try:
+            while True:
+                try:
+                    test = next(self.pending)
+                except StopIteration:
+                    self.output.put((None, None, None, None))
+                    return
+                retcode, stdout, stderr = run_tests_in_subprocess(test, self.ns)
+                # Strip last refcount output line if it exists, since it
+                # comes from the shutdown of the interpreter in the subcommand.
+                stderr = debug_output_pat.sub("", stderr)
+                stdout, _, result = stdout.strip().rpartition("\n")
+                if retcode != 0:
+                    result = (CHILD_ERROR, "Exit code %s" % retcode)
+                    self.output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+                    return
+                if not result:
+                    self.output.put((None, None, None, None))
+                    return
+                result = json.loads(result)
+                self.output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+        except BaseException:
+            self.output.put((None, None, None, None))
+            raise
+
+
+def run_tests_multiprocess(regrtest):
+    output = Queue()
+    pending = MultiprocessIterator(regrtest.tests)
+
+    workers = [MultiprocessThread(pending, output, regrtest.ns)
+               for i in range(regrtest.ns.use_mp)]
+    for worker in workers:
+        worker.start()
+    finished = 0
+    test_index = 1
+    try:
+        while finished < regrtest.ns.use_mp:
+            test, stdout, stderr, result = output.get()
+            if test is None:
+                finished += 1
+                continue
+            regrtest.accumulate_result(test, result)
+            regrtest.display_progress(test_index, test)
+            if stdout:
+                print(stdout)
+            if stderr:
+                print(stderr, file=sys.stderr)
+            sys.stdout.flush()
+            sys.stderr.flush()
+            if result[0] == INTERRUPTED:
+                raise KeyboardInterrupt
+            if result[0] == CHILD_ERROR:
+                raise Exception("Child error on {}: {}".format(test, result[1]))
+            test_index += 1
+    except KeyboardInterrupt:
+        regrtest.interrupted = True
+        pending.interrupted = True
+    for worker in workers:
+        worker.join()

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list