[Python-checkins] bpo-36670: Enhance regrtest (GH-16556)

Victor Stinner webhook-mailer at python.org
Thu Oct 3 10:15:35 EDT 2019


https://github.com/python/cpython/commit/098e25672f1c3578855d5ded4f5147795c9ed956
commit: 098e25672f1c3578855d5ded4f5147795c9ed956
branch: master
author: Victor Stinner <vstinner at python.org>
committer: GitHub <noreply at github.com>
date: 2019-10-03T16:15:16+02:00
summary:

bpo-36670: Enhance regrtest (GH-16556)

* Add log() method: add timestamp and load average prefixes
  to main messages.
* WindowsLoadTracker:

  * LOAD_FACTOR_1 is now computed using SAMPLING_INTERVAL
  * Initialize the load to the arithmetic mean of the first 5 samples
    of the Processor Queue Length counter (i.e. over 5 seconds), rather
    than to 0.0.
  * Handle BrokenPipeError and the case where typeperf exits.

* format_duration(1.5) now returns '1.5 sec', rather than
  '1 sec 500 ms'

files:
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/runtest_mp.py
M Lib/test/libregrtest/utils.py
M Lib/test/libregrtest/win_utils.py
M Lib/test/test_regrtest.py

diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index fd701c452ceb5..76ad3359f2d7a 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -139,16 +139,8 @@ def accumulate_result(self, result, rerun=False):
                     print(xml_data, file=sys.__stderr__)
                     raise
 
-    def display_progress(self, test_index, text):
-        if self.ns.quiet:
-            return
-
-        # "[ 51/405/1] test_tcl passed"
-        line = f"{test_index:{self.test_count_width}}{self.test_count}"
-        fails = len(self.bad) + len(self.environment_changed)
-        if fails and not self.ns.pgo:
-            line = f"{line}/{fails}"
-        line = f"[{line}] {text}"
+    def log(self, line=''):
+        empty = not line
 
         # add the system load prefix: "load avg: 1.80 "
         load_avg = self.getloadavg()
@@ -159,8 +151,23 @@ def display_progress(self, test_index, text):
         test_time = time.monotonic() - self.start_time
         test_time = datetime.timedelta(seconds=int(test_time))
         line = f"{test_time} {line}"
+
+        if empty:
+            line = line[:-1]
+
         print(line, flush=True)
 
+    def display_progress(self, test_index, text):
+        if self.ns.quiet:
+            return
+
+        # "[ 51/405/1] test_tcl passed"
+        line = f"{test_index:{self.test_count_width}}{self.test_count}"
+        fails = len(self.bad) + len(self.environment_changed)
+        if fails and not self.ns.pgo:
+            line = f"{line}/{fails}"
+        self.log(f"[{line}] {text}")
+
     def parse_args(self, kwargs):
         ns = _parse_args(sys.argv[1:], **kwargs)
 
@@ -302,11 +309,11 @@ def rerun_failed_tests(self):
 
         self.first_result = self.get_tests_result()
 
-        print()
-        print("Re-running failed tests in verbose mode")
+        self.log()
+        self.log("Re-running failed tests in verbose mode")
         self.rerun = self.bad[:]
         for test_name in self.rerun:
-            print(f"Re-running {test_name} in verbose mode", flush=True)
+            self.log(f"Re-running {test_name} in verbose mode")
             self.ns.verbose = True
             result = runtest(self.ns, test_name)
 
@@ -387,7 +394,7 @@ def run_tests_sequential(self):
 
         save_modules = sys.modules.keys()
 
-        print("Run tests sequentially")
+        self.log("Run tests sequentially")
 
         previous_test = None
         for test_index, test_name in enumerate(self.tests, 1):
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index a46c78248de39..79afa29fa05bf 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -104,13 +104,14 @@ class ExitThread(Exception):
 
 
 class TestWorkerProcess(threading.Thread):
-    def __init__(self, worker_id, pending, output, ns, timeout):
+    def __init__(self, worker_id, runner):
         super().__init__()
         self.worker_id = worker_id
-        self.pending = pending
-        self.output = output
-        self.ns = ns
-        self.timeout = timeout
+        self.pending = runner.pending
+        self.output = runner.output
+        self.ns = runner.ns
+        self.timeout = runner.worker_timeout
+        self.regrtest = runner.regrtest
         self.current_test_name = None
         self.start_time = None
         self._popen = None
@@ -294,7 +295,8 @@ def wait_stopped(self, start_time):
             if not self.is_alive():
                 break
             dt = time.monotonic() - start_time
-            print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
+            self.regrtest.log(f"Waiting for {self} thread "
+                              f"for {format_duration(dt)}")
             if dt > JOIN_TIMEOUT:
                 print_warning(f"Failed to join {self} in {format_duration(dt)}")
                 break
@@ -316,6 +318,7 @@ def get_running(workers):
 class MultiprocessTestRunner:
     def __init__(self, regrtest):
         self.regrtest = regrtest
+        self.log = self.regrtest.log
         self.ns = regrtest.ns
         self.output = queue.Queue()
         self.pending = MultiprocessIterator(self.regrtest.tests)
@@ -326,11 +329,10 @@ def __init__(self, regrtest):
         self.workers = None
 
     def start_workers(self):
-        self.workers = [TestWorkerProcess(index, self.pending, self.output,
-                                          self.ns, self.worker_timeout)
+        self.workers = [TestWorkerProcess(index, self)
                         for index in range(1, self.ns.use_mp + 1)]
-        print("Run tests in parallel using %s child processes"
-              % len(self.workers))
+        self.log("Run tests in parallel using %s child processes"
+                 % len(self.workers))
         for worker in self.workers:
             worker.start()
 
@@ -364,7 +366,7 @@ def _get_result(self):
             # display progress
             running = get_running(self.workers)
             if running and not self.ns.pgo:
-                print('running: %s' % ', '.join(running), flush=True)
+                self.log('running: %s' % ', '.join(running))
 
     def display_result(self, mp_result):
         result = mp_result.result
@@ -384,8 +386,7 @@ def _process_result(self, item):
         if item[0]:
             # Thread got an exception
             format_exc = item[1]
-            print(f"regrtest worker thread failed: {format_exc}",
-                  file=sys.stderr, flush=True)
+            print_warning(f"regrtest worker thread failed: {format_exc}")
             return True
 
         self.test_index += 1
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index 2691a2c30ce80..40faed832c11b 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -17,11 +17,14 @@ def format_duration(seconds):
     if minutes:
         parts.append('%s min' % minutes)
     if seconds:
-        parts.append('%s sec' % seconds)
-    if ms:
-        parts.append('%s ms' % ms)
+        if parts:
+            # 2 min 1 sec
+            parts.append('%s sec' % seconds)
+        else:
+            # 1.0 sec
+            parts.append('%.1f sec' % (seconds + ms / 1000))
     if not parts:
-        return '0 ms'
+        return '%s ms' % ms
 
     parts = parts[:2]
     return ' '.join(parts)
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index 0b73a946307ee..028c01106dee0 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -1,4 +1,5 @@
 import _winapi
+import math
 import msvcrt
 import os
 import subprocess
@@ -10,11 +11,14 @@
 
 # Max size of asynchronous reads
 BUFSIZE = 8192
-# Exponential damping factor (see below)
-LOAD_FACTOR_1 = 0.9200444146293232478931553241
-
 # Seconds per measurement
 SAMPLING_INTERVAL = 1
+# Exponential damping factor to compute exponentially weighted moving average
+# on 1 minute (60 seconds)
+LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
+# Initialize the load using the arithmetic mean of the first NVALUE values
+# of the Processor Queue Length
+NVALUE = 5
 # Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
 # of typeperf are registered
 COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
@@ -30,10 +34,10 @@ class WindowsLoadTracker():
     """
 
     def __init__(self):
-        self.load = 0.0
-        self.counter_name = ''
+        self._values = []
+        self._load = None
         self._buffer = ''
-        self.popen = None
+        self._popen = None
         self.start()
 
     def start(self):
@@ -65,7 +69,7 @@ def start(self):
         # Spawn off the load monitor
         counter_name = self._get_counter_name()
         command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
-        self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
+        self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
 
         # Close our copy of the write end of the pipe
         os.close(command_stdout)
@@ -85,12 +89,16 @@ def _get_counter_name(self):
         process_queue_length = counters_dict['44']
         return f'"\\{system}\\{process_queue_length}"'
 
-    def close(self):
-        if self.popen is None:
+    def close(self, kill=True):
+        if self._popen is None:
             return
-        self.popen.kill()
-        self.popen.wait()
-        self.popen = None
+
+        self._load = None
+
+        if kill:
+            self._popen.kill()
+        self._popen.wait()
+        self._popen = None
 
     def __del__(self):
         self.close()
@@ -109,7 +117,7 @@ def _parse_line(self, line):
         value = value[1:-1]
         return float(value)
 
-    def read_lines(self):
+    def _read_lines(self):
         overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
         bytes_read, res = overlapped.GetOverlappedResult(False)
         if res != 0:
@@ -135,7 +143,21 @@ def read_lines(self):
         return lines
 
     def getloadavg(self):
-        for line in self.read_lines():
+        if self._popen is None:
+            return None
+
+        returncode = self._popen.poll()
+        if returncode is not None:
+            self.close(kill=False)
+            return None
+
+        try:
+            lines = self._read_lines()
+        except BrokenPipeError:
+            self.close()
+            return None
+
+        for line in lines:
             line = line.rstrip()
 
             # Ignore the initial header:
@@ -148,7 +170,7 @@ def getloadavg(self):
                 continue
 
             try:
-                load = self._parse_line(line)
+                processor_queue_length = self._parse_line(line)
             except ValueError:
                 print_warning("Failed to parse typeperf output: %a" % line)
                 continue
@@ -156,7 +178,13 @@ def getloadavg(self):
             # We use an exponentially weighted moving average, imitating the
             # load calculation on Unix systems.
             # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
-            new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
-            self.load = new_load
-
-        return self.load
+            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+            if self._load is not None:
+                self._load = (self._load * LOAD_FACTOR_1
+                              + processor_queue_length  * (1.0 - LOAD_FACTOR_1))
+            elif len(self._values) < NVALUE:
+                self._values.append(processor_queue_length)
+            else:
+                self._load = sum(self._values) / len(self._values)
+
+        return self._load
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index fa37ecde37623..aaa97e040804a 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -25,6 +25,7 @@
 Py_DEBUG = hasattr(sys, 'gettotalrefcount')
 ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
 ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
+LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'
 
 TEST_INTERRUPTED = textwrap.dedent("""
     from signal import SIGINT, raise_signal
@@ -388,8 +389,8 @@ def check_line(self, output, regex):
         self.assertRegex(output, regex)
 
     def parse_executed_tests(self, output):
-        regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
-                 % self.TESTNAME_REGEX)
+        regex = (r'^%s\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
+                 % (LOG_PREFIX, self.TESTNAME_REGEX))
         parser = re.finditer(regex, output, re.MULTILINE)
         return list(match.group(1) for match in parser)
 
@@ -449,9 +450,10 @@ def list_regex(line_format, tests):
         if rerun:
             regex = list_regex('%s re-run test%s', rerun)
             self.check_line(output, regex)
-            self.check_line(output, "Re-running failed tests in verbose mode")
+            regex = LOG_PREFIX + r"Re-running failed tests in verbose mode"
+            self.check_line(output, regex)
             for test_name in rerun:
-                regex = f"Re-running {test_name} in verbose mode"
+                regex = LOG_PREFIX + f"Re-running {test_name} in verbose mode"
                 self.check_line(output, regex)
 
         if no_test_ran:
@@ -1232,9 +1234,9 @@ def test_format_duration(self):
         self.assertEqual(utils.format_duration(10e-3),
                          '10 ms')
         self.assertEqual(utils.format_duration(1.5),
-                         '1 sec 500 ms')
+                         '1.5 sec')
         self.assertEqual(utils.format_duration(1),
-                         '1 sec')
+                         '1.0 sec')
         self.assertEqual(utils.format_duration(2 * 60),
                          '2 min')
         self.assertEqual(utils.format_duration(2 * 60 + 1),



More information about the Python-checkins mailing list