[Python-checkins] bpo-44336: Prevent tests hanging on child process handles on Windows (GH-26578)

zooba webhook-mailer at python.org
Mon Mar 21 20:07:04 EDT 2022


https://github.com/python/cpython/commit/19058b9f6271338bcc46b7d30fe79a83990cc35c
commit: 19058b9f6271338bcc46b7d30fe79a83990cc35c
branch: main
author: Jeremy Kloth <jeremy.kloth at gmail.com>
committer: zooba <steve.dower at microsoft.com>
date: 2022-03-22T00:06:55Z
summary:

bpo-44336: Prevent tests hanging on child process handles on Windows (GH-26578)

Replace the child process `typeperf.exe` with a daemon thread that reads the performance counters directly.  This prevents the issues that arise from inherited handles in grandchild processes (see issue37531 for discussion).

We only use the load tracker when running tests in multiprocess mode. This prevents inadvertent interactions with tests expecting a single threaded environment.  Displaying load is really only helpful for buildbots running in multiprocess mode anyway.

files:
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/win_utils.py

diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index fc3c2b9692055..af58114ed925c 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -533,7 +533,24 @@ def run_tests(self):
 
         if self.ns.use_mp:
             from test.libregrtest.runtest_mp import run_tests_multiprocess
-            run_tests_multiprocess(self)
+            # If we're on windows and this is the parent runner (not a worker),
+            # track the load average.
+            if sys.platform == 'win32' and self.worker_test_name is None:
+                from test.libregrtest.win_utils import WindowsLoadTracker
+
+                try:
+                    self.win_load_tracker = WindowsLoadTracker()
+                except PermissionError as error:
+                    # Standard accounts may not have access to the performance
+                    # counters.
+                    print(f'Failed to create WindowsLoadTracker: {error}')
+
+            try:
+                run_tests_multiprocess(self)
+            finally:
+                if self.win_load_tracker is not None:
+                    self.win_load_tracker.close()
+                    self.win_load_tracker = None
         else:
             self.run_tests_sequential()
 
@@ -695,28 +712,11 @@ def _main(self, tests, kwargs):
             self.list_cases()
             sys.exit(0)
 
-        # If we're on windows and this is the parent runner (not a worker),
-        # track the load average.
-        if sys.platform == 'win32' and self.worker_test_name is None:
-            from test.libregrtest.win_utils import WindowsLoadTracker
-
-            try:
-                self.win_load_tracker = WindowsLoadTracker()
-            except FileNotFoundError as error:
-                # Windows IoT Core and Windows Nano Server do not provide
-                # typeperf.exe for x64, x86 or ARM
-                print(f'Failed to create WindowsLoadTracker: {error}')
+        self.run_tests()
+        self.display_result()
 
-        try:
-            self.run_tests()
-            self.display_result()
-
-            if self.ns.verbose2 and self.bad:
-                self.rerun_failed_tests()
-        finally:
-            if self.win_load_tracker is not None:
-                self.win_load_tracker.close()
-                self.win_load_tracker = None
+        if self.ns.verbose2 and self.bad:
+            self.rerun_failed_tests()
 
         self.finalize()
 
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index a1cc2201147db..5736cdfd3c729 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -1,16 +1,11 @@
+import _overlapped
+import _thread
 import _winapi
 import math
-import msvcrt
-import os
-import subprocess
-import uuid
+import struct
 import winreg
-from test.support import os_helper
-from test.libregrtest.utils import print_warning
 
 
-# Max size of asynchronous reads
-BUFSIZE = 8192
 # Seconds per measurement
 SAMPLING_INTERVAL = 1
 # Exponential damping factor to compute exponentially weighted moving average
@@ -19,174 +14,111 @@
 # Initialize the load using the arithmetic mean of the first NVALUE values
 # of the Processor Queue Length
 NVALUE = 5
-# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
-# of typeperf are registered
-COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
-                        r"\Perflib\CurrentLanguage")
 
 
 class WindowsLoadTracker():
     """
-    This class asynchronously interacts with the `typeperf` command to read
-    the system load on Windows. Multiprocessing and threads can't be used
-    here because they interfere with the test suite's cases for those
-    modules.
+    This class asynchronously reads the performance counters to calculate
+    the system load on Windows.  A "raw" thread is used here to prevent
+    interference with the test suite's cases for the threading module.
     """
 
     def __init__(self):
+        # Pre-flight test for access to the performance data;
+        # `PermissionError` will be raised if not allowed
+        winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
+
         self._values = []
         self._load = None
-        self._buffer = ''
-        self._popen = None
-        self.start()
-
-    def start(self):
-        # Create a named pipe which allows for asynchronous IO in Windows
-        pipe_name =  r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())
-
-        open_mode =  _winapi.PIPE_ACCESS_INBOUND
-        open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
-        open_mode |= _winapi.FILE_FLAG_OVERLAPPED
-
-        # This is the read end of the pipe, where we will be grabbing output
-        self.pipe = _winapi.CreateNamedPipe(
-            pipe_name, open_mode, _winapi.PIPE_WAIT,
-            1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
-        )
-        # The write end of the pipe which is passed to the created process
-        pipe_write_end = _winapi.CreateFile(
-            pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
-            _winapi.OPEN_EXISTING, 0, _winapi.NULL
-        )
-        # Open up the handle as a python file object so we can pass it to
-        # subprocess
-        command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
-
-        # Connect to the read end of the pipe in overlap/async mode
-        overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
-        overlap.GetOverlappedResult(True)
-
-        # Spawn off the load monitor
-        counter_name = self._get_counter_name()
-        command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
-        self._popen = subprocess.Popen(' '.join(command),
-                                       stdout=command_stdout,
-                                       cwd=os_helper.SAVEDCWD)
-
-        # Close our copy of the write end of the pipe
-        os.close(command_stdout)
-
-    def _get_counter_name(self):
-        # accessing the registry to get the counter localization name
-        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
-            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]
-
-        # Convert [key1, value1, key2, value2, ...] list
-        # to {key1: value1, key2: value2, ...} dict
-        counters = iter(counters)
-        counters_dict = dict(zip(counters, counters))
-
-        # System counter has key '2' and Processor Queue Length has key '44'
-        system = counters_dict['2']
-        process_queue_length = counters_dict['44']
-        return f'"\\{system}\\{process_queue_length}"'
-
-    def close(self, kill=True):
-        if self._popen is None:
+        self._running = _overlapped.CreateEvent(None, True, False, None)
+        self._stopped = _overlapped.CreateEvent(None, True, False, None)
+
+        _thread.start_new_thread(self._update_load, (), {})
+
+    def _update_load(self,
+                    # localize module access to prevent shutdown errors
+                     _wait=_winapi.WaitForSingleObject,
+                     _signal=_overlapped.SetEvent):
+        # run until signaled to stop
+        while _wait(self._running, 1000):
+            self._calculate_load()
+        # notify stopped
+        _signal(self._stopped)
+
+    def _calculate_load(self,
+                        # localize module access to prevent shutdown errors
+                        _query=winreg.QueryValueEx,
+                        _hkey=winreg.HKEY_PERFORMANCE_DATA,
+                        _unpack=struct.unpack_from):
+        # get the 'System' object
+        data, _ = _query(_hkey, '2')
+        # PERF_DATA_BLOCK {
+        #   WCHAR Signature[4]      8 +
+        #   DWOWD LittleEndian      4 +
+        #   DWORD Version           4 +
+        #   DWORD Revision          4 +
+        #   DWORD TotalByteLength   4 +
+        #   DWORD HeaderLength      = 24 byte offset
+        #   ...
+        # }
+        obj_start, = _unpack('L', data, 24)
+        # PERF_OBJECT_TYPE {
+        #   DWORD TotalByteLength
+        #   DWORD DefinitionLength
+        #   DWORD HeaderLength
+        #   ...
+        # }
+        data_start, defn_start = _unpack('4xLL', data, obj_start)
+        data_base = obj_start + data_start
+        defn_base = obj_start + defn_start
+        # find the 'Processor Queue Length' counter (index=44)
+        while defn_base < data_base:
+            # PERF_COUNTER_DEFINITION {
+            #   DWORD ByteLength
+            #   DWORD CounterNameTitleIndex
+            #   ... [7 DWORDs/28 bytes]
+            #   DWORD CounterOffset
+            # }
+            size, idx, offset = _unpack('LL28xL', data, defn_base)
+            defn_base += size
+            if idx == 44:
+                counter_offset = data_base + offset
+                # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
+                processor_queue_length, = _unpack('L', data, counter_offset)
+                break
+        else:
             return
 
-        self._load = None
-
-        if kill:
-            self._popen.kill()
-        self._popen.wait()
-        self._popen = None
-
-    def __del__(self):
-        self.close()
-
-    def _parse_line(self, line):
-        # typeperf outputs in a CSV format like this:
-        # "07/19/2018 01:32:26.605","3.000000"
-        # (date, process queue length)
-        tokens = line.split(',')
-        if len(tokens) != 2:
-            raise ValueError
-
-        value = tokens[1]
-        if not value.startswith('"') or not value.endswith('"'):
-            raise ValueError
-        value = value[1:-1]
-        return float(value)
-
-    def _read_lines(self):
-        overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
-        bytes_read, res = overlapped.GetOverlappedResult(False)
-        if res != 0:
-            return ()
-
-        output = overlapped.getbuffer()
-        output = output.decode('oem', 'replace')
-        output = self._buffer + output
-        lines = output.splitlines(True)
-
-        # bpo-36670: typeperf only writes a newline *before* writing a value,
-        # not after. Sometimes, the written line in incomplete (ex: only
-        # timestamp, without the process queue length). Only pass the last line
-        # to the parser if it's a valid value, otherwise store it in
-        # self._buffer.
-        try:
-            self._parse_line(lines[-1])
-        except ValueError:
-            self._buffer = lines.pop(-1)
+        # We use an exponentially weighted moving average, imitating the
+        # load calculation on Unix systems.
+        # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
+        # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+        if self._load is not None:
+            self._load = (self._load * LOAD_FACTOR_1
+                            + processor_queue_length  * (1.0 - LOAD_FACTOR_1))
+        elif len(self._values) < NVALUE:
+            self._values.append(processor_queue_length)
         else:
-            self._buffer = ''
+            self._load = sum(self._values) / len(self._values)
 
-        return lines
+    def close(self, kill=True):
+        self.__del__()
+        return
+
+    def __del__(self,
+                # localize module access to prevent shutdown errors
+                _wait=_winapi.WaitForSingleObject,
+                _close=_winapi.CloseHandle,
+                _signal=_overlapped.SetEvent):
+        if self._running is not None:
+            # tell the update thread to quit
+            _signal(self._running)
+            # wait for the update thread to signal done
+            _wait(self._stopped, -1)
+            # cleanup events
+            _close(self._running)
+            _close(self._stopped)
+            self._running = self._stopped = None
 
     def getloadavg(self):
-        if self._popen is None:
-            return None
-
-        returncode = self._popen.poll()
-        if returncode is not None:
-            self.close(kill=False)
-            return None
-
-        try:
-            lines = self._read_lines()
-        except BrokenPipeError:
-            self.close()
-            return None
-
-        for line in lines:
-            line = line.rstrip()
-
-            # Ignore the initial header:
-            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
-            if 'PDH-CSV' in line:
-                continue
-
-            # Ignore blank lines
-            if not line:
-                continue
-
-            try:
-                processor_queue_length = self._parse_line(line)
-            except ValueError:
-                print_warning("Failed to parse typeperf output: %a" % line)
-                continue
-
-            # We use an exponentially weighted moving average, imitating the
-            # load calculation on Unix systems.
-            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
-            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
-            if self._load is not None:
-                self._load = (self._load * LOAD_FACTOR_1
-                              + processor_queue_length  * (1.0 - LOAD_FACTOR_1))
-            elif len(self._values) < NVALUE:
-                self._values.append(processor_queue_length)
-            else:
-                self._load = sum(self._values) / len(self._values)
-
         return self._load



More information about the Python-checkins mailing list