[Python-checkins] bpo-36725: Refactor regrtest multiprocessing code (GH-12961)
Victor Stinner
webhook-mailer at python.org
Fri Apr 26 02:40:46 EDT 2019
https://github.com/python/cpython/commit/3cde440f20a9db75fb2c4e65e8e4d04a53216a2d
commit: 3cde440f20a9db75fb2c4e65e8e4d04a53216a2d
branch: master
author: Victor Stinner <vstinner at redhat.com>
committer: GitHub <noreply at github.com>
date: 2019-04-26T08:40:25+02:00
summary:
bpo-36725: Refactor regrtest multiprocessing code (GH-12961)
Rewrite run_tests_multiprocess() function as a new MultiprocessRunner
class with multiple methods to better report errors and stop
immediately when needed.
Changes:
* Worker processes are now killed immediately if tests are
interrupted or if a test crashes (CHILD_ERROR).
* Rewrite how errors in a worker thread are reported to
the main thread. No longer ignore BaseException or parsing errors
silently.
* Remove 'finished' variable: use worker.is_alive() instead
* Always compute omitted tests. Add Regrtest.get_executed() method.
files:
A Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/runtest.py
M Lib/test/libregrtest/runtest_mp.py
M Lib/test/test_regrtest.py
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index ef1336a7e233..606dc268ae3f 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -79,8 +79,8 @@ def __init__(self):
self.skipped = []
self.resource_denieds = []
self.environment_changed = []
- self.rerun = []
self.run_no_tests = []
+ self.rerun = []
self.first_result = None
self.interrupted = False
@@ -105,6 +105,11 @@ def __init__(self):
# used by --junit-xml
self.testsuite_xml = None
+ def get_executed(self):
+ return (set(self.good) | set(self.bad) | set(self.skipped)
+ | set(self.resource_denieds) | set(self.environment_changed)
+ | set(self.run_no_tests))
+
def accumulate_result(self, result):
test_name = result.test_name
ok = result.result
@@ -311,8 +316,6 @@ def rerun_failed_tests(self):
self.bad.remove(test_name)
if ok.result == INTERRUPTED:
- # print a newline separate from the ^C
- print()
self.interrupted = True
break
else:
@@ -331,11 +334,11 @@ def display_result(self):
print("== Tests result: %s ==" % self.get_tests_result())
if self.interrupted:
- print()
- # print a newline after ^C
print("Test suite interrupted by signal SIGINT.")
- executed = set(self.good) | set(self.bad) | set(self.skipped)
- omitted = set(self.selected) - executed
+
+ omitted = set(self.selected) - self.get_executed()
+ if omitted:
+ print()
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 55913b3842d7..c0cfa5312f70 100644
--- a/Lib/test/libregrtest/runtest.py
+++ b/Lib/test/libregrtest/runtest.py
@@ -275,6 +275,7 @@ def _runtest_inner(ns, test_name, display_failure=True):
except support.TestDidNotRun:
return TEST_DID_NOT_RUN
except KeyboardInterrupt:
+ print()
return INTERRUPTED
except:
if not ns.pgo:
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index 0a95bf622b05..e6c4f4f74a1e 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -3,9 +3,11 @@
import json
import os
import queue
+import subprocess
import sys
import threading
import time
+import traceback
import types
from test import support
@@ -19,20 +21,12 @@
# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds
-# If interrupted, display the wait progress every N seconds
-WAIT_PROGRESS = 2.0 # seconds
+def must_stop(result):
+ return result.result in (INTERRUPTED, CHILD_ERROR)
-def run_test_in_subprocess(testname, ns):
- """Run the given test in a subprocess with --worker-args.
-
- ns is the option Namespace parsed from command-line arguments. regrtest
- is invoked in a subprocess with the --worker-args argument; when the
- subprocess exits, its return code, stdout and stderr are returned as a
- 3-tuple.
- """
- from subprocess import Popen, PIPE
+def run_test_in_subprocess(testname, ns):
ns_dict = vars(ns)
worker_args = (ns_dict, testname)
worker_args = json.dumps(worker_args)
@@ -47,15 +41,12 @@ def run_test_in_subprocess(testname, ns):
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
- popen = Popen(cmd,
- stdout=PIPE, stderr=PIPE,
- universal_newlines=True,
- close_fds=(os.name != 'nt'),
- cwd=support.SAVEDCWD)
- with popen:
- stdout, stderr = popen.communicate()
- retcode = popen.wait()
- return retcode, stdout, stderr
+ return subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ close_fds=(os.name != 'nt'),
+ cwd=support.SAVEDCWD)
def run_tests_worker(worker_args):
@@ -66,7 +57,6 @@ def run_tests_worker(worker_args):
result = runtest(ns, testname)
print() # Force a newline (just in case)
-
print(json.dumps(result), flush=True)
sys.exit(0)
@@ -77,7 +67,6 @@ class MultiprocessIterator:
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests):
- self.interrupted = False
self.lock = threading.Lock()
self.tests = tests
@@ -86,8 +75,6 @@ def __iter__(self):
def __next__(self):
with self.lock:
- if self.interrupted:
- raise StopIteration('tests interrupted')
return next(self.tests)
@@ -102,143 +89,202 @@ def __init__(self, pending, output, ns):
self.ns = ns
self.current_test_name = None
self.start_time = None
+ self._popen = None
- def _runtest(self):
- try:
- test_name = next(self.pending)
- except StopIteration:
- self.output.put(None)
- return True
+ def kill(self):
+ if not self.is_alive():
+ return
+ if self._popen is not None:
+ self._popen.kill()
+ def _runtest(self, test_name):
try:
self.start_time = time.monotonic()
self.current_test_name = test_name
- retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns)
+ popen = run_test_in_subprocess(test_name, self.ns)
+ self._popen = popen
+ with popen:
+ try:
+ stdout, stderr = popen.communicate()
+ except:
+ popen.kill()
+ popen.wait()
+ raise
+
+ retcode = popen.wait()
finally:
self.current_test_name = None
+ self._popen = None
+
+ stdout = stdout.strip()
+ stderr = stderr.rstrip()
+ err_msg = None
if retcode != 0:
+ err_msg = "Exit code %s" % retcode
+ else:
+ stdout, _, result = stdout.rpartition("\n")
+ stdout = stdout.rstrip()
+ if not result:
+ err_msg = "Failed to parse worker stdout"
+ else:
+ try:
+ # deserialize run_tests_worker() output
+ result = json.loads(result)
+ result = TestResult(*result)
+ except Exception as exc:
+ err_msg = "Failed to parse worker JSON: %s" % exc
+
+ if err_msg is not None:
test_time = time.monotonic() - self.start_time
result = TestResult(test_name, CHILD_ERROR, test_time, None)
- err_msg = "Exit code %s" % retcode
- mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg)
- self.output.put(mp_result)
- return False
- stdout, _, result = stdout.strip().rpartition("\n")
- if not result:
- self.output.put(None)
- return True
-
- # deserialize run_tests_worker() output
- result = json.loads(result)
- result = TestResult(*result)
- mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None)
- self.output.put(mp_result)
- return False
+ return MultiprocessResult(result, stdout, stderr, err_msg)
def run(self):
- try:
- stop = False
- while not stop:
- stop = self._runtest()
- except BaseException:
- self.output.put(None)
- raise
+ while True:
+ try:
+ try:
+ test_name = next(self.pending)
+ except StopIteration:
+ break
+ mp_result = self._runtest(test_name)
+ self.output.put((False, mp_result))
-def run_tests_multiprocess(regrtest):
- output = queue.Queue()
- pending = MultiprocessIterator(regrtest.tests)
- test_timeout = regrtest.ns.timeout
- use_timeout = (test_timeout is not None)
-
- workers = [MultiprocessThread(pending, output, regrtest.ns)
- for i in range(regrtest.ns.use_mp)]
- print("Run tests in parallel using %s child processes"
- % len(workers))
+ if must_stop(mp_result.result):
+ break
+ except BaseException:
+ self.output.put((True, traceback.format_exc()))
+ break
+
+
+def get_running(workers):
+ running = []
for worker in workers:
- worker.start()
-
- def get_running(workers):
- running = []
- for worker in workers:
- current_test_name = worker.current_test_name
- if not current_test_name:
- continue
- dt = time.monotonic() - worker.start_time
- if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test_name, format_duration(dt))
- running.append(text)
- return running
-
- finished = 0
- test_index = 1
- get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
- try:
- while finished < regrtest.ns.use_mp:
- if use_timeout:
- faulthandler.dump_traceback_later(test_timeout, exit=True)
+ current_test_name = worker.current_test_name
+ if not current_test_name:
+ continue
+ dt = time.monotonic() - worker.start_time
+ if dt >= PROGRESS_MIN_TIME:
+ text = '%s (%s)' % (current_test_name, format_duration(dt))
+ running.append(text)
+ return running
+
+
+class MultiprocessRunner:
+ def __init__(self, regrtest):
+ self.regrtest = regrtest
+ self.ns = regrtest.ns
+ self.output = queue.Queue()
+ self.pending = MultiprocessIterator(self.regrtest.tests)
+ if self.ns.timeout is not None:
+ self.test_timeout = self.ns.timeout * 1.5
+ else:
+ self.test_timeout = None
+ self.workers = None
+
+ def start_workers(self):
+ self.workers = [MultiprocessThread(self.pending, self.output, self.ns)
+ for _ in range(self.ns.use_mp)]
+ print("Run tests in parallel using %s child processes"
+ % len(self.workers))
+ for worker in self.workers:
+ worker.start()
+
+ def wait_workers(self):
+ for worker in self.workers:
+ worker.kill()
+ for worker in self.workers:
+ worker.join()
+
+ def _get_result(self):
+ if not any(worker.is_alive() for worker in self.workers):
+ # all worker threads are done: consume pending results
+ try:
+ return self.output.get(timeout=0)
+ except queue.Empty:
+ return None
+
+ while True:
+ if self.test_timeout is not None:
+ faulthandler.dump_traceback_later(self.test_timeout, exit=True)
+ # wait for a thread
+ timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
try:
- mp_result = output.get(timeout=get_timeout)
+ return self.output.get(timeout=timeout)
except queue.Empty:
- running = get_running(workers)
- if running and not regrtest.ns.pgo:
- print('running: %s' % ', '.join(running), flush=True)
- continue
-
- if mp_result is None:
- finished += 1
- continue
- result = mp_result.result
- regrtest.accumulate_result(result)
-
- # Display progress
- ok = result.result
-
- text = format_test_result(result)
- if (ok not in (CHILD_ERROR, INTERRUPTED)
- and result.test_time >= PROGRESS_MIN_TIME
- and not regrtest.ns.pgo):
- text += ' (%s)' % format_duration(result.test_time)
- elif ok == CHILD_ERROR:
- text = '%s (%s)' % (text, mp_result.error_msg)
- running = get_running(workers)
- if running and not regrtest.ns.pgo:
- text += ' -- running: %s' % ', '.join(running)
- regrtest.display_progress(test_index, text)
-
- # Copy stdout and stderr from the child process
- if mp_result.stdout:
- print(mp_result.stdout, flush=True)
- if mp_result.stderr and not regrtest.ns.pgo:
- print(mp_result.stderr, file=sys.stderr, flush=True)
-
- if result.result == INTERRUPTED:
- raise KeyboardInterrupt
- test_index += 1
- except KeyboardInterrupt:
- regrtest.interrupted = True
- pending.interrupted = True
- print()
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
-
- # If tests are interrupted, wait until tests complete
- wait_start = time.monotonic()
- while True:
- running = [worker.current_test_name for worker in workers]
- running = list(filter(bool, running))
- if not running:
- break
-
- dt = time.monotonic() - wait_start
- line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
- if dt >= WAIT_PROGRESS:
- line = "%s since %.0f sec" % (line, dt)
- print(line, flush=True)
- for worker in workers:
- worker.join(WAIT_PROGRESS)
+ pass
+
+ # display progress
+ running = get_running(self.workers)
+ if running and not self.ns.pgo:
+ print('running: %s' % ', '.join(running), flush=True)
+
+ def display_result(self, mp_result):
+ result = mp_result.result
+
+ text = format_test_result(result)
+ if mp_result.error_msg is not None:
+ # CHILD_ERROR
+ text += ' (%s)' % mp_result.error_msg
+ elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
+ text += ' (%s)' % format_duration(result.test_time)
+ running = get_running(self.workers)
+ if running and not self.ns.pgo:
+ text += ' -- running: %s' % ', '.join(running)
+ self.regrtest.display_progress(self.test_index, text)
+
+ def _process_result(self, item):
+ if item[0]:
+ # Thread got an exception
+ format_exc = item[1]
+ print(f"regrtest worker thread failed: {format_exc}",
+ file=sys.stderr, flush=True)
+ return True
+
+ self.test_index += 1
+ mp_result = item[1]
+ self.regrtest.accumulate_result(mp_result.result)
+ self.display_result(mp_result)
+
+ if mp_result.stdout:
+ print(mp_result.stdout, flush=True)
+ if mp_result.stderr and not self.ns.pgo:
+ print(mp_result.stderr, file=sys.stderr, flush=True)
+
+ if mp_result.result.result == INTERRUPTED:
+ self.regrtest.interrupted = True
+
+ if must_stop(mp_result.result):
+ return True
+
+ return False
+
+ def run_tests(self):
+ self.start_workers()
+
+ self.test_index = 0
+ try:
+ while True:
+ item = self._get_result()
+ if item is None:
+ break
+
+ stop = self._process_result(item)
+ if stop:
+ break
+ except KeyboardInterrupt:
+ print()
+ self.regrtest.interrupted = True
+ finally:
+ if self.test_timeout is not None:
+ faulthandler.cancel_dump_traceback_later()
+
+ self.wait_workers()
+
+
+def run_tests_multiprocess(regrtest):
+ MultiprocessRunner(regrtest).run_tests()
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 5c65e6dd8520..e0d1d3cec7c2 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -916,13 +916,13 @@ def test_method2(self):
testname)
self.assertEqual(output.splitlines(), all_methods)
+ @support.cpython_only
def test_crashed(self):
# Any code which causes a crash
code = 'import faulthandler; faulthandler._sigsegv()'
crash_test = self.create_test(name="crash", code=code)
- ok_test = self.create_test(name="ok")
- tests = [crash_test, ok_test]
+ tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=2)
self.check_executed_tests(output, tests, failed=crash_test,
randomize=True)
diff --git a/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst b/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst
new file mode 100644
index 000000000000..b632c46d2b67
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2019-04-26-04-12-29.bpo-36725.B8-ghi.rst
@@ -0,0 +1,3 @@
+When using multiprocessing mode (-jN), regrtest now better reports errors if
+a worker process fails, and it exits immediately on a worker thread failure
+or when interrupted.
More information about the Python-checkins
mailing list