[Python-checkins] bpo-40275: Add os_helper submodule in test.support (GH-20765)

Hai Shi webhook-mailer at python.org
Wed Jun 10 08:29:07 EDT 2020


https://github.com/python/cpython/commit/0d00b2a5d74390da7bbeff7dfa73abf2eb46124a
commit: 0d00b2a5d74390da7bbeff7dfa73abf2eb46124a
branch: master
author: Hai Shi <shihai1992 at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-06-10T14:29:02+02:00
summary:

bpo-40275: Add os_helper submodule in test.support (GH-20765)

files:
A Lib/test/support/os_helper.py
M Doc/library/test.rst
M Lib/test/support/__init__.py
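
For test code that has to run both before and after this change, one option is to look for the helpers in the new submodule first and fall back to the package itself. The following is only a sketch: `load_os_helpers` is a hypothetical name that is not part of this commit, and it assumes that `temp_dir` is a reasonable marker for "this module carries the filesystem helpers".

```python
import importlib


def load_os_helpers():
    """Return the first importable module that provides temp_dir, or None.

    Hypothetical helper: it tries the new test.support.os_helper location
    first, then the older test.support location.
    """
    for module_name in ("test.support.os_helper", "test.support"):
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            # The module (or the whole test package) is unavailable here.
            continue
        if hasattr(module, "temp_dir"):
            return module
    return None


helpers = load_os_helpers()
if helpers is None:
    print("CPython's test package is not installed")
else:
    # temp_dir() with no path creates the directory via tempfile.mkdtemp.
    with helpers.temp_dir() as path:
        print("temporary directory:", path)
```

Some distributions ship the interpreter without the `test` package, which is why the sketch treats a missing module as a normal outcome rather than an error.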

diff --git a/Doc/library/test.rst b/Doc/library/test.rst
index 7580fb5e9b174..11d748466cba2 100644
--- a/Doc/library/test.rst
+++ b/Doc/library/test.rst
@@ -247,41 +247,6 @@ The :mod:`test.support` module defines the following constants:
    Path for shell if not on Windows; otherwise ``None``.
 
 
-.. data:: FS_NONASCII
-
-   A non-ASCII character encodable by :func:`os.fsencode`.
-
-
-.. data:: TESTFN
-
-   Set to a name that is safe to use as the name of a temporary file.  Any
-   temporary file that is created should be closed and unlinked (removed).
-
-
-.. data:: TESTFN_UNICODE
-
-    Set to a non-ASCII name for a temporary file.
-
-
-.. data:: TESTFN_UNENCODABLE
-
-   Set to a filename (str type) that should not be able to be encoded by file
-   system encoding in strict mode.  It may be ``None`` if it's not possible to
-   generate such a filename.
-
-
-.. data:: TESTFN_UNDECODABLE
-
-   Set to a filename (bytes type) that should not be able to be decoded by
-   file system encoding in strict mode.  It may be ``None`` if it's not
-   possible to generate such a filename.
-
-
-.. data:: TESTFN_NONASCII
-
-   Set to a filename containing the :data:`FS_NONASCII` character.
-
-
 .. data:: LOOPBACK_TIMEOUT
 
    Timeout in seconds for tests using a network server listening on the network
@@ -343,11 +308,6 @@ The :mod:`test.support` module defines the following constants:
    :data:`SHORT_TIMEOUT`.
 
 
-.. data:: SAVEDCWD
-
-   Set to :func:`os.getcwd`.
-
-
 .. data:: PGO
 
    Set when tests can be skipped when they are not useful for PGO.
@@ -449,25 +409,6 @@ The :mod:`test.support` module defines the following functions:
    Delete *name* from ``sys.modules``.
 
 
-.. function:: unlink(filename)
-
-   Call :func:`os.unlink` on *filename*.  On Windows platforms, this is
-   wrapped with a wait loop that checks for the existence of the file.
-
-
-.. function:: rmdir(filename)
-
-   Call :func:`os.rmdir` on *filename*.  On Windows platforms, this is
-   wrapped with a wait loop that checks for the existence of the file.
-
-
-.. function:: rmtree(path)
-
-   Call :func:`shutil.rmtree` on *path* or call :func:`os.lstat` and
-   :func:`os.rmdir` to remove a path and its contents.  On Windows platforms,
-   this is wrapped with a wait loop that checks for the existence of the files.
-
-
 .. function:: make_legacy_pyc(source)
 
    Move a :pep:`3147`/:pep:`488` pyc file to its legacy pyc location and return the file
@@ -521,16 +462,6 @@ The :mod:`test.support` module defines the following functions:
    rather than looking directly in the path directories.
 
 
-.. function:: create_empty_file(filename)
-
-   Create an empty file with *filename*.  If it already exists, truncate it.
-
-
-.. function:: fd_count()
-
-   Count the number of open file descriptors.
-
-
 .. function:: match_test(test)
 
    Match *test* to patterns set in :func:`set_match_tests`.
@@ -713,47 +644,6 @@ The :mod:`test.support` module defines the following functions:
       self.assertEqual(captured, "hello")
 
 
-.. function:: temp_dir(path=None, quiet=False)
-
-   A context manager that creates a temporary directory at *path* and
-   yields the directory.
-
-   If *path* is ``None``, the temporary directory is created using
-   :func:`tempfile.mkdtemp`.  If *quiet* is ``False``, the context manager
-   raises an exception on error.  Otherwise, if *path* is specified and
-   cannot be created, only a warning is issued.
-
-
-.. function:: change_cwd(path, quiet=False)
-
-   A context manager that temporarily changes the current working
-   directory to *path* and yields the directory.
-
-   If *quiet* is ``False``, the context manager raises an exception
-   on error.  Otherwise, it issues only a warning and keeps the current
-   working directory the same.
-
-
-.. function:: temp_cwd(name='tempcwd', quiet=False)
-
-   A context manager that temporarily creates a new directory and
-   changes the current working directory (CWD).
-
-   The context manager creates a temporary directory in the current
-   directory with name *name* before temporarily changing the current
-   working directory.  If *name* is ``None``, the temporary directory is
-   created using :func:`tempfile.mkdtemp`.
-
-   If *quiet* is ``False`` and it is not possible to create or change
-   the CWD, an error is raised.  Otherwise, only a warning is raised
-   and the original CWD is used.
-
-
-.. function:: temp_umask(umask)
-
-   A context manager that temporarily sets the process umask.
-
-
 .. function:: disable_faulthandler()
 
    A context manager that replaces ``sys.stderr`` with ``sys.__stderr__``.
@@ -851,28 +741,6 @@ The :mod:`test.support` module defines the following functions:
    header size equals *size*.
 
 
-.. function:: can_symlink()
-
-   Return ``True`` if the OS supports symbolic links, ``False``
-   otherwise.
-
-
-.. function:: can_xattr()
-
-   Return ``True`` if the OS supports xattr, ``False``
-   otherwise.
-
-
-.. decorator:: skip_unless_symlink
-
-   A decorator for running tests that require support for symbolic links.
-
-
-.. decorator:: skip_unless_xattr
-
-   A decorator for running tests that require support for xattr.
-
-
 .. decorator:: anticipate_failure(condition)
 
    A decorator to conditionally mark tests with
@@ -992,12 +860,6 @@ The :mod:`test.support` module defines the following functions:
    wrap.
 
 
-.. function:: make_bad_fd()
-
-   Create an invalid file descriptor by opening and closing a temporary file,
-   and returning its descriptor.
-
-
 .. function:: check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None)
 
    Test for syntax errors in *statement* by attempting to compile *statement*.
@@ -1144,11 +1006,6 @@ The :mod:`test.support` module defines the following functions:
           return load_package_tests(os.path.dirname(__file__), *args)
 
 
-.. function:: fs_is_case_insensitive(directory)
-
-   Return ``True`` if the file system for *directory* is case-insensitive.
-
-
 .. function:: detect_api_mismatch(ref_api, other_api, *, ignore=())
 
    Returns the set of attributes, functions or methods of *ref_api* not
@@ -1241,28 +1098,6 @@ The :mod:`test.support` module defines the following classes:
    attributes on the exception is :exc:`ResourceDenied` raised.
 
 
-.. class:: EnvironmentVarGuard()
-
-   Class used to temporarily set or unset environment variables.  Instances can
-   be used as a context manager and have a complete dictionary interface for
-   querying/modifying the underlying ``os.environ``. After exit from the
-   context manager all changes to environment variables done through this
-   instance will be rolled back.
-
-   .. versionchanged:: 3.1
-      Added dictionary interface.
-
-.. method:: EnvironmentVarGuard.set(envvar, value)
-
-   Temporarily set the environment variable ``envvar`` to the value of
-   ``value``.
-
-
-.. method:: EnvironmentVarGuard.unset(envvar)
-
-   Temporarily unset the environment variable ``envvar``.
-
-
 .. class:: SuppressCrashReport()
 
    A context manager used to try to prevent crash dialog popups on tests that
@@ -1332,13 +1167,6 @@ The :mod:`test.support` module defines the following classes:
       Run *test* and return the result.
 
 
-.. class:: FakePath(path)
-
-   Simple :term:`path-like object`.  It implements the :meth:`__fspath__`
-   method which just returns the *path* argument.  If *path* is an exception,
-   it will be raised in :meth:`!__fspath__`.
-
-
 :mod:`test.support.socket_helper` --- Utilities for socket tests
 ================================================================
 
@@ -1634,3 +1462,187 @@ The :mod:`test.support.threading_helper` module provides support for threading t
        # (to avoid reference cycles)
 
    .. versionadded:: 3.8
+
+
+:mod:`test.support.os_helper` --- Utilities for os tests
+========================================================================
+
+.. module:: test.support.os_helper
+   :synopsis: Support for os tests.
+
+The :mod:`test.support.os_helper` module provides support for os tests.
+
+.. versionadded:: 3.10
+
+
+.. data:: FS_NONASCII
+
+   A non-ASCII character encodable by :func:`os.fsencode`.
+
+
+.. data:: SAVEDCWD
+
+   Set to :func:`os.getcwd`.
+
+
+.. data:: TESTFN
+
+   Set to a name that is safe to use as the name of a temporary file.  Any
+   temporary file that is created should be closed and unlinked (removed).
+
+
+.. data:: TESTFN_NONASCII
+
+   Set to a filename containing the :data:`FS_NONASCII` character.
+
+
+.. data:: TESTFN_UNENCODABLE
+
+   Set to a filename (str type) that should not be able to be encoded by file
+   system encoding in strict mode.  It may be ``None`` if it's not possible to
+   generate such a filename.
+
+
+.. data:: TESTFN_UNDECODABLE
+
+   Set to a filename (bytes type) that should not be able to be decoded by
+   file system encoding in strict mode.  It may be ``None`` if it's not
+   possible to generate such a filename.
+
+
+.. data:: TESTFN_UNICODE
+
+    Set to a non-ASCII name for a temporary file.
+
+
+.. class:: EnvironmentVarGuard()
+
+   Class used to temporarily set or unset environment variables.  Instances can
+   be used as a context manager and have a complete dictionary interface for
+   querying/modifying the underlying ``os.environ``. After exit from the
+   context manager all changes to environment variables done through this
+   instance will be rolled back.
+
+   .. versionchanged:: 3.1
+      Added dictionary interface.
+
+
+.. class:: FakePath(path)
+
+   Simple :term:`path-like object`.  It implements the :meth:`__fspath__`
+   method which just returns the *path* argument.  If *path* is an exception,
+   it will be raised in :meth:`!__fspath__`.
+
+
+.. method:: EnvironmentVarGuard.set(envvar, value)
+
+   Temporarily set the environment variable ``envvar`` to the value of
+   ``value``.
+
+
+.. method:: EnvironmentVarGuard.unset(envvar)
+
+   Temporarily unset the environment variable ``envvar``.
+
+
+.. function:: can_symlink()
+
+   Return ``True`` if the OS supports symbolic links, ``False``
+   otherwise.
+
+
+.. function:: can_xattr()
+
+   Return ``True`` if the OS supports xattr, ``False``
+   otherwise.
+
+
+.. function:: change_cwd(path, quiet=False)
+
+   A context manager that temporarily changes the current working
+   directory to *path* and yields the directory.
+
+   If *quiet* is ``False``, the context manager raises an exception
+   on error.  Otherwise, it issues only a warning and keeps the current
+   working directory the same.
+
+
+.. function:: create_empty_file(filename)
+
+   Create an empty file with *filename*.  If it already exists, truncate it.
+
+
+.. function:: fd_count()
+
+   Count the number of open file descriptors.
+
+
+.. function:: fs_is_case_insensitive(directory)
+
+   Return ``True`` if the file system for *directory* is case-insensitive.
+
+
+.. function:: make_bad_fd()
+
+   Create an invalid file descriptor by opening and closing a temporary file,
+   and returning its descriptor.
+
+
+.. function:: rmdir(filename)
+
+   Call :func:`os.rmdir` on *filename*.  On Windows platforms, this is
+   wrapped with a wait loop that checks for the existence of the file.
+
+
+.. function:: rmtree(path)
+
+   Call :func:`shutil.rmtree` on *path* or call :func:`os.lstat` and
+   :func:`os.rmdir` to remove a path and its contents.  On Windows platforms,
+   this is wrapped with a wait loop that checks for the existence of the files.
+
+
+.. decorator:: skip_unless_symlink
+
+   A decorator for running tests that require support for symbolic links.
+
+
+.. decorator:: skip_unless_xattr
+
+   A decorator for running tests that require support for xattr.
+
+
+.. function:: temp_cwd(name='tempcwd', quiet=False)
+
+   A context manager that temporarily creates a new directory and
+   changes the current working directory (CWD).
+
+   The context manager creates a temporary directory in the current
+   directory with name *name* before temporarily changing the current
+   working directory.  If *name* is ``None``, the temporary directory is
+   created using :func:`tempfile.mkdtemp`.
+
+   If *quiet* is ``False`` and it is not possible to create or change
+   the CWD, an error is raised.  Otherwise, only a warning is raised
+   and the original CWD is used.
+
+
+.. function:: temp_dir(path=None, quiet=False)
+
+   A context manager that creates a temporary directory at *path* and
+   yields the directory.
+
+   If *path* is ``None``, the temporary directory is created using
+   :func:`tempfile.mkdtemp`.  If *quiet* is ``False``, the context manager
+   raises an exception on error.  Otherwise, if *path* is specified and
+   cannot be created, only a warning is issued.
+
+
+.. function:: temp_umask(umask)
+
+   A context manager that temporarily sets the process umask.
+
+
+.. function:: unlink(filename)
+
+   Call :func:`os.unlink` on *filename*.  On Windows platforms, this is
+   wrapped with a wait loop that checks for the existence of the file.
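
The "wait loop" mentioned in the unlink() and rmdir() entries above can be illustrated with a small stand-alone sketch. It is modelled loosely on the private _waitfor helper that this commit moves, but it is not that code: `unlink_and_wait` and its `max_delay` parameter are hypothetical names, and the loop runs on every platform here, whereas the documentation describes it as Windows-only.

```python
import os
import tempfile
import time
import warnings


def unlink_and_wait(pathname, max_delay=1.0):
    """Remove *pathname*, then poll until its name leaves the parent directory.

    Hypothetical sketch of a wait loop with exponential backoff.
    """
    try:
        os.unlink(pathname)
    except (FileNotFoundError, NotADirectoryError):
        # Nothing to remove, so there is nothing to wait for.
        return
    dirname, name = os.path.split(pathname)
    dirname = dirname or "."
    delay = 0.001
    while delay < max_delay:
        # Only the directory listing is consulted, not the file itself.
        if name not in os.listdir(dirname):
            return
        time.sleep(delay)
        delay *= 2  # back off: 1 ms, 2 ms, 4 ms, ... about 1 s in total
    warnings.warn(f"delete still pending for {pathname!r}",
                  RuntimeWarning, stacklevel=2)


# Usage: create a scratch file, remove it, and remove it again.
fd, path = tempfile.mkstemp()
os.close(fd)
unlink_and_wait(path)
unlink_and_wait(path)  # already gone: silently ignored
print(os.path.exists(path))
```

Doubling the delay keeps the common case fast (usually the first check succeeds) while still bounding the total wait at roughly one second before a warning is issued.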
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index bb905bd895de8..3a5f7b556d767 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -3,7 +3,6 @@
 if __name__ != 'test.support':
     raise ImportError('support must be imported from the test package')
 
-import collections.abc
 import contextlib
 import errno
 import fnmatch
@@ -22,8 +21,19 @@
 import unittest
 import warnings
 
+from .os_helper import (
+    FS_NONASCII, SAVEDCWD, TESTFN, TESTFN_NONASCII,
+    TESTFN_UNENCODABLE, TESTFN_UNDECODABLE,
+    TESTFN_UNICODE, can_symlink, can_xattr,
+    change_cwd, create_empty_file, fd_count,
+    fs_is_case_insensitive, make_bad_fd, rmdir,
+    rmtree, skip_unless_symlink, skip_unless_xattr,
+    temp_cwd, temp_dir, temp_umask, unlink,
+    EnvironmentVarGuard, FakePath, _longpath)
+
 from .testresult import get_test_runner
 
+
 __all__ = [
     # globals
     "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
@@ -36,18 +46,15 @@
     # io
     "record_original_stdout", "get_original_stdout", "captured_stdout",
     "captured_stdin", "captured_stderr",
-    # filesystem
-    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
-    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
     # unittest
     "is_resource_enabled", "requires", "requires_freebsd_version",
     "requires_linux_version", "requires_mac_ver",
     "check_syntax_error", "check_syntax_warning",
     "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
     "BasicTestRunner", "run_unittest", "run_doctest",
-    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
+    "requires_gzip", "requires_bz2", "requires_lzma",
     "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
-    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
+    "requires_IEEE_754", "requires_zlib",
     "anticipate_failure", "load_package_tests", "detect_api_mismatch",
     "check__all__", "skip_if_buggy_ucrt_strfptime",
     "ignore_warnings",
@@ -57,13 +64,12 @@
     # network
     "open_urlresource",
     # processes
-    'temp_umask', "reap_children",
+    "reap_children",
     # miscellaneous
     "check_warnings", "check_no_resource_warning", "check_no_warnings",
-    "EnvironmentVarGuard",
-    "run_with_locale", "swap_item",
+    "run_with_locale", "swap_item", "findfile",
     "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
-    "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
+    "run_with_tz", "PGO", "missing_compiler_executable",
     "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
     "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
     ]
@@ -318,6 +324,7 @@ def unload(name):
     except KeyError:
         pass
 
+
 def _force_run(path, func, *args):
     try:
         return func(*args)
@@ -328,124 +335,6 @@ def _force_run(path, func, *args):
         os.chmod(path, stat.S_IRWXU)
         return func(*args)
 
-if sys.platform.startswith("win"):
-    def _waitfor(func, pathname, waitall=False):
-        # Perform the operation
-        func(pathname)
-        # Now setup the wait loop
-        if waitall:
-            dirname = pathname
-        else:
-            dirname, name = os.path.split(pathname)
-            dirname = dirname or '.'
-        # Check for `pathname` to be removed from the filesystem.
-        # The exponential backoff of the timeout amounts to a total
-        # of ~1 second after which the deletion is probably an error
-        # anyway.
-        # Testing on an i7 at 4.3GHz shows that usually only 1 iteration is
-        # required when contention occurs.
-        timeout = 0.001
-        while timeout < 1.0:
-            # Note we are only testing for the existence of the file(s) in
-            # the contents of the directory regardless of any security or
-            # access rights.  If we have made it this far, we have sufficient
-            # permissions to do that much using Python's equivalent of the
-            # Windows API FindFirstFile.
-            # Other Windows APIs can fail or give incorrect results when
-            # dealing with files that are pending deletion.
-            L = os.listdir(dirname)
-            if not (L if waitall else name in L):
-                return
-            # Increase the timeout and try again
-            time.sleep(timeout)
-            timeout *= 2
-        warnings.warn('tests may fail, delete still pending for ' + pathname,
-                      RuntimeWarning, stacklevel=4)
-
-    def _unlink(filename):
-        _waitfor(os.unlink, filename)
-
-    def _rmdir(dirname):
-        _waitfor(os.rmdir, dirname)
-
-    def _rmtree(path):
-        def _rmtree_inner(path):
-            for name in _force_run(path, os.listdir, path):
-                fullname = os.path.join(path, name)
-                try:
-                    mode = os.lstat(fullname).st_mode
-                except OSError as exc:
-                    print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
-                          file=sys.__stderr__)
-                    mode = 0
-                if stat.S_ISDIR(mode):
-                    _waitfor(_rmtree_inner, fullname, waitall=True)
-                    _force_run(fullname, os.rmdir, fullname)
-                else:
-                    _force_run(fullname, os.unlink, fullname)
-        _waitfor(_rmtree_inner, path, waitall=True)
-        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
-
-    def _longpath(path):
-        try:
-            import ctypes
-        except ImportError:
-            # No ctypes means we can't expands paths.
-            pass
-        else:
-            buffer = ctypes.create_unicode_buffer(len(path) * 2)
-            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
-                                                             len(buffer))
-            if length:
-                return buffer[:length]
-        return path
-else:
-    _unlink = os.unlink
-    _rmdir = os.rmdir
-
-    def _rmtree(path):
-        import shutil
-        try:
-            shutil.rmtree(path)
-            return
-        except OSError:
-            pass
-
-        def _rmtree_inner(path):
-            for name in _force_run(path, os.listdir, path):
-                fullname = os.path.join(path, name)
-                try:
-                    mode = os.lstat(fullname).st_mode
-                except OSError:
-                    mode = 0
-                if stat.S_ISDIR(mode):
-                    _rmtree_inner(fullname)
-                    _force_run(path, os.rmdir, fullname)
-                else:
-                    _force_run(path, os.unlink, fullname)
-        _rmtree_inner(path)
-        os.rmdir(path)
-
-    def _longpath(path):
-        return path
-
-def unlink(filename):
-    try:
-        _unlink(filename)
-    except (FileNotFoundError, NotADirectoryError):
-        pass
-
-def rmdir(dirname):
-    try:
-        _rmdir(dirname)
-    except FileNotFoundError:
-        pass
-
-def rmtree(path):
-    try:
-        _rmtree(path)
-    except FileNotFoundError:
-        pass
 
 def make_legacy_pyc(source):
     """Move a PEP 3147/488 pyc file to its legacy pyc location.
@@ -714,149 +603,10 @@ def requires_lzma(reason='requires lzma'):
 else:
     unix_shell = None
 
-# Filename used for testing
-if os.name == 'java':
-    # Jython disallows @ in module names
-    TESTFN = '$test'
-else:
-    TESTFN = '@test'
-
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-
 # Define the URL of a dedicated HTTP server for the network tests.
 # The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
 TEST_HTTP_URL = "http://www.pythontest.net"
 
-# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
-# or None if there is no such character.
-FS_NONASCII = None
-for character in (
-    # First try printable and common characters to have a readable filename.
-    # For each character, the encoding list are just example of encodings able
-    # to encode the character (the list is not exhaustive).
-
-    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
-    '\u00E6',
-    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
-    '\u0130',
-    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
-    '\u0141',
-    # U+03C6 (Greek Small Letter Phi): cp1253
-    '\u03C6',
-    # U+041A (Cyrillic Capital Letter Ka): cp1251
-    '\u041A',
-    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
-    '\u05D0',
-    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
-    '\u060C',
-    # U+062A (Arabic Letter Teh): cp720
-    '\u062A',
-    # U+0E01 (Thai Character Ko Kai): cp874
-    '\u0E01',
-
-    # Then try more "special" characters. "special" because they may be
-    # interpreted or displayed differently depending on the exact locale
-    # encoding and the font.
-
-    # U+00A0 (No-Break Space)
-    '\u00A0',
-    # U+20AC (Euro Sign)
-    '\u20AC',
-):
-    try:
-        # If Python is set up to use the legacy 'mbcs' in Windows,
-        # 'replace' error mode is used, and encode() returns b'?'
-        # for characters missing in the ANSI codepage
-        if os.fsdecode(os.fsencode(character)) != character:
-            raise UnicodeError
-    except UnicodeError:
-        pass
-    else:
-        FS_NONASCII = character
-        break
-
-# TESTFN_UNICODE is a non-ascii filename
-TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
-if sys.platform == 'darwin':
-    # In Mac OS X's VFS API file names are, by definition, canonically
-    # decomposed Unicode, encoded using UTF-8. See QA1173:
-    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
-    import unicodedata
-    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
-
-# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
-# encoded by the filesystem encoding (in strict mode). It can be None if we
-# cannot generate such filename.
-TESTFN_UNENCODABLE = None
-if os.name == 'nt':
-    # skip win32s (0) or Windows 9x/ME (1)
-    if sys.getwindowsversion().platform >= 2:
-        # Different kinds of characters from various languages to minimize the
-        # probability that the whole name is encodable to MBCS (issue #9819)
-        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
-        try:
-            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
-        except UnicodeEncodeError:
-            pass
-        else:
-            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
-                  'Unicode filename tests may not be effective'
-                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
-            TESTFN_UNENCODABLE = None
-# Mac OS X denies unencodable filenames (invalid utf-8)
-elif sys.platform != 'darwin':
-    try:
-        # ascii and utf-8 cannot encode the byte 0xff
-        b'\xff'.decode(sys.getfilesystemencoding())
-    except UnicodeDecodeError:
-        # 0xff will be encoded using the surrogate character u+DCFF
-        TESTFN_UNENCODABLE = TESTFN \
-            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
-    else:
-        # File system encoding (eg. ISO-8859-* encodings) can encode
-        # the byte 0xff. Skip some unicode filename tests.
-        pass
-
-# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
-# decoded from the filesystem encoding (in strict mode). It can be None if we
-# cannot generate such filename (ex: the latin1 encoding can decode any byte
-# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
-# to the surrogateescape error handler (PEP 383), but not from the filesystem
-# encoding in strict mode.
-TESTFN_UNDECODABLE = None
-for name in (
-    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
-    # accepts it to create a file or a directory, or don't accept to enter to
-    # such directory (when the bytes name is used). So test b'\xe7' first: it is
-    # not decodable from cp932.
-    b'\xe7w\xf0',
-    # undecodable from ASCII, UTF-8
-    b'\xff',
-    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
-    # and cp857
-    b'\xae\xd5'
-    # undecodable from UTF-8 (UNIX and Mac OS X)
-    b'\xed\xb2\x80', b'\xed\xb4\x80',
-    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
-    # cp1253, cp1254, cp1255, cp1257, cp1258
-    b'\x81\x98',
-):
-    try:
-        name.decode(sys.getfilesystemencoding())
-    except UnicodeDecodeError:
-        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
-        break
-
-if FS_NONASCII:
-    TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
-else:
-    TESTFN_NONASCII = None
-
-# Save the initial cwd
-SAVEDCWD = os.getcwd()
-
 # Set by libregrtest/main.py so we can skip tests that are not
 # useful for PGO
 PGO = False
@@ -865,103 +615,6 @@ def requires_lzma(reason='requires lzma'):
 # PGO task.  If this is True, PGO is also True.
 PGO_EXTENDED = False
 
-@contextlib.contextmanager
-def temp_dir(path=None, quiet=False):
-    """Return a context manager that creates a temporary directory.
-
-    Arguments:
-
-      path: the directory to create temporarily.  If omitted or None,
-        defaults to creating a temporary directory using tempfile.mkdtemp.
-
-      quiet: if False (the default), the context manager raises an exception
-        on error.  Otherwise, if the path is specified and cannot be
-        created, only a warning is issued.
-
-    """
-    import tempfile
-    dir_created = False
-    if path is None:
-        path = tempfile.mkdtemp()
-        dir_created = True
-        path = os.path.realpath(path)
-    else:
-        try:
-            os.mkdir(path)
-            dir_created = True
-        except OSError as exc:
-            if not quiet:
-                raise
-            warnings.warn(f'tests may fail, unable to create '
-                          f'temporary directory {path!r}: {exc}',
-                          RuntimeWarning, stacklevel=3)
-    if dir_created:
-        pid = os.getpid()
-    try:
-        yield path
-    finally:
-        # In case the process forks, let only the parent remove the
-        # directory. The child has a different process id. (bpo-30028)
-        if dir_created and pid == os.getpid():
-            rmtree(path)
-
-@contextlib.contextmanager
-def change_cwd(path, quiet=False):
-    """Return a context manager that changes the current working directory.
-
-    Arguments:
-
-      path: the directory to use as the temporary current working directory.
-
-      quiet: if False (the default), the context manager raises an exception
-        on error.  Otherwise, it issues only a warning and keeps the current
-        working directory the same.
-
-    """
-    saved_dir = os.getcwd()
-    try:
-        os.chdir(os.path.realpath(path))
-    except OSError as exc:
-        if not quiet:
-            raise
-        warnings.warn(f'tests may fail, unable to change the current working '
-                      f'directory to {path!r}: {exc}',
-                      RuntimeWarning, stacklevel=3)
-    try:
-        yield os.getcwd()
-    finally:
-        os.chdir(saved_dir)
-
-
-@contextlib.contextmanager
-def temp_cwd(name='tempcwd', quiet=False):
-    """
-    Context manager that temporarily creates and changes the CWD.
-
-    The function temporarily changes the current working directory
-    after creating a temporary directory in the current directory with
-    name *name*.  If *name* is None, the temporary directory is
-    created using tempfile.mkdtemp.
-
-    If *quiet* is False (default) and it is not possible to
-    create or change the CWD, an error is raised.  If *quiet* is True,
-    only a warning is raised and the original CWD is used.
-
-    """
-    with temp_dir(path=name, quiet=quiet) as temp_path:
-        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
-            yield cwd_dir
-
-if hasattr(os, "umask"):
-    @contextlib.contextmanager
-    def temp_umask(umask):
-        """Context manager that temporarily sets the process umask."""
-        oldmask = os.umask(umask)
-        try:
-            yield
-        finally:
-            os.umask(oldmask)
-
 # TEST_HOME_DIR refers to the top level directory of the "test" package
 # that contains Python's regression test suite
 TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -970,6 +623,7 @@ def temp_umask(umask):
 # TEST_DATA_DIR is used as a target download location for remote resources
 TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
 
+
 def findfile(filename, subdir=None):
     """Try to find a file on sys.path or in the test directory.  If it is not
     found the argument passed to the function is returned (this does not
@@ -988,10 +642,6 @@ def findfile(filename, subdir=None):
         if os.path.exists(fn): return fn
     return filename
 
-def create_empty_file(filename):
-    """Create an empty file. If the file already exists, truncate it."""
-    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
-    os.close(fd)
 
 def sortdict(dict):
     "Like repr(dict), but in sorted order."
@@ -1000,19 +650,6 @@ def sortdict(dict):
     withcommas = ", ".join(reprpairs)
     return "{%s}" % withcommas
 
-def make_bad_fd():
-    """
-    Create an invalid file descriptor by opening and closing a file and return
-    its fd.
-    """
-    file = open(TESTFN, "wb")
-    try:
-        return file.fileno()
-    finally:
-        file.close()
-        unlink(TESTFN)
-
-
 def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
     with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
         compile(statement, '<test string>', 'exec')
@@ -1265,59 +902,6 @@ def __exit__(self, *ignore_exc):
         sys.modules.update(self.original_modules)
 
 
-class EnvironmentVarGuard(collections.abc.MutableMapping):
-
-    """Class to help protect the environment variable properly.  Can be used as
-    a context manager."""
-
-    def __init__(self):
-        self._environ = os.environ
-        self._changed = {}
-
-    def __getitem__(self, envvar):
-        return self._environ[envvar]
-
-    def __setitem__(self, envvar, value):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = self._environ.get(envvar)
-        self._environ[envvar] = value
-
-    def __delitem__(self, envvar):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = self._environ.get(envvar)
-        if envvar in self._environ:
-            del self._environ[envvar]
-
-    def keys(self):
-        return self._environ.keys()
-
-    def __iter__(self):
-        return iter(self._environ)
-
-    def __len__(self):
-        return len(self._environ)
-
-    def set(self, envvar, value):
-        self[envvar] = value
-
-    def unset(self, envvar):
-        del self[envvar]
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        for (k, v) in self._changed.items():
-            if v is None:
-                if k in self._environ:
-                    del self._environ[k]
-            else:
-                self._environ[k] = v
-        os.environ = self._environ
-
-
 class DirsOnSysPath(object):
     """Context manager to temporarily add directories to sys.path.
 
@@ -2133,28 +1717,6 @@ def match_value(self, k, dv, v):
         return result
 
 
-_can_symlink = None
-def can_symlink():
-    global _can_symlink
-    if _can_symlink is not None:
-        return _can_symlink
-    symlink_path = TESTFN + "can_symlink"
-    try:
-        os.symlink(TESTFN, symlink_path)
-        can = True
-    except (OSError, NotImplementedError, AttributeError):
-        can = False
-    else:
-        os.remove(symlink_path)
-    _can_symlink = can
-    return can
-
-def skip_unless_symlink(test):
-    """Skip decorator for tests that require functional symlink"""
-    ok = can_symlink()
-    msg = "Requires functional symlink implementation"
-    return test if ok else unittest.skip(msg)(test)
-
 _buggy_ucrt = None
 def skip_if_buggy_ucrt_strfptime(test):
     """
@@ -2256,45 +1818,6 @@ def call_link(self, *args, returncode=0):
         return self._call(self.link, args, self._env, returncode)
 
 
-_can_xattr = None
-def can_xattr():
-    import tempfile
-    global _can_xattr
-    if _can_xattr is not None:
-        return _can_xattr
-    if not hasattr(os, "setxattr"):
-        can = False
-    else:
-        import platform
-        tmp_dir = tempfile.mkdtemp()
-        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
-        try:
-            with open(TESTFN, "wb") as fp:
-                try:
-                    # TESTFN & tempfile may use different file systems with
-                    # different capabilities
-                    os.setxattr(tmp_fp, b"user.test", b"")
-                    os.setxattr(tmp_name, b"trusted.foo", b"42")
-                    os.setxattr(fp.fileno(), b"user.test", b"")
-                    # Kernels < 2.6.39 don't respect setxattr flags.
-                    kernel_version = platform.release()
-                    m = re.match(r"2.6.(\d{1,2})", kernel_version)
-                    can = m is None or int(m.group(1)) >= 39
-                except OSError:
-                    can = False
-        finally:
-            unlink(TESTFN)
-            unlink(tmp_name)
-            rmdir(tmp_dir)
-    _can_xattr = can
-    return can
-
-def skip_unless_xattr(test):
-    """Skip decorator for tests that require functional extended attributes"""
-    ok = can_xattr()
-    msg = "no non-broken extended attribute support"
-    return test if ok else unittest.skip(msg)(test)
-
 def skip_if_pgo_task(test):
     """Skip decorator for tests not run in (non-extended) PGO task"""
     ok = not PGO or PGO_EXTENDED
@@ -2302,20 +1825,6 @@ def skip_if_pgo_task(test):
     return test if ok else unittest.skip(msg)(test)
 
 
-def fs_is_case_insensitive(directory):
-    """Detects if the file system for the specified directory is case-insensitive."""
-    import tempfile
-    with tempfile.NamedTemporaryFile(dir=directory) as base:
-        base_path = base.name
-        case_path = base_path.upper()
-        if case_path == base_path:
-            case_path = base_path.lower()
-        try:
-            return os.path.samefile(base_path, case_path)
-        except FileNotFoundError:
-            return False
-
-
 def detect_api_mismatch(ref_api, other_api, *, ignore=()):
     """Returns the set of items in ref_api not in other_api, except for a
     defined list of items to be ignored in this check.
@@ -2623,65 +2132,6 @@ def disable_faulthandler():
             faulthandler.enable(file=fd, all_threads=True)
 
 
-def fd_count():
-    """Count the number of open file descriptors.
-    """
-    if sys.platform.startswith(('linux', 'freebsd')):
-        try:
-            names = os.listdir("/proc/self/fd")
-            # Subtract one because listdir() internally opens a file
-            # descriptor to list the content of the /proc/self/fd/ directory.
-            return len(names) - 1
-        except FileNotFoundError:
-            pass
-
-    MAXFD = 256
-    if hasattr(os, 'sysconf'):
-        try:
-            MAXFD = os.sysconf("SC_OPEN_MAX")
-        except OSError:
-            pass
-
-    old_modes = None
-    if sys.platform == 'win32':
-        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
-        # on invalid file descriptor if Python is compiled in debug mode
-        try:
-            import msvcrt
-            msvcrt.CrtSetReportMode
-        except (AttributeError, ImportError):
-            # no msvcrt or a release build
-            pass
-        else:
-            old_modes = {}
-            for report_type in (msvcrt.CRT_WARN,
-                                msvcrt.CRT_ERROR,
-                                msvcrt.CRT_ASSERT):
-                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
-
-    try:
-        count = 0
-        for fd in range(MAXFD):
-            try:
-                # Prefer dup() over fstat(). fstat() can require input/output
-                # whereas dup() doesn't.
-                fd2 = os.dup(fd)
-            except OSError as e:
-                if e.errno != errno.EBADF:
-                    raise
-            else:
-                os.close(fd2)
-                count += 1
-    finally:
-        if old_modes is not None:
-            for report_type in (msvcrt.CRT_WARN,
-                                msvcrt.CRT_ERROR,
-                                msvcrt.CRT_ASSERT):
-                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
-
-    return count
-
-
 class SaveSignals:
     """
     Save and restore signal handlers.
@@ -2726,24 +2176,6 @@ def with_pymalloc():
     return _testcapi.WITH_PYMALLOC
 
 
-class FakePath:
-    """Simple implementing of the path protocol.
-    """
-    def __init__(self, path):
-        self.path = path
-
-    def __repr__(self):
-        return f'<FakePath {self.path!r}>'
-
-    def __fspath__(self):
-        if (isinstance(self.path, BaseException) or
-            isinstance(self.path, type) and
-                issubclass(self.path, BaseException)):
-            raise self.path
-        else:
-            return self.path
-
-
 class _ALWAYS_EQ:
     """
     Object that is equal to anything.
diff --git a/Lib/test/support/os_helper.py b/Lib/test/support/os_helper.py
new file mode 100644
index 0000000000000..d3347027cf204
--- /dev/null
+++ b/Lib/test/support/os_helper.py
@@ -0,0 +1,611 @@
+import collections.abc
+import contextlib
+import errno
+import os
+import re
+import stat
+import sys
+import time
+import unittest
+import warnings
+
+
+# Filename used for testing
+if os.name == 'java':
+    # Jython disallows @ in module names
+    TESTFN = '$test'
+else:
+    TESTFN = '@test'
+
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+# TESTFN_UNICODE is a non-ascii filename
+TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
+if sys.platform == 'darwin':
+    # In Mac OS X's VFS API file names are, by definition, canonically
+    # decomposed Unicode, encoded using UTF-8. See QA1173:
+    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
+    import unicodedata
+    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
+
+# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
+# encoded by the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such filename.
+TESTFN_UNENCODABLE = None
+if os.name == 'nt':
+    # skip win32s (0) or Windows 9x/ME (1)
+    if sys.getwindowsversion().platform >= 2:
+        # Different kinds of characters from various languages to minimize the
+        # probability that the whole name is encodable to MBCS (issue #9819)
+        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
+        try:
+            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
+        except UnicodeEncodeError:
+            pass
+        else:
+            print('WARNING: The filename %r CAN be encoded by the filesystem '
+                  'encoding (%s). Unicode filename tests may not be effective'
+                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
+            TESTFN_UNENCODABLE = None
+# Mac OS X denies unencodable filenames (invalid utf-8)
+elif sys.platform != 'darwin':
+    try:
+        # ascii and utf-8 cannot encode the byte 0xff
+        b'\xff'.decode(sys.getfilesystemencoding())
+    except UnicodeDecodeError:
+        # 0xff will be encoded using the surrogate character u+DCFF
+        TESTFN_UNENCODABLE = TESTFN \
+            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
+    else:
+        # File system encoding (eg. ISO-8859-* encodings) can encode
+        # the byte 0xff. Skip some unicode filename tests.
+        pass
+
+# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
+# or None if there is no such character.
+FS_NONASCII = None
+for character in (
+    # First try printable and common characters to have a readable filename.
+    # For each character, the encoding list are just example of encodings able
+    # to encode the character (the list is not exhaustive).
+
+    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
+    '\u00E6',
+    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
+    '\u0130',
+    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
+    '\u0141',
+    # U+03C6 (Greek Small Letter Phi): cp1253
+    '\u03C6',
+    # U+041A (Cyrillic Capital Letter Ka): cp1251
+    '\u041A',
+    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
+    '\u05D0',
+    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
+    '\u060C',
+    # U+062A (Arabic Letter Teh): cp720
+    '\u062A',
+    # U+0E01 (Thai Character Ko Kai): cp874
+    '\u0E01',
+
+    # Then try more "special" characters. "special" because they may be
+    # interpreted or displayed differently depending on the exact locale
+    # encoding and the font.
+
+    # U+00A0 (No-Break Space)
+    '\u00A0',
+    # U+20AC (Euro Sign)
+    '\u20AC',
+):
+    try:
+        # If Python is set up to use the legacy 'mbcs' in Windows,
+        # 'replace' error mode is used, and encode() returns b'?'
+        # for characters missing in the ANSI codepage
+        if os.fsdecode(os.fsencode(character)) != character:
+            raise UnicodeError
+    except UnicodeError:
+        pass
+    else:
+        FS_NONASCII = character
+        break
+
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+
+# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
+# decoded from the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such filename (ex: the latin1 encoding can decode any byte
+# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
+# to the surrogateescape error handler (PEP 383), but not from the filesystem
+# encoding in strict mode.
+TESTFN_UNDECODABLE = None
+for name in (
+    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
+    # accepts it to create a file or a directory, or don't accept to enter to
+    # such directory (when the bytes name is used). So test b'\xe7' first:
+    # it is not decodable from cp932.
+    b'\xe7w\xf0',
+    # undecodable from ASCII, UTF-8
+    b'\xff',
+    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
+    # and cp857
+    b'\xae\xd5'
+    # undecodable from UTF-8 (UNIX and Mac OS X)
+    b'\xed\xb2\x80', b'\xed\xb4\x80',
+    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
+    # cp1253, cp1254, cp1255, cp1257, cp1258
+    b'\x81\x98',
+):
+    try:
+        name.decode(sys.getfilesystemencoding())
+    except UnicodeDecodeError:
+        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
+        break
+
+if FS_NONASCII:
+    TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
+else:
+    TESTFN_NONASCII = None
+
+
+def make_bad_fd():
+    """
+    Create an invalid file descriptor by opening and closing a file and return
+    its fd.
+    """
+    file = open(TESTFN, "wb")
+    try:
+        return file.fileno()
+    finally:
+        file.close()
+        unlink(TESTFN)
+
+
+_can_symlink = None
+
+
+def can_symlink():
+    global _can_symlink
+    if _can_symlink is not None:
+        return _can_symlink
+    symlink_path = TESTFN + "can_symlink"
+    try:
+        os.symlink(TESTFN, symlink_path)
+        can = True
+    except (OSError, NotImplementedError, AttributeError):
+        can = False
+    else:
+        os.remove(symlink_path)
+    _can_symlink = can
+    return can
+
+
+def skip_unless_symlink(test):
+    """Skip decorator for tests that require functional symlink"""
+    ok = can_symlink()
+    msg = "Requires functional symlink implementation"
+    return test if ok else unittest.skip(msg)(test)
+
+
+_can_xattr = None
+
+
+def can_xattr():
+    import tempfile
+    global _can_xattr
+    if _can_xattr is not None:
+        return _can_xattr
+    if not hasattr(os, "setxattr"):
+        can = False
+    else:
+        import platform
+        tmp_dir = tempfile.mkdtemp()
+        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
+        try:
+            with open(TESTFN, "wb") as fp:
+                try:
+                    # TESTFN & tempfile may use different file systems with
+                    # different capabilities
+                    os.setxattr(tmp_fp, b"user.test", b"")
+                    os.setxattr(tmp_name, b"trusted.foo", b"42")
+                    os.setxattr(fp.fileno(), b"user.test", b"")
+                    # Kernels < 2.6.39 don't respect setxattr flags.
+                    kernel_version = platform.release()
+                    m = re.match(r"2.6.(\d{1,2})", kernel_version)
+                    can = m is None or int(m.group(1)) >= 39
+                except OSError:
+                    can = False
+        finally:
+            unlink(TESTFN)
+            unlink(tmp_name)
+            rmdir(tmp_dir)
+    _can_xattr = can
+    return can
+
+
+def skip_unless_xattr(test):
+    """Skip decorator for tests that require functional extended attributes"""
+    ok = can_xattr()
+    msg = "no non-broken extended attribute support"
+    return test if ok else unittest.skip(msg)(test)
+
+
+def unlink(filename):
+    try:
+        _unlink(filename)
+    except (FileNotFoundError, NotADirectoryError):
+        pass
+
+
+if sys.platform.startswith("win"):
+    def _waitfor(func, pathname, waitall=False):
+        # Perform the operation
+        func(pathname)
+        # Now setup the wait loop
+        if waitall:
+            dirname = pathname
+        else:
+            dirname, name = os.path.split(pathname)
+            dirname = dirname or '.'
+        # Check for `pathname` to be removed from the filesystem.
+        # The exponential backoff of the timeout amounts to a total
+        # of ~1 second after which the deletion is probably an error
+        # anyway.
+        # Testing on an i7 at 4.3GHz shows that usually only 1 iteration is
+        # required when contention occurs.
+        timeout = 0.001
+        while timeout < 1.0:
+            # Note we are only testing for the existence of the file(s) in
+            # the contents of the directory regardless of any security or
+            # access rights.  If we have made it this far, we have sufficient
+            # permissions to do that much using Python's equivalent of the
+            # Windows API FindFirstFile.
+            # Other Windows APIs can fail or give incorrect results when
+            # dealing with files that are pending deletion.
+            L = os.listdir(dirname)
+            if not (L if waitall else name in L):
+                return
+            # Increase the timeout and try again
+            time.sleep(timeout)
+            timeout *= 2
+        warnings.warn('tests may fail, delete still pending for ' + pathname,
+                      RuntimeWarning, stacklevel=4)
+
+    def _unlink(filename):
+        _waitfor(os.unlink, filename)
+
+    def _rmdir(dirname):
+        _waitfor(os.rmdir, dirname)
+
+    def _rmtree(path):
+        from test.support import _force_run
+
+        def _rmtree_inner(path):
+            for name in _force_run(path, os.listdir, path):
+                fullname = os.path.join(path, name)
+                try:
+                    mode = os.lstat(fullname).st_mode
+                except OSError as exc:
+                    print("support.rmtree(): os.lstat(%r) failed with %s"
+                          % (fullname, exc),
+                          file=sys.__stderr__)
+                    mode = 0
+                if stat.S_ISDIR(mode):
+                    _waitfor(_rmtree_inner, fullname, waitall=True)
+                    _force_run(fullname, os.rmdir, fullname)
+                else:
+                    _force_run(fullname, os.unlink, fullname)
+        _waitfor(_rmtree_inner, path, waitall=True)
+        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
+
+    def _longpath(path):
+        try:
+            import ctypes
+        except ImportError:
+            # No ctypes means we can't expands paths.
+            pass
+        else:
+            buffer = ctypes.create_unicode_buffer(len(path) * 2)
+            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
+                                                             len(buffer))
+            if length:
+                return buffer[:length]
+        return path
+else:
+    _unlink = os.unlink
+    _rmdir = os.rmdir
+
+    def _rmtree(path):
+        import shutil
+        try:
+            shutil.rmtree(path)
+            return
+        except OSError:
+            pass
+
+        def _rmtree_inner(path):
+            from test.support import _force_run
+            for name in _force_run(path, os.listdir, path):
+                fullname = os.path.join(path, name)
+                try:
+                    mode = os.lstat(fullname).st_mode
+                except OSError:
+                    mode = 0
+                if stat.S_ISDIR(mode):
+                    _rmtree_inner(fullname)
+                    _force_run(path, os.rmdir, fullname)
+                else:
+                    _force_run(path, os.unlink, fullname)
+        _rmtree_inner(path)
+        os.rmdir(path)
+
+    def _longpath(path):
+        return path
+
+
+def rmdir(dirname):
+    try:
+        _rmdir(dirname)
+    except FileNotFoundError:
+        pass
+
+
+def rmtree(path):
+    try:
+        _rmtree(path)
+    except FileNotFoundError:
+        pass
+
+
+@contextlib.contextmanager
+def temp_dir(path=None, quiet=False):
+    """Return a context manager that creates a temporary directory.
+
+    Arguments:
+
+      path: the directory to create temporarily.  If omitted or None,
+        defaults to creating a temporary directory using tempfile.mkdtemp.
+
+      quiet: if False (the default), the context manager raises an exception
+        on error.  Otherwise, if the path is specified and cannot be
+        created, only a warning is issued.
+
+    """
+    import tempfile
+    dir_created = False
+    if path is None:
+        path = tempfile.mkdtemp()
+        dir_created = True
+        path = os.path.realpath(path)
+    else:
+        try:
+            os.mkdir(path)
+            dir_created = True
+        except OSError as exc:
+            if not quiet:
+                raise
+            warnings.warn(f'tests may fail, unable to create '
+                          f'temporary directory {path!r}: {exc}',
+                          RuntimeWarning, stacklevel=3)
+    if dir_created:
+        pid = os.getpid()
+    try:
+        yield path
+    finally:
+        # In case the process forks, let only the parent remove the
+        # directory. The child has a different process id. (bpo-30028)
+        if dir_created and pid == os.getpid():
+            rmtree(path)
+
+
+@contextlib.contextmanager
+def change_cwd(path, quiet=False):
+    """Return a context manager that changes the current working directory.
+
+    Arguments:
+
+      path: the directory to use as the temporary current working directory.
+
+      quiet: if False (the default), the context manager raises an exception
+        on error.  Otherwise, it issues only a warning and keeps the current
+        working directory the same.
+
+    """
+    saved_dir = os.getcwd()
+    try:
+        os.chdir(os.path.realpath(path))
+    except OSError as exc:
+        if not quiet:
+            raise
+        warnings.warn(f'tests may fail, unable to change the current working '
+                      f'directory to {path!r}: {exc}',
+                      RuntimeWarning, stacklevel=3)
+    try:
+        yield os.getcwd()
+    finally:
+        os.chdir(saved_dir)
+
+
+@contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False):
+    """
+    Context manager that temporarily creates and changes the CWD.
+
+    The function temporarily changes the current working directory
+    after creating a temporary directory in the current directory with
+    name *name*.  If *name* is None, the temporary directory is
+    created using tempfile.mkdtemp.
+
+    If *quiet* is False (default) and it is not possible to
+    create or change the CWD, an error is raised.  If *quiet* is True,
+    only a warning is raised and the original CWD is used.
+
+    """
+    with temp_dir(path=name, quiet=quiet) as temp_path:
+        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
+            yield cwd_dir
+
+
+def create_empty_file(filename):
+    """Create an empty file. If the file already exists, truncate it."""
+    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
+    os.close(fd)
+
+
+def fs_is_case_insensitive(directory):
+    """Detects if the file system for the specified directory
+    is case-insensitive."""
+    import tempfile
+    with tempfile.NamedTemporaryFile(dir=directory) as base:
+        base_path = base.name
+        case_path = base_path.upper()
+        if case_path == base_path:
+            case_path = base_path.lower()
+        try:
+            return os.path.samefile(base_path, case_path)
+        except FileNotFoundError:
+            return False
+
+
+class FakePath:
+    """Simple implementing of the path protocol.
+    """
+    def __init__(self, path):
+        self.path = path
+
+    def __repr__(self):
+        return f'<FakePath {self.path!r}>'
+
+    def __fspath__(self):
+        if (isinstance(self.path, BaseException) or
+            isinstance(self.path, type) and
+                issubclass(self.path, BaseException)):
+            raise self.path
+        else:
+            return self.path
+
+
+def fd_count():
+    """Count the number of open file descriptors.
+    """
+    if sys.platform.startswith(('linux', 'freebsd')):
+        try:
+            names = os.listdir("/proc/self/fd")
+            # Subtract one because listdir() internally opens a file
+            # descriptor to list the content of the /proc/self/fd/ directory.
+            return len(names) - 1
+        except FileNotFoundError:
+            pass
+
+    MAXFD = 256
+    if hasattr(os, 'sysconf'):
+        try:
+            MAXFD = os.sysconf("SC_OPEN_MAX")
+        except OSError:
+            pass
+
+    old_modes = None
+    if sys.platform == 'win32':
+        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
+        # on invalid file descriptor if Python is compiled in debug mode
+        try:
+            import msvcrt
+            msvcrt.CrtSetReportMode
+        except (AttributeError, ImportError):
+            # no msvcrt or a release build
+            pass
+        else:
+            old_modes = {}
+            for report_type in (msvcrt.CRT_WARN,
+                                msvcrt.CRT_ERROR,
+                                msvcrt.CRT_ASSERT):
+                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
+                                                                 0)
+
+    try:
+        count = 0
+        for fd in range(MAXFD):
+            try:
+                # Prefer dup() over fstat(). fstat() can require input/output
+                # whereas dup() doesn't.
+                fd2 = os.dup(fd)
+            except OSError as e:
+                if e.errno != errno.EBADF:
+                    raise
+            else:
+                os.close(fd2)
+                count += 1
+    finally:
+        if old_modes is not None:
+            for report_type in (msvcrt.CRT_WARN,
+                                msvcrt.CRT_ERROR,
+                                msvcrt.CRT_ASSERT):
+                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
+
+    return count
+
+
+if hasattr(os, "umask"):
+    @contextlib.contextmanager
+    def temp_umask(umask):
+        """Context manager that temporarily sets the process umask."""
+        oldmask = os.umask(umask)
+        try:
+            yield
+        finally:
+            os.umask(oldmask)
+
+
+class EnvironmentVarGuard(collections.abc.MutableMapping):
+
+    """Class to help protect the environment variable properly.  Can be used as
+    a context manager."""
+
+    def __init__(self):
+        self._environ = os.environ
+        self._changed = {}
+
+    def __getitem__(self, envvar):
+        return self._environ[envvar]
+
+    def __setitem__(self, envvar, value):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        self._environ[envvar] = value
+
+    def __delitem__(self, envvar):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        if envvar in self._environ:
+            del self._environ[envvar]
+
+    def keys(self):
+        return self._environ.keys()
+
+    def __iter__(self):
+        return iter(self._environ)
+
+    def __len__(self):
+        return len(self._environ)
+
+    def set(self, envvar, value):
+        self[envvar] = value
+
+    def unset(self, envvar):
+        del self[envvar]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        for (k, v) in self._changed.items():
+            if v is None:
+                if k in self._environ:
+                    del self._environ[k]
+            else:
+                self._environ[k] = v
+        os.environ = self._environ


