[Python-checkins] bpo-36725: Refactor regrtest multiprocessing code (GH-12961)

Victor Stinner webhook-mailer at python.org
Fri Apr 26 02:40:46 EDT 2019


https://github.com/python/cpython/commit/3cde440f20a9db75fb2c4e65e8e4d04a53216a2d
commit: 3cde440f20a9db75fb2c4e65e8e4d04a53216a2d
branch: master
author: Victor Stinner <vstinner at redhat.com>
committer: GitHub <noreply at github.com>
date: 2019-04-26T08:40:25+02:00
summary:

bpo-36725: Refactor regrtest multiprocessing code (GH-12961)

Rewrite run_tests_multiprocess() function as a new MultiprocessRunner
class with multiple methods to better report errors and stop
immediately when needed.

Changes:

* Worker processes are now killed immediately if tests are
  interrupted or if a test crashes (CHILD_ERROR).
* Rewrite how errors in a worker thread are reported to
  the main thread. No longer ignore BaseException or parsing errors
  silently.
* Remove 'finished' variable: use worker.is_alive() instead.
* Always compute omitted tests. Add Regrtest.get_executed() method.

files:
A Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/runtest.py
M Lib/test/libregrtest/runtest_mp.py
M Lib/test/test_regrtest.py

diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index ef1336a7e233..606dc268ae3f 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -79,8 +79,8 @@ def __init__(self):
         self.skipped = []
         self.resource_denieds = []
         self.environment_changed = []
-        self.rerun = []
         self.run_no_tests = []
+        self.rerun = []
         self.first_result = None
         self.interrupted = False
 
@@ -105,6 +105,11 @@ def __init__(self):
         # used by --junit-xml
         self.testsuite_xml = None
 
+    def get_executed(self):
+        return (set(self.good) | set(self.bad) | set(self.skipped)
+                | set(self.resource_denieds) | set(self.environment_changed)
+                | set(self.run_no_tests))
+
     def accumulate_result(self, result):
         test_name = result.test_name
         ok = result.result
@@ -311,8 +316,6 @@ def rerun_failed_tests(self):
                 self.bad.remove(test_name)
 
             if ok.result == INTERRUPTED:
-                # print a newline separate from the ^C
-                print()
                 self.interrupted = True
                 break
         else:
@@ -331,11 +334,11 @@ def display_result(self):
         print("== Tests result: %s ==" % self.get_tests_result())
 
         if self.interrupted:
-            print()
-            # print a newline after ^C
             print("Test suite interrupted by signal SIGINT.")
-            executed = set(self.good) | set(self.bad) | set(self.skipped)
-            omitted = set(self.selected) - executed
+
+        omitted = set(self.selected) - self.get_executed()
+        if omitted:
+            print()
             print(count(len(omitted), "test"), "omitted:")
             printlist(omitted)
 
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 55913b3842d7..c0cfa5312f70 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -275,6 +275,7 @@ def _runtest_inner(ns, test_name, display_failure=True):
     except support.TestDidNotRun:
         return TEST_DID_NOT_RUN
     except KeyboardInterrupt:
+        print()
         return INTERRUPTED
     except:
         if not ns.pgo:
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index 0a95bf622b05..e6c4f4f74a1e 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -3,9 +3,11 @@
 import json
 import os
 import queue
+import subprocess
 import sys
 import threading
 import time
+import traceback
 import types
 from test import support
 
@@ -19,20 +21,12 @@
 # Display the running tests if nothing happened last N seconds
 PROGRESS_UPDATE = 30.0   # seconds
 
-# If interrupted, display the wait progress every N seconds
-WAIT_PROGRESS = 2.0   # seconds
 
+def must_stop(result):
+    return result.result in (INTERRUPTED, CHILD_ERROR)
 
-def run_test_in_subprocess(testname, ns):
-    """Run the given test in a subprocess with --worker-args.
-
-    ns is the option Namespace parsed from command-line arguments. regrtest
-    is invoked in a subprocess with the --worker-args argument; when the
-    subprocess exits, its return code, stdout and stderr are returned as a
-    3-tuple.
-    """
-    from subprocess import Popen, PIPE
 
+def run_test_in_subprocess(testname, ns):
     ns_dict = vars(ns)
     worker_args = (ns_dict, testname)
     worker_args = json.dumps(worker_args)
@@ -47,15 +41,12 @@ def run_test_in_subprocess(testname, ns):
     # Running the child from the same working directory as regrtest's original
     # invocation ensures that TEMPDIR for the child is the same when
     # sysconfig.is_python_build() is true. See issue 15300.
-    popen = Popen(cmd,
-                  stdout=PIPE, stderr=PIPE,
-                  universal_newlines=True,
-                  close_fds=(os.name != 'nt'),
-                  cwd=support.SAVEDCWD)
-    with popen:
-        stdout, stderr = popen.communicate()
-        retcode = popen.wait()
-    return retcode, stdout, stderr
+    return subprocess.Popen(cmd,
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.PIPE,
+                            universal_newlines=True,
+                            close_fds=(os.name != 'nt'),
+                            cwd=support.SAVEDCWD)
 
 
 def run_tests_worker(worker_args):
@@ -66,7 +57,6 @@ def run_tests_worker(worker_args):
 
     result = runtest(ns, testname)
     print()   # Force a newline (just in case)
-
     print(json.dumps(result), flush=True)
     sys.exit(0)
 
@@ -77,7 +67,6 @@ class MultiprocessIterator:
     """A thread-safe iterator over tests for multiprocess mode."""
 
     def __init__(self, tests):
-        self.interrupted = False
         self.lock = threading.Lock()
         self.tests = tests
 
@@ -86,8 +75,6 @@ def __iter__(self):
 
     def __next__(self):
         with self.lock:
-            if self.interrupted:
-                raise StopIteration('tests interrupted')
             return next(self.tests)
 
 
@@ -102,143 +89,202 @@ def __init__(self, pending, output, ns):
         self.ns = ns
         self.current_test_name = None
         self.start_time = None
+        self._popen = None
 
-    def _runtest(self):
-        try:
-            test_name = next(self.pending)
-        except StopIteration:
-            self.output.put(None)
-            return True
+    def kill(self):
+        if not self.is_alive():
+            return
+        if self._popen is not None:
+            self._popen.kill()
 
+    def _runtest(self, test_name):
         try:
             self.start_time = time.monotonic()
             self.current_test_name = test_name
 
-            retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns)
+            popen = run_test_in_subprocess(test_name, self.ns)
+            self._popen = popen
+            with popen:
+                try:
+                    stdout, stderr = popen.communicate()
+                except:
+                    popen.kill()
+                    popen.wait()
+                    raise
+
+            retcode = popen.wait()
         finally:
             self.current_test_name = None
+            self._popen = None
+
+        stdout = stdout.strip()
+        stderr = stderr.rstrip()
 
+        err_msg = None
         if retcode != 0:
+            err_msg = "Exit code %s" % retcode
+        else:
+            stdout, _, result = stdout.rpartition("\n")
+            stdout = stdout.rstrip()
+            if not result:
+                err_msg = "Failed to parse worker stdout"
+            else:
+                try:
+                    # deserialize run_tests_worker() output
+                    result = json.loads(result)
+                    result = TestResult(*result)
+                except Exception as exc:
+                    err_msg = "Failed to parse worker JSON: %s" % exc
+
+        if err_msg is not None:
             test_time = time.monotonic() - self.start_time
             result = TestResult(test_name, CHILD_ERROR, test_time, None)
-            err_msg = "Exit code %s" % retcode
-            mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg)
-            self.output.put(mp_result)
-            return False
 
-        stdout, _, result = stdout.strip().rpartition("\n")
-        if not result:
-            self.output.put(None)
-            return True
-
-        # deserialize run_tests_worker() output
-        result = json.loads(result)
-        result = TestResult(*result)
-        mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None)
-        self.output.put(mp_result)
-        return False
+        return MultiprocessResult(result, stdout, stderr, err_msg)
 
     def run(self):
-        try:
-            stop = False
-            while not stop:
-                stop = self._runtest()
-        except BaseException:
-            self.output.put(None)
-            raise
+        while True:
+            try:
+                try:
+                    test_name = next(self.pending)
+                except StopIteration:
+                    break
 
+                mp_result = self._runtest(test_name)
+                self.output.put((False, mp_result))
 
-def run_tests_multiprocess(regrtest):
-    output = queue.Queue()
-    pending = MultiprocessIterator(regrtest.tests)
-    test_timeout = regrtest.ns.timeout
-    use_timeout = (test_timeout is not None)
-
-    workers = [MultiprocessThread(pending, output, regrtest.ns)
-               for i in range(regrtest.ns.use_mp)]
-    print("Run tests in parallel using %s child processes"
-          % len(workers))
+                if must_stop(mp_result.result):
+                    break
+            except BaseException:
+                self.output.put((True, traceback.format_exc()))
+                break
+
+
+def get_running(workers):
+    running = []
     for worker in workers:
-        worker.start()
-
-    def get_running(workers):
-        running = []
-        for worker in workers:
-            current_test_name = worker.current_test_name
-            if not current_test_name:
-                continue
-            dt = time.monotonic() - worker.start_time
-            if dt >= PROGRESS_MIN_TIME:
-                text = '%s (%s)' % (current_test_name, format_duration(dt))
-                running.append(text)
-        return running
-
-    finished = 0
-    test_index = 1
-    get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
-    try:
-        while finished < regrtest.ns.use_mp:
-            if use_timeout:
-                faulthandler.dump_traceback_later(test_timeout, exit=True)
+        current_test_name = worker.current_test_name
+        if not current_test_name:
+            continue
+        dt = time.monotonic() - worker.start_time
+        if dt >= PROGRESS_MIN_TIME:
+            text = '%s (%s)' % (current_test_name, format_duration(dt))
+            running.append(text)
+    return running
+
+
+class MultiprocessRunner:
+    def __init__(self, regrtest):
+        self.regrtest = regrtest
+        self.ns = regrtest.ns
+        self.output = queue.Queue()
+        self.pending = MultiprocessIterator(self.regrtest.tests)
+        if self.ns.timeout is not None:
+            self.test_timeout = self.ns.timeout * 1.5
+        else:
+            self.test_timeout = None
+        self.workers = None
+
+    def start_workers(self):
+        self.workers = [MultiprocessThread(self.pending, self.output, self.ns)
+                        for _ in range(self.ns.use_mp)]
+        print("Run tests in parallel using %s child processes"
+              % len(self.workers))
+        for worker in self.workers:
+            worker.start()
+
+    def wait_workers(self):
+        for worker in self.workers:
+            worker.kill()
+        for worker in self.workers:
+            worker.join()
+
+    def _get_result(self):
+        if not any(worker.is_alive() for worker in self.workers):
+            # all worker threads are done: consume pending results
+            try:
+                return self.output.get(timeout=0)
+            except queue.Empty:
+                return None
+
+        while True:
+            if self.test_timeout is not None:
+                faulthandler.dump_traceback_later(self.test_timeout, exit=True)
 
+            # wait for a thread
+            timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
             try:
-                mp_result = output.get(timeout=get_timeout)
+                return self.output.get(timeout=timeout)
             except queue.Empty:
-                running = get_running(workers)
-                if running and not regrtest.ns.pgo:
-                    print('running: %s' % ', '.join(running), flush=True)
-                continue
-
-            if mp_result is None:
-                finished += 1
-                continue
-            result = mp_result.result
-            regrtest.accumulate_result(result)
-
-            # Display progress
-            ok = result.result
-
-            text = format_test_result(result)
-            if (ok not in (CHILD_ERROR, INTERRUPTED)
-                and result.test_time >= PROGRESS_MIN_TIME
-                and not regrtest.ns.pgo):
-                text += ' (%s)' % format_duration(result.test_time)
-            elif ok == CHILD_ERROR:
-                text = '%s (%s)' % (text, mp_result.error_msg)
-            running = get_running(workers)
-            if running and not regrtest.ns.pgo:
-                text += ' -- running: %s' % ', '.join(running)
-            regrtest.display_progress(test_index, text)
-
-            # Copy stdout and stderr from the child process
-            if mp_result.stdout:
-                print(mp_result.stdout, flush=True)
-            if mp_result.stderr and not regrtest.ns.pgo:
-                print(mp_result.stderr, file=sys.stderr, flush=True)
-
-            if result.result == INTERRUPTED:
-                raise KeyboardInterrupt
-            test_index += 1
-    except KeyboardInterrupt:
-        regrtest.interrupted = True
-        pending.interrupted = True
-        print()
-    finally:
-        if use_timeout:
-            faulthandler.cancel_dump_traceback_later()
-
-    # If tests are interrupted, wait until tests complete
-    wait_start = time.monotonic()
-    while True:
-        running = [worker.current_test_name for worker in workers]
-        running = list(filter(bool, running))
-        if not running:
-            break
-
-        dt = time.monotonic() - wait_start
-        line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
-        if dt >= WAIT_PROGRESS:
-            line = "%s since %.0f sec" % (line, dt)
-        print(line, flush=True)
-        for worker in workers:
-            worker.join(WAIT_PROGRESS)
+                pass
+
+            # display progress
+            running = get_running(self.workers)
+            if running and not self.ns.pgo:
+                print('running: %s' % ', '.join(running), flush=True)
+
+    def display_result(self, mp_result):
+        result = mp_result.result
+
+        text = format_test_result(result)
+        if mp_result.error_msg is not None:
+            # CHILD_ERROR
+            text += ' (%s)' % mp_result.error_msg
+        elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
+            text += ' (%s)' % format_duration(result.test_time)
+        running = get_running(self.workers)
+        if running and not self.ns.pgo:
+            text += ' -- running: %s' % ', '.join(running)
+        self.regrtest.display_progress(self.test_index, text)
+
+    def _process_result(self, item):
+        if item[0]:
+            # Thread got an exception
+            format_exc = item[1]
+            print(f"regrtest worker thread failed: {format_exc}",
+                  file=sys.stderr, flush=True)
+            return True
+
+        self.test_index += 1
+        mp_result = item[1]
+        self.regrtest.accumulate_result(mp_result.result)
+        self.display_result(mp_result)
+
+        if mp_result.stdout:
+            print(mp_result.stdout, flush=True)
+        if mp_result.stderr and not self.ns.pgo:
+            print(mp_result.stderr, file=sys.stderr, flush=True)
+
+        if mp_result.result.result == INTERRUPTED:
+            self.regrtest.interrupted = True
+
+        if must_stop(mp_result.result):
+            return True
+
+        return False
+
+    def run_tests(self):
+        self.start_workers()
+
+        self.test_index = 0
+        try:
+            while True:
+                item = self._get_result()
+                if item is None:
+                    break
+
+                stop = self._process_result(item)
+                if stop:
+                    break
+        except KeyboardInterrupt:
+            print()
+            self.regrtest.interrupted = True
+        finally:
+            if self.test_timeout is not None:
+                faulthandler.cancel_dump_traceback_later()
+
+        self.wait_workers()
+
+
+def run_tests_multiprocess(regrtest):
+    MultiprocessRunner(regrtest).run_tests()
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 5c65e6dd8520..e0d1d3cec7c2 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -916,13 +916,13 @@ def test_method2(self):
                                 testname)
         self.assertEqual(output.splitlines(), all_methods)
 
+    @support.cpython_only
     def test_crashed(self):
         # Any code which causes a crash
         code = 'import faulthandler; faulthandler._sigsegv()'
         crash_test = self.create_test(name="crash", code=code)
-        ok_test = self.create_test(name="ok")
 
-        tests = [crash_test, ok_test]
+        tests = [crash_test]
         output = self.run_tests("-j2", *tests, exitcode=2)
         self.check_executed_tests(output, tests, failed=crash_test,
                                   randomize=True)
diff --git a/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst b/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst
new file mode 100644
index 000000000000..b632c46d2b67
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst
@@ -0,0 +1,3 @@
+When using multiprocessing mode (-jN), regrtest now better reports errors if
+a worker process fails, and it exits immediately on a worker thread failure
+or when interrupted.



More information about the Python-checkins mailing list