[Python-checkins] bpo-36670: Multiple regrtest bugfixes (GH-16511)

Miss Islington (bot) webhook-mailer at python.org
Tue Oct 1 06:58:57 EDT 2019


https://github.com/python/cpython/commit/d6a92b55944bf1ef4992e4375f02a7132717bf53
commit: d6a92b55944bf1ef4992e4375f02a7132717bf53
branch: 3.7
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: GitHub <noreply at github.com>
date: 2019-10-01T03:58:53-07:00
summary:

bpo-36670: Multiple regrtest bugfixes (GH-16511)


* Windows: Fix counter name in WindowsLoadTracker. Counter names are
  localized: use the registry to get the counter name. Original
  change written by Lorenz Mende.
* Regrtest.main() now ensures that the Windows load tracker is also
  killed if an exception is raised.
* TestWorkerProcess now ensures that worker processes are no longer
  running before exiting: also kill worker processes when an
  exception is raised.
* Enhance regrtest messages and warnings: include test name,
  duration, add a worker identifier, etc.
* Rename MultiprocessThread to TestWorkerProcess and
  MultiprocessRunner to MultiprocessTestRunner.
* Use print_warning() to display warnings.

Co-Authored-By: Lorenz Mende <Lorenz.mende at gmail.com>
(cherry picked from commit 982bfa4da07b2e5749a0f4e68f99e972bcc3a549)

Co-authored-by: Victor Stinner <vstinner at redhat.com>

files:
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/runtest_mp.py
M Lib/test/libregrtest/win_utils.py

diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index a59a03d4f2512..5ae8878638a5c 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -503,10 +503,6 @@ def run_tests(self):
             self.run_tests_sequential()
 
     def finalize(self):
-        if self.win_load_tracker is not None:
-            self.win_load_tracker.close()
-            self.win_load_tracker = None
-
         if self.next_single_filename:
             if self.next_single_test:
                 with open(self.next_single_filename, 'w') as fp:
@@ -674,11 +670,16 @@ def _main(self, tests, kwargs):
                 # typeperf.exe for x64, x86 or ARM
                 print(f'Failed to create WindowsLoadTracker: {error}')
 
-        self.run_tests()
-        self.display_result()
-
-        if self.ns.verbose2 and self.bad:
-            self.rerun_failed_tests()
+        try:
+            self.run_tests()
+            self.display_result()
+
+            if self.ns.verbose2 and self.bad:
+                self.rerun_failed_tests()
+        finally:
+            if self.win_load_tracker is not None:
+                self.win_load_tracker.close()
+                self.win_load_tracker = None
 
         self.finalize()
 
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index 9cb5be6bb8ae8..38b05781de5fc 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -15,7 +15,7 @@
     runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
     format_test_result, TestResult, is_failed, TIMEOUT)
 from test.libregrtest.setup import setup_tests
-from test.libregrtest.utils import format_duration
+from test.libregrtest.utils import format_duration, print_warning
 
 
 # Display the running tests if nothing happened last N seconds
@@ -103,9 +103,10 @@ class ExitThread(Exception):
     pass
 
 
-class MultiprocessThread(threading.Thread):
-    def __init__(self, pending, output, ns, timeout):
+class TestWorkerProcess(threading.Thread):
+    def __init__(self, worker_id, pending, output, ns, timeout):
         super().__init__()
+        self.worker_id = worker_id
         self.pending = pending
         self.output = output
         self.ns = ns
@@ -114,12 +115,16 @@ def __init__(self, pending, output, ns, timeout):
         self.start_time = None
         self._popen = None
         self._killed = False
+        self._stopped = False
 
     def __repr__(self):
-        info = ['MultiprocessThread']
-        test = self.current_test_name
+        info = [f'TestWorkerProcess #{self.worker_id}']
         if self.is_alive():
-            info.append('alive')
+            dt = time.monotonic() - self.start_time
+            info.append("running for %s" % format_duration(dt))
+        else:
+            info.append('stopped')
+        test = self.current_test_name
         if test:
             info.append(f'test={test}')
         popen = self._popen
@@ -128,53 +133,24 @@ def __repr__(self):
         return '<%s>' % ' '.join(info)
 
     def _kill(self):
-        dt = time.monotonic() - self.start_time
+        if self._killed:
+            return
+        self._killed = True
 
         popen = self._popen
-        pid = popen.pid
-        print("Kill worker process %s running for %.1f sec" % (pid, dt),
-              file=sys.stderr, flush=True)
+        if popen is None:
+            return
 
+        print(f"Kill {self}", file=sys.stderr, flush=True)
         try:
             popen.kill()
-            return True
         except OSError as exc:
-            print("WARNING: Failed to kill worker process %s: %r" % (pid, exc),
-                  file=sys.stderr, flush=True)
-            return False
-
-    def _close_wait(self):
-        popen = self._popen
-
-        # stdout and stderr must be closed to ensure that communicate()
-        # does not hang
-        popen.stdout.close()
-        popen.stderr.close()
-
-        try:
-            popen.wait(JOIN_TIMEOUT)
-        except (subprocess.TimeoutExpired, OSError) as exc:
-            print("WARNING: Failed to wait for worker process %s "
-                  "completion (timeout=%.1f sec): %r"
-                  % (popen.pid, JOIN_TIMEOUT, exc),
-                  file=sys.stderr, flush=True)
-
-    def kill(self):
-        """
-        Kill the current process (if any).
-
-        This method can be called by the thread running the process,
-        or by another thread.
-        """
-        self._killed = True
-
-        if self._popen is None:
-            return
-
-        if not self._kill():
-            return
+            print_warning(f"Failed to kill {self}: {exc!r}")
 
-        self._close_wait()
+    def stop(self):
+        # Method called from a different thread to stop this thread
+        self._stopped = True
+        self._kill()
 
     def mp_result_error(self, test_name, error_type, stdout='', stderr='',
                         err_msg=None):
@@ -190,59 +166,69 @@ def _timedout(self, test_name):
         try:
             stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT)
         except (subprocess.TimeoutExpired, OSError) as exc:
-            print("WARNING: Failed to read worker process %s output "
-                  "(timeout=%.1f sec): %r"
-                  % (popen.pid, JOIN_TIMEOUT, exc),
-                  file=sys.stderr, flush=True)
-
-        self._close_wait()
+            print_warning(f"Failed to read {self} output "
+                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
+                          f"{exc!r}")
 
         return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)
 
-    def _runtest(self, test_name):
-        try:
-            self.start_time = time.monotonic()
-            self.current_test_name = test_name
+    def _run_process(self, test_name):
+        self.start_time = time.monotonic()
 
+        self.current_test_name = test_name
+        try:
+            self._killed = False
             self._popen = run_test_in_subprocess(test_name, self.ns)
             popen = self._popen
+        except:
+            self.current_test_name = None
+            raise
+
+        try:
+            if self._stopped:
+                # If kill() has been called before self._popen is set,
+                # self._popen is still running. Call again kill()
+                # to ensure that the process is killed.
+                self._kill()
+                raise ExitThread
+
             try:
-                try:
-                    if self._killed:
-                        # If kill() has been called before self._popen is set,
-                        # self._popen is still running. Call again kill()
-                        # to ensure that the process is killed.
-                        self.kill()
-                        raise ExitThread
-
-                    try:
-                        stdout, stderr = popen.communicate(timeout=self.timeout)
-                    except subprocess.TimeoutExpired:
-                        if self._killed:
-                            # kill() has been called: communicate() fails
-                            # on reading closed stdout/stderr
-                            raise ExitThread
-
-                        return self._timedout(test_name)
-                    except OSError:
-                        if self._killed:
-                            # kill() has been called: communicate() fails
-                            # on reading closed stdout/stderr
-                            raise ExitThread
-                        raise
-                except:
-                    self.kill()
-                    raise
-            finally:
-                self._close_wait()
+                stdout, stderr = popen.communicate(timeout=self.timeout)
+            except subprocess.TimeoutExpired:
+                if self._stopped:
+                    # kill() has been called: communicate() fails
+                    # on reading closed stdout/stderr
+                    raise ExitThread
+
+                return self._timedout(test_name)
+            except OSError:
+                if self._stopped:
+                    # kill() has been called: communicate() fails
+                    # on reading closed stdout/stderr
+                    raise ExitThread
+                raise
 
             retcode = popen.returncode
+            stdout = stdout.strip()
+            stderr = stderr.rstrip()
+
+            return (retcode, stdout, stderr)
+        except:
+            self._kill()
+            raise
         finally:
-            self.current_test_name = None
+            self._wait_completed()
             self._popen = None
+            self.current_test_name = None
+
+    def _runtest(self, test_name):
+        result = self._run_process(test_name)
 
-        stdout = stdout.strip()
-        stderr = stderr.rstrip()
+        if isinstance(result, MultiprocessResult):
+            # _timedout() case
+            return result
+
+        retcode, stdout, stderr = result
 
         err_msg = None
         if retcode != 0:
@@ -266,7 +252,7 @@ def _runtest(self, test_name):
         return MultiprocessResult(result, stdout, stderr, err_msg)
 
     def run(self):
-        while not self._killed:
+        while not self._stopped:
             try:
                 try:
                     test_name = next(self.pending)
@@ -284,6 +270,33 @@ def run(self):
                 self.output.put((True, traceback.format_exc()))
                 break
 
+    def _wait_completed(self):
+        popen = self._popen
+
+        # stdout and stderr must be closed to ensure that communicate()
+        # does not hang
+        popen.stdout.close()
+        popen.stderr.close()
+
+        try:
+            popen.wait(JOIN_TIMEOUT)
+        except (subprocess.TimeoutExpired, OSError) as exc:
+            print_warning(f"Failed to wait for {self} completion "
+                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
+                          f"{exc!r}")
+
+    def wait_stopped(self, start_time):
+        while True:
+            # Write a message every second
+            self.join(1.0)
+            if not self.is_alive():
+                break
+            dt = time.monotonic() - start_time
+            print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
+            if dt > JOIN_TIMEOUT:
+                print_warning(f"Failed to join {self} in {format_duration(dt)}")
+                break
+
 
 def get_running(workers):
     running = []
@@ -298,7 +311,7 @@ def get_running(workers):
     return running
 
 
-class MultiprocessRunner:
+class MultiprocessTestRunner:
     def __init__(self, regrtest):
         self.regrtest = regrtest
         self.ns = regrtest.ns
@@ -311,30 +324,20 @@ def __init__(self, regrtest):
         self.workers = None
 
     def start_workers(self):
-        self.workers = [MultiprocessThread(self.pending, self.output,
-                                           self.ns, self.worker_timeout)
-                        for _ in range(self.ns.use_mp)]
+        self.workers = [TestWorkerProcess(index, self.pending, self.output,
+                                          self.ns, self.worker_timeout)
+                        for index in range(1, self.ns.use_mp + 1)]
         print("Run tests in parallel using %s child processes"
               % len(self.workers))
         for worker in self.workers:
             worker.start()
 
-    def wait_workers(self):
+    def stop_workers(self):
         start_time = time.monotonic()
         for worker in self.workers:
-            worker.kill()
+            worker.stop()
         for worker in self.workers:
-            while True:
-                worker.join(1.0)
-                if not worker.is_alive():
-                    break
-                dt = time.monotonic() - start_time
-                print("Wait for regrtest worker %r for %.1f sec" % (worker, dt),
-                      flush=True)
-                if dt > JOIN_TIMEOUT:
-                    print("Warning -- failed to join a regrtest worker %s"
-                          % worker, flush=True)
-                    break
+            worker.wait_stopped(start_time)
 
     def _get_result(self):
         if not any(worker.is_alive() for worker in self.workers):
@@ -418,10 +421,11 @@ def run_tests(self):
             if self.ns.timeout is not None:
                 faulthandler.cancel_dump_traceback_later()
 
-        # a test failed (and --failfast is set) or all tests completed
-        self.pending.stop()
-        self.wait_workers()
+            # Always ensure that all worker processes are no longer
+            # worker when we exit this function
+            self.pending.stop()
+            self.stop_workers()
 
 
 def run_tests_multiprocess(regrtest):
-    MultiprocessRunner(regrtest).run_tests()
+    MultiprocessTestRunner(regrtest).run_tests()
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index ec2d6c663e83f..f0c17b906f519 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -3,16 +3,22 @@
 import os
 import subprocess
 import uuid
+import winreg
 from test import support
+from test.libregrtest.utils import print_warning
 
 
 # Max size of asynchronous reads
 BUFSIZE = 8192
 # Exponential damping factor (see below)
 LOAD_FACTOR_1 = 0.9200444146293232478931553241
+
 # Seconds per measurement
 SAMPLING_INTERVAL = 5
-COUNTER_NAME = r'\System\Processor Queue Length'
+# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
+# of typeperf are registered
+COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
+                        r"\Perflib\CurrentLanguage")
 
 
 class WindowsLoadTracker():
@@ -25,7 +31,8 @@ class WindowsLoadTracker():
 
     def __init__(self):
         self.load = 0.0
-        self.p = None
+        self.counter_name = ''
+        self.popen = None
         self.start()
 
     def start(self):
@@ -55,31 +62,46 @@ def start(self):
         overlap.GetOverlappedResult(True)
 
         # Spawn off the load monitor
-        command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)]
-        self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD)
+        counter_name = self._get_counter_name()
+        command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
+        self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
 
         # Close our copy of the write end of the pipe
         os.close(command_stdout)
 
+    def _get_counter_name(self):
+        # accessing the registry to get the counter localization name
+        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
+            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]
+
+        # Convert [key1, value1, key2, value2, ...] list
+        # to {key1: value1, key2: value2, ...} dict
+        counters = iter(counters)
+        counters_dict = dict(zip(counters, counters))
+
+        # System counter has key '2' and Processor Queue Length has key '44'
+        system = counters_dict['2']
+        process_queue_length = counters_dict['44']
+        return f'"\\{system}\\{process_queue_length}"'
+
     def close(self):
-        if self.p is None:
+        if self.popen is None:
             return
-        self.p.kill()
-        self.p.wait()
-        self.p = None
+        self.popen.kill()
+        self.popen.wait()
+        self.popen = None
 
     def __del__(self):
         self.close()
 
     def read_output(self):
-        import _winapi
-
         overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
         bytes_read, res = overlapped.GetOverlappedResult(False)
         if res != 0:
             return
 
-        return overlapped.getbuffer().decode()
+        output = overlapped.getbuffer()
+        return output.decode('oem', 'replace')
 
     def getloadavg(self):
         typeperf_output = self.read_output()
@@ -89,14 +111,29 @@ def getloadavg(self):
 
         # Process the backlog of load values
         for line in typeperf_output.splitlines():
+            # Ignore the initial header:
+            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
+            if '\\\\' in line:
+                continue
+
+            # Ignore blank lines
+            if not line.strip():
+                continue
+
             # typeperf outputs in a CSV format like this:
             # "07/19/2018 01:32:26.605","3.000000"
-            toks = line.split(',')
-            # Ignore blank lines and the initial header
-            if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2:
+            # (date, process queue length)
+            try:
+                tokens = line.split(',')
+                if len(tokens) != 2:
+                    raise ValueError
+
+                value = tokens[1].replace('"', '')
+                load = float(value)
+            except ValueError:
+                print_warning("Failed to parse typeperf output: %a" % line)
                 continue
 
-            load = float(toks[1].replace('"', ''))
             # We use an exponentially weighted moving average, imitating the
             # load calculation on Unix systems.
             # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation



More information about the Python-checkins mailing list