[Python-checkins] bpo-40275: Add os_helper submodule in test.support (GH-20765)
Hai Shi
webhook-mailer at python.org
Wed Jun 10 08:29:07 EDT 2020
https://github.com/python/cpython/commit/0d00b2a5d74390da7bbeff7dfa73abf2eb46124a
commit: 0d00b2a5d74390da7bbeff7dfa73abf2eb46124a
branch: master
author: Hai Shi <shihai1992 at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-06-10T14:29:02+02:00
summary:
bpo-40275: Add os_helper submodule in test.support (GH-20765)
files:
A Lib/test/support/os_helper.py
M Doc/library/test.rst
M Lib/test/support/__init__.py
diff --git a/Doc/library/test.rst b/Doc/library/test.rst
index 7580fb5e9b174..11d748466cba2 100644
--- a/Doc/library/test.rst
+++ b/Doc/library/test.rst
@@ -247,41 +247,6 @@ The :mod:`test.support` module defines the following constants:
Path for shell if not on Windows; otherwise ``None``.
-.. data:: FS_NONASCII
-
- A non-ASCII character encodable by :func:`os.fsencode`.
-
-
-.. data:: TESTFN
-
- Set to a name that is safe to use as the name of a temporary file. Any
- temporary file that is created should be closed and unlinked (removed).
-
-
-.. data:: TESTFN_UNICODE
-
- Set to a non-ASCII name for a temporary file.
-
-
-.. data:: TESTFN_UNENCODABLE
-
- Set to a filename (str type) that should not be able to be encoded by file
- system encoding in strict mode. It may be ``None`` if it's not possible to
- generate such a filename.
-
-
-.. data:: TESTFN_UNDECODABLE
-
- Set to a filename (bytes type) that should not be able to be decoded by
- file system encoding in strict mode. It may be ``None`` if it's not
- possible to generate such a filename.
-
-
-.. data:: TESTFN_NONASCII
-
- Set to a filename containing the :data:`FS_NONASCII` character.
-
-
.. data:: LOOPBACK_TIMEOUT
Timeout in seconds for tests using a network server listening on the network
@@ -343,11 +308,6 @@ The :mod:`test.support` module defines the following constants:
:data:`SHORT_TIMEOUT`.
-.. data:: SAVEDCWD
-
- Set to :func:`os.getcwd`.
-
-
.. data:: PGO
Set when tests can be skipped when they are not useful for PGO.
@@ -449,25 +409,6 @@ The :mod:`test.support` module defines the following functions:
Delete *name* from ``sys.modules``.
-.. function:: unlink(filename)
-
- Call :func:`os.unlink` on *filename*. On Windows platforms, this is
- wrapped with a wait loop that checks for the existence fo the file.
-
-
-.. function:: rmdir(filename)
-
- Call :func:`os.rmdir` on *filename*. On Windows platforms, this is
- wrapped with a wait loop that checks for the existence of the file.
-
-
-.. function:: rmtree(path)
-
- Call :func:`shutil.rmtree` on *path* or call :func:`os.lstat` and
- :func:`os.rmdir` to remove a path and its contents. On Windows platforms,
- this is wrapped with a wait loop that checks for the existence of the files.
-
-
.. function:: make_legacy_pyc(source)
Move a :pep:`3147`/:pep:`488` pyc file to its legacy pyc location and return the file
@@ -521,16 +462,6 @@ The :mod:`test.support` module defines the following functions:
rather than looking directly in the path directories.
-.. function:: create_empty_file(filename)
-
- Create an empty file with *filename*. If it already exists, truncate it.
-
-
-.. function:: fd_count()
-
- Count the number of open file descriptors.
-
-
.. function:: match_test(test)
Match *test* to patterns set in :func:`set_match_tests`.
@@ -713,47 +644,6 @@ The :mod:`test.support` module defines the following functions:
self.assertEqual(captured, "hello")
-.. function:: temp_dir(path=None, quiet=False)
-
- A context manager that creates a temporary directory at *path* and
- yields the directory.
-
- If *path* is ``None``, the temporary directory is created using
- :func:`tempfile.mkdtemp`. If *quiet* is ``False``, the context manager
- raises an exception on error. Otherwise, if *path* is specified and
- cannot be created, only a warning is issued.
-
-
-.. function:: change_cwd(path, quiet=False)
-
- A context manager that temporarily changes the current working
- directory to *path* and yields the directory.
-
- If *quiet* is ``False``, the context manager raises an exception
- on error. Otherwise, it issues only a warning and keeps the current
- working directory the same.
-
-
-.. function:: temp_cwd(name='tempcwd', quiet=False)
-
- A context manager that temporarily creates a new directory and
- changes the current working directory (CWD).
-
- The context manager creates a temporary directory in the current
- directory with name *name* before temporarily changing the current
- working directory. If *name* is ``None``, the temporary directory is
- created using :func:`tempfile.mkdtemp`.
-
- If *quiet* is ``False`` and it is not possible to create or change
- the CWD, an error is raised. Otherwise, only a warning is raised
- and the original CWD is used.
-
-
-.. function:: temp_umask(umask)
-
- A context manager that temporarily sets the process umask.
-
-
.. function:: disable_faulthandler()
A context manager that replaces ``sys.stderr`` with ``sys.__stderr__``.
@@ -851,28 +741,6 @@ The :mod:`test.support` module defines the following functions:
header size equals *size*.
-.. function:: can_symlink()
-
- Return ``True`` if the OS supports symbolic links, ``False``
- otherwise.
-
-
-.. function:: can_xattr()
-
- Return ``True`` if the OS supports xattr, ``False``
- otherwise.
-
-
-.. decorator:: skip_unless_symlink
-
- A decorator for running tests that require support for symbolic links.
-
-
-.. decorator:: skip_unless_xattr
-
- A decorator for running tests that require support for xattr.
-
-
.. decorator:: anticipate_failure(condition)
A decorator to conditionally mark tests with
@@ -992,12 +860,6 @@ The :mod:`test.support` module defines the following functions:
wrap.
-.. function:: make_bad_fd()
-
- Create an invalid file descriptor by opening and closing a temporary file,
- and returning its descriptor.
-
-
.. function:: check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None)
Test for syntax errors in *statement* by attempting to compile *statement*.
@@ -1144,11 +1006,6 @@ The :mod:`test.support` module defines the following functions:
return load_package_tests(os.path.dirname(__file__), *args)
-.. function:: fs_is_case_insensitive(directory)
-
- Return ``True`` if the file system for *directory* is case-insensitive.
-
-
.. function:: detect_api_mismatch(ref_api, other_api, *, ignore=())
Returns the set of attributes, functions or methods of *ref_api* not
@@ -1241,28 +1098,6 @@ The :mod:`test.support` module defines the following classes:
attributes on the exception is :exc:`ResourceDenied` raised.
-.. class:: EnvironmentVarGuard()
-
- Class used to temporarily set or unset environment variables. Instances can
- be used as a context manager and have a complete dictionary interface for
- querying/modifying the underlying ``os.environ``. After exit from the
- context manager all changes to environment variables done through this
- instance will be rolled back.
-
- .. versionchanged:: 3.1
- Added dictionary interface.
-
-.. method:: EnvironmentVarGuard.set(envvar, value)
-
- Temporarily set the environment variable ``envvar`` to the value of
- ``value``.
-
-
-.. method:: EnvironmentVarGuard.unset(envvar)
-
- Temporarily unset the environment variable ``envvar``.
-
-
.. class:: SuppressCrashReport()
A context manager used to try to prevent crash dialog popups on tests that
@@ -1332,13 +1167,6 @@ The :mod:`test.support` module defines the following classes:
Run *test* and return the result.
-.. class:: FakePath(path)
-
- Simple :term:`path-like object`. It implements the :meth:`__fspath__`
- method which just returns the *path* argument. If *path* is an exception,
- it will be raised in :meth:`!__fspath__`.
-
-
:mod:`test.support.socket_helper` --- Utilities for socket tests
================================================================
@@ -1634,3 +1462,187 @@ The :mod:`test.support.threading_helper` module provides support for threading t
# (to avoid reference cycles)
.. versionadded:: 3.8
+
+
+:mod:`test.support.os_helper` --- Utilities for os tests
+========================================================================
+
+.. module:: test.support.os_helper
+ :synopsis: Support for os tests.
+
+The :mod:`test.support.os_helper` module provides support for os tests.
+
+.. versionadded:: 3.10
+
+
+.. data:: FS_NONASCII
+
+ A non-ASCII character encodable by :func:`os.fsencode`.
+
+
+.. data:: SAVEDCWD
+
+ Set to :func:`os.getcwd`.
+
+
+.. data:: TESTFN
+
+ Set to a name that is safe to use as the name of a temporary file. Any
+ temporary file that is created should be closed and unlinked (removed).
+
+
+.. data:: TESTFN_NONASCII
+
+ Set to a filename containing the :data:`FS_NONASCII` character.
+
+
+.. data:: TESTFN_UNENCODABLE
+
+ Set to a filename (str type) that should not be able to be encoded by file
+ system encoding in strict mode. It may be ``None`` if it's not possible to
+ generate such a filename.
+
+
+.. data:: TESTFN_UNDECODABLE
+
+ Set to a filename (bytes type) that should not be able to be decoded by
+ file system encoding in strict mode. It may be ``None`` if it's not
+ possible to generate such a filename.
+
+
+.. data:: TESTFN_UNICODE
+
+ Set to a non-ASCII name for a temporary file.
+
+
+.. class:: EnvironmentVarGuard()
+
+ Class used to temporarily set or unset environment variables. Instances can
+ be used as a context manager and have a complete dictionary interface for
+ querying/modifying the underlying ``os.environ``. After exit from the
+ context manager all changes to environment variables done through this
+ instance will be rolled back.
+
+ .. versionchanged:: 3.1
+ Added dictionary interface.
+
+
+.. class:: FakePath(path)
+
+ Simple :term:`path-like object`. It implements the :meth:`__fspath__`
+ method which just returns the *path* argument. If *path* is an exception,
+ it will be raised in :meth:`!__fspath__`.
+
+
+.. method:: EnvironmentVarGuard.set(envvar, value)
+
+ Temporarily set the environment variable ``envvar`` to the value of
+ ``value``.
+
+
+.. method:: EnvironmentVarGuard.unset(envvar)
+
+ Temporarily unset the environment variable ``envvar``.
+
+
+.. function:: can_symlink()
+
+ Return ``True`` if the OS supports symbolic links, ``False``
+ otherwise.
+
+
+.. function:: can_xattr()
+
+ Return ``True`` if the OS supports xattr, ``False``
+ otherwise.
+
+
+.. function:: change_cwd(path, quiet=False)
+
+ A context manager that temporarily changes the current working
+ directory to *path* and yields the directory.
+
+ If *quiet* is ``False``, the context manager raises an exception
+ on error. Otherwise, it issues only a warning and keeps the current
+ working directory the same.
+
+
+.. function:: create_empty_file(filename)
+
+ Create an empty file with *filename*. If it already exists, truncate it.
+
+
+.. function:: fd_count()
+
+ Count the number of open file descriptors.
+
+
+.. function:: fs_is_case_insensitive(directory)
+
+ Return ``True`` if the file system for *directory* is case-insensitive.
+
+
+.. function:: make_bad_fd()
+
+ Create an invalid file descriptor by opening and closing a temporary file,
+ and returning its descriptor.
+
+
+.. function:: rmdir(filename)
+
+ Call :func:`os.rmdir` on *filename*. On Windows platforms, this is
+ wrapped with a wait loop that checks for the existence of the file.
+
+
+.. function:: rmtree(path)
+
+ Call :func:`shutil.rmtree` on *path* or call :func:`os.lstat` and
+ :func:`os.rmdir` to remove a path and its contents. On Windows platforms,
+ this is wrapped with a wait loop that checks for the existence of the files.
+
+
+.. decorator:: skip_unless_symlink
+
+ A decorator for running tests that require support for symbolic links.
+
+
+.. decorator:: skip_unless_xattr
+
+ A decorator for running tests that require support for xattr.
+
+
+.. function:: temp_cwd(name='tempcwd', quiet=False)
+
+ A context manager that temporarily creates a new directory and
+ changes the current working directory (CWD).
+
+ The context manager creates a temporary directory in the current
+ directory with name *name* before temporarily changing the current
+ working directory. If *name* is ``None``, the temporary directory is
+ created using :func:`tempfile.mkdtemp`.
+
+ If *quiet* is ``False`` and it is not possible to create or change
+ the CWD, an error is raised. Otherwise, only a warning is raised
+ and the original CWD is used.
+
+
+.. function:: temp_dir(path=None, quiet=False)
+
+ A context manager that creates a temporary directory at *path* and
+ yields the directory.
+
+ If *path* is ``None``, the temporary directory is created using
+ :func:`tempfile.mkdtemp`. If *quiet* is ``False``, the context manager
+ raises an exception on error. Otherwise, if *path* is specified and
+ cannot be created, only a warning is issued.
+
+
+.. function:: temp_umask(umask)
+
+ A context manager that temporarily sets the process umask.
+
+
+.. function:: unlink(filename)
+
+ Call :func:`os.unlink` on *filename*. On Windows platforms, this is
+ wrapped with a wait loop that checks for the existence fo the file.
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index bb905bd895de8..3a5f7b556d767 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -3,7 +3,6 @@
if __name__ != 'test.support':
raise ImportError('support must be imported from the test package')
-import collections.abc
import contextlib
import errno
import fnmatch
@@ -22,8 +21,19 @@
import unittest
import warnings
+from .os_helper import (
+ FS_NONASCII, SAVEDCWD, TESTFN, TESTFN_NONASCII,
+ TESTFN_UNENCODABLE, TESTFN_UNDECODABLE,
+ TESTFN_UNICODE, can_symlink, can_xattr,
+ change_cwd, create_empty_file, fd_count,
+ fs_is_case_insensitive, make_bad_fd, rmdir,
+ rmtree, skip_unless_symlink, skip_unless_xattr,
+ temp_cwd, temp_dir, temp_umask, unlink,
+ EnvironmentVarGuard, FakePath, _longpath)
+
from .testresult import get_test_runner
+
__all__ = [
# globals
"PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
@@ -36,18 +46,15 @@
# io
"record_original_stdout", "get_original_stdout", "captured_stdout",
"captured_stdin", "captured_stderr",
- # filesystem
- "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
- "create_empty_file", "can_symlink", "fs_is_case_insensitive",
# unittest
"is_resource_enabled", "requires", "requires_freebsd_version",
"requires_linux_version", "requires_mac_ver",
"check_syntax_error", "check_syntax_warning",
"TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
"BasicTestRunner", "run_unittest", "run_doctest",
- "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
+ "requires_gzip", "requires_bz2", "requires_lzma",
"bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
- "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
+ "requires_IEEE_754", "requires_zlib",
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"ignore_warnings",
@@ -57,13 +64,12 @@
# network
"open_urlresource",
# processes
- 'temp_umask', "reap_children",
+ "reap_children",
# miscellaneous
"check_warnings", "check_no_resource_warning", "check_no_warnings",
- "EnvironmentVarGuard",
- "run_with_locale", "swap_item",
+ "run_with_locale", "swap_item", "findfile",
"swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
- "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
+ "run_with_tz", "PGO", "missing_compiler_executable",
"ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
"LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
]
@@ -318,6 +324,7 @@ def unload(name):
except KeyError:
pass
+
def _force_run(path, func, *args):
try:
return func(*args)
@@ -328,124 +335,6 @@ def _force_run(path, func, *args):
os.chmod(path, stat.S_IRWXU)
return func(*args)
-if sys.platform.startswith("win"):
- def _waitfor(func, pathname, waitall=False):
- # Perform the operation
- func(pathname)
- # Now setup the wait loop
- if waitall:
- dirname = pathname
- else:
- dirname, name = os.path.split(pathname)
- dirname = dirname or '.'
- # Check for `pathname` to be removed from the filesystem.
- # The exponential backoff of the timeout amounts to a total
- # of ~1 second after which the deletion is probably an error
- # anyway.
- # Testing on an i7 at 4.3GHz shows that usually only 1 iteration is
- # required when contention occurs.
- timeout = 0.001
- while timeout < 1.0:
- # Note we are only testing for the existence of the file(s) in
- # the contents of the directory regardless of any security or
- # access rights. If we have made it this far, we have sufficient
- # permissions to do that much using Python's equivalent of the
- # Windows API FindFirstFile.
- # Other Windows APIs can fail or give incorrect results when
- # dealing with files that are pending deletion.
- L = os.listdir(dirname)
- if not (L if waitall else name in L):
- return
- # Increase the timeout and try again
- time.sleep(timeout)
- timeout *= 2
- warnings.warn('tests may fail, delete still pending for ' + pathname,
- RuntimeWarning, stacklevel=4)
-
- def _unlink(filename):
- _waitfor(os.unlink, filename)
-
- def _rmdir(dirname):
- _waitfor(os.rmdir, dirname)
-
- def _rmtree(path):
- def _rmtree_inner(path):
- for name in _force_run(path, os.listdir, path):
- fullname = os.path.join(path, name)
- try:
- mode = os.lstat(fullname).st_mode
- except OSError as exc:
- print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
- file=sys.__stderr__)
- mode = 0
- if stat.S_ISDIR(mode):
- _waitfor(_rmtree_inner, fullname, waitall=True)
- _force_run(fullname, os.rmdir, fullname)
- else:
- _force_run(fullname, os.unlink, fullname)
- _waitfor(_rmtree_inner, path, waitall=True)
- _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
-
- def _longpath(path):
- try:
- import ctypes
- except ImportError:
- # No ctypes means we can't expands paths.
- pass
- else:
- buffer = ctypes.create_unicode_buffer(len(path) * 2)
- length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
- len(buffer))
- if length:
- return buffer[:length]
- return path
-else:
- _unlink = os.unlink
- _rmdir = os.rmdir
-
- def _rmtree(path):
- import shutil
- try:
- shutil.rmtree(path)
- return
- except OSError:
- pass
-
- def _rmtree_inner(path):
- for name in _force_run(path, os.listdir, path):
- fullname = os.path.join(path, name)
- try:
- mode = os.lstat(fullname).st_mode
- except OSError:
- mode = 0
- if stat.S_ISDIR(mode):
- _rmtree_inner(fullname)
- _force_run(path, os.rmdir, fullname)
- else:
- _force_run(path, os.unlink, fullname)
- _rmtree_inner(path)
- os.rmdir(path)
-
- def _longpath(path):
- return path
-
-def unlink(filename):
- try:
- _unlink(filename)
- except (FileNotFoundError, NotADirectoryError):
- pass
-
-def rmdir(dirname):
- try:
- _rmdir(dirname)
- except FileNotFoundError:
- pass
-
-def rmtree(path):
- try:
- _rmtree(path)
- except FileNotFoundError:
- pass
def make_legacy_pyc(source):
"""Move a PEP 3147/488 pyc file to its legacy pyc location.
@@ -714,149 +603,10 @@ def requires_lzma(reason='requires lzma'):
else:
unix_shell = None
-# Filename used for testing
-if os.name == 'java':
- # Jython disallows @ in module names
- TESTFN = '$test'
-else:
- TESTFN = '@test'
-
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-
# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"
-# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
-# or None if there is no such character.
-FS_NONASCII = None
-for character in (
- # First try printable and common characters to have a readable filename.
- # For each character, the encoding list are just example of encodings able
- # to encode the character (the list is not exhaustive).
-
- # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
- '\u00E6',
- # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
- '\u0130',
- # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
- '\u0141',
- # U+03C6 (Greek Small Letter Phi): cp1253
- '\u03C6',
- # U+041A (Cyrillic Capital Letter Ka): cp1251
- '\u041A',
- # U+05D0 (Hebrew Letter Alef): Encodable to cp424
- '\u05D0',
- # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
- '\u060C',
- # U+062A (Arabic Letter Teh): cp720
- '\u062A',
- # U+0E01 (Thai Character Ko Kai): cp874
- '\u0E01',
-
- # Then try more "special" characters. "special" because they may be
- # interpreted or displayed differently depending on the exact locale
- # encoding and the font.
-
- # U+00A0 (No-Break Space)
- '\u00A0',
- # U+20AC (Euro Sign)
- '\u20AC',
-):
- try:
- # If Python is set up to use the legacy 'mbcs' in Windows,
- # 'replace' error mode is used, and encode() returns b'?'
- # for characters missing in the ANSI codepage
- if os.fsdecode(os.fsencode(character)) != character:
- raise UnicodeError
- except UnicodeError:
- pass
- else:
- FS_NONASCII = character
- break
-
-# TESTFN_UNICODE is a non-ascii filename
-TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
-if sys.platform == 'darwin':
- # In Mac OS X's VFS API file names are, by definition, canonically
- # decomposed Unicode, encoded using UTF-8. See QA1173:
- # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
- import unicodedata
- TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
-
-# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
-# encoded by the filesystem encoding (in strict mode). It can be None if we
-# cannot generate such filename.
-TESTFN_UNENCODABLE = None
-if os.name == 'nt':
- # skip win32s (0) or Windows 9x/ME (1)
- if sys.getwindowsversion().platform >= 2:
- # Different kinds of characters from various languages to minimize the
- # probability that the whole name is encodable to MBCS (issue #9819)
- TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
- try:
- TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
- except UnicodeEncodeError:
- pass
- else:
- print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
- 'Unicode filename tests may not be effective'
- % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
- TESTFN_UNENCODABLE = None
-# Mac OS X denies unencodable filenames (invalid utf-8)
-elif sys.platform != 'darwin':
- try:
- # ascii and utf-8 cannot encode the byte 0xff
- b'\xff'.decode(sys.getfilesystemencoding())
- except UnicodeDecodeError:
- # 0xff will be encoded using the surrogate character u+DCFF
- TESTFN_UNENCODABLE = TESTFN \
- + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
- else:
- # File system encoding (eg. ISO-8859-* encodings) can encode
- # the byte 0xff. Skip some unicode filename tests.
- pass
-
-# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
-# decoded from the filesystem encoding (in strict mode). It can be None if we
-# cannot generate such filename (ex: the latin1 encoding can decode any byte
-# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
-# to the surrogateescape error handler (PEP 383), but not from the filesystem
-# encoding in strict mode.
-TESTFN_UNDECODABLE = None
-for name in (
- # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
- # accepts it to create a file or a directory, or don't accept to enter to
- # such directory (when the bytes name is used). So test b'\xe7' first: it is
- # not decodable from cp932.
- b'\xe7w\xf0',
- # undecodable from ASCII, UTF-8
- b'\xff',
- # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
- # and cp857
- b'\xae\xd5'
- # undecodable from UTF-8 (UNIX and Mac OS X)
- b'\xed\xb2\x80', b'\xed\xb4\x80',
- # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
- # cp1253, cp1254, cp1255, cp1257, cp1258
- b'\x81\x98',
-):
- try:
- name.decode(sys.getfilesystemencoding())
- except UnicodeDecodeError:
- TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
- break
-
-if FS_NONASCII:
- TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
-else:
- TESTFN_NONASCII = None
-
-# Save the initial cwd
-SAVEDCWD = os.getcwd()
-
# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False
@@ -865,103 +615,6 @@ def requires_lzma(reason='requires lzma'):
# PGO task. If this is True, PGO is also True.
PGO_EXTENDED = False
- at contextlib.contextmanager
-def temp_dir(path=None, quiet=False):
- """Return a context manager that creates a temporary directory.
-
- Arguments:
-
- path: the directory to create temporarily. If omitted or None,
- defaults to creating a temporary directory using tempfile.mkdtemp.
-
- quiet: if False (the default), the context manager raises an exception
- on error. Otherwise, if the path is specified and cannot be
- created, only a warning is issued.
-
- """
- import tempfile
- dir_created = False
- if path is None:
- path = tempfile.mkdtemp()
- dir_created = True
- path = os.path.realpath(path)
- else:
- try:
- os.mkdir(path)
- dir_created = True
- except OSError as exc:
- if not quiet:
- raise
- warnings.warn(f'tests may fail, unable to create '
- f'temporary directory {path!r}: {exc}',
- RuntimeWarning, stacklevel=3)
- if dir_created:
- pid = os.getpid()
- try:
- yield path
- finally:
- # In case the process forks, let only the parent remove the
- # directory. The child has a different process id. (bpo-30028)
- if dir_created and pid == os.getpid():
- rmtree(path)
-
- at contextlib.contextmanager
-def change_cwd(path, quiet=False):
- """Return a context manager that changes the current working directory.
-
- Arguments:
-
- path: the directory to use as the temporary current working directory.
-
- quiet: if False (the default), the context manager raises an exception
- on error. Otherwise, it issues only a warning and keeps the current
- working directory the same.
-
- """
- saved_dir = os.getcwd()
- try:
- os.chdir(os.path.realpath(path))
- except OSError as exc:
- if not quiet:
- raise
- warnings.warn(f'tests may fail, unable to change the current working '
- f'directory to {path!r}: {exc}',
- RuntimeWarning, stacklevel=3)
- try:
- yield os.getcwd()
- finally:
- os.chdir(saved_dir)
-
-
- at contextlib.contextmanager
-def temp_cwd(name='tempcwd', quiet=False):
- """
- Context manager that temporarily creates and changes the CWD.
-
- The function temporarily changes the current working directory
- after creating a temporary directory in the current directory with
- name *name*. If *name* is None, the temporary directory is
- created using tempfile.mkdtemp.
-
- If *quiet* is False (default) and it is not possible to
- create or change the CWD, an error is raised. If *quiet* is True,
- only a warning is raised and the original CWD is used.
-
- """
- with temp_dir(path=name, quiet=quiet) as temp_path:
- with change_cwd(temp_path, quiet=quiet) as cwd_dir:
- yield cwd_dir
-
-if hasattr(os, "umask"):
- @contextlib.contextmanager
- def temp_umask(umask):
- """Context manager that temporarily sets the process umask."""
- oldmask = os.umask(umask)
- try:
- yield
- finally:
- os.umask(oldmask)
-
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -970,6 +623,7 @@ def temp_umask(umask):
# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
+
def findfile(filename, subdir=None):
"""Try to find a file on sys.path or in the test directory. If it is not
found the argument passed to the function is returned (this does not
@@ -988,10 +642,6 @@ def findfile(filename, subdir=None):
if os.path.exists(fn): return fn
return filename
-def create_empty_file(filename):
- """Create an empty file. If the file already exists, truncate it."""
- fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
- os.close(fd)
def sortdict(dict):
"Like repr(dict), but in sorted order."
@@ -1000,19 +650,6 @@ def sortdict(dict):
withcommas = ", ".join(reprpairs)
return "{%s}" % withcommas
-def make_bad_fd():
- """
- Create an invalid file descriptor by opening and closing a file and return
- its fd.
- """
- file = open(TESTFN, "wb")
- try:
- return file.fileno()
- finally:
- file.close()
- unlink(TESTFN)
-
-
def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
compile(statement, '<test string>', 'exec')
@@ -1265,59 +902,6 @@ def __exit__(self, *ignore_exc):
sys.modules.update(self.original_modules)
-class EnvironmentVarGuard(collections.abc.MutableMapping):
-
- """Class to help protect the environment variable properly. Can be used as
- a context manager."""
-
- def __init__(self):
- self._environ = os.environ
- self._changed = {}
-
- def __getitem__(self, envvar):
- return self._environ[envvar]
-
- def __setitem__(self, envvar, value):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- self._environ[envvar] = value
-
- def __delitem__(self, envvar):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- if envvar in self._environ:
- del self._environ[envvar]
-
- def keys(self):
- return self._environ.keys()
-
- def __iter__(self):
- return iter(self._environ)
-
- def __len__(self):
- return len(self._environ)
-
- def set(self, envvar, value):
- self[envvar] = value
-
- def unset(self, envvar):
- del self[envvar]
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- for (k, v) in self._changed.items():
- if v is None:
- if k in self._environ:
- del self._environ[k]
- else:
- self._environ[k] = v
- os.environ = self._environ
-
-
class DirsOnSysPath(object):
"""Context manager to temporarily add directories to sys.path.
@@ -2133,28 +1717,6 @@ def match_value(self, k, dv, v):
return result
-_can_symlink = None
-def can_symlink():
- global _can_symlink
- if _can_symlink is not None:
- return _can_symlink
- symlink_path = TESTFN + "can_symlink"
- try:
- os.symlink(TESTFN, symlink_path)
- can = True
- except (OSError, NotImplementedError, AttributeError):
- can = False
- else:
- os.remove(symlink_path)
- _can_symlink = can
- return can
-
-def skip_unless_symlink(test):
- """Skip decorator for tests that require functional symlink"""
- ok = can_symlink()
- msg = "Requires functional symlink implementation"
- return test if ok else unittest.skip(msg)(test)
-
_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
"""
@@ -2256,45 +1818,6 @@ def call_link(self, *args, returncode=0):
return self._call(self.link, args, self._env, returncode)
-_can_xattr = None
-def can_xattr():
- import tempfile
- global _can_xattr
- if _can_xattr is not None:
- return _can_xattr
- if not hasattr(os, "setxattr"):
- can = False
- else:
- import platform
- tmp_dir = tempfile.mkdtemp()
- tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
- try:
- with open(TESTFN, "wb") as fp:
- try:
- # TESTFN & tempfile may use different file systems with
- # different capabilities
- os.setxattr(tmp_fp, b"user.test", b"")
- os.setxattr(tmp_name, b"trusted.foo", b"42")
- os.setxattr(fp.fileno(), b"user.test", b"")
- # Kernels < 2.6.39 don't respect setxattr flags.
- kernel_version = platform.release()
- m = re.match(r"2.6.(\d{1,2})", kernel_version)
- can = m is None or int(m.group(1)) >= 39
- except OSError:
- can = False
- finally:
- unlink(TESTFN)
- unlink(tmp_name)
- rmdir(tmp_dir)
- _can_xattr = can
- return can
-
-def skip_unless_xattr(test):
- """Skip decorator for tests that require functional extended attributes"""
- ok = can_xattr()
- msg = "no non-broken extended attribute support"
- return test if ok else unittest.skip(msg)(test)
-
def skip_if_pgo_task(test):
"""Skip decorator for tests not run in (non-extended) PGO task"""
ok = not PGO or PGO_EXTENDED
@@ -2302,20 +1825,6 @@ def skip_if_pgo_task(test):
return test if ok else unittest.skip(msg)(test)
-def fs_is_case_insensitive(directory):
- """Detects if the file system for the specified directory is case-insensitive."""
- import tempfile
- with tempfile.NamedTemporaryFile(dir=directory) as base:
- base_path = base.name
- case_path = base_path.upper()
- if case_path == base_path:
- case_path = base_path.lower()
- try:
- return os.path.samefile(base_path, case_path)
- except FileNotFoundError:
- return False
-
-
def detect_api_mismatch(ref_api, other_api, *, ignore=()):
"""Returns the set of items in ref_api not in other_api, except for a
defined list of items to be ignored in this check.
@@ -2623,65 +2132,6 @@ def disable_faulthandler():
faulthandler.enable(file=fd, all_threads=True)
-def fd_count():
- """Count the number of open file descriptors.
- """
- if sys.platform.startswith(('linux', 'freebsd')):
- try:
- names = os.listdir("/proc/self/fd")
- # Subtract one because listdir() internally opens a file
- # descriptor to list the content of the /proc/self/fd/ directory.
- return len(names) - 1
- except FileNotFoundError:
- pass
-
- MAXFD = 256
- if hasattr(os, 'sysconf'):
- try:
- MAXFD = os.sysconf("SC_OPEN_MAX")
- except OSError:
- pass
-
- old_modes = None
- if sys.platform == 'win32':
- # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
- # on invalid file descriptor if Python is compiled in debug mode
- try:
- import msvcrt
- msvcrt.CrtSetReportMode
- except (AttributeError, ImportError):
- # no msvcrt or a release build
- pass
- else:
- old_modes = {}
- for report_type in (msvcrt.CRT_WARN,
- msvcrt.CRT_ERROR,
- msvcrt.CRT_ASSERT):
- old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
-
- try:
- count = 0
- for fd in range(MAXFD):
- try:
- # Prefer dup() over fstat(). fstat() can require input/output
- # whereas dup() doesn't.
- fd2 = os.dup(fd)
- except OSError as e:
- if e.errno != errno.EBADF:
- raise
- else:
- os.close(fd2)
- count += 1
- finally:
- if old_modes is not None:
- for report_type in (msvcrt.CRT_WARN,
- msvcrt.CRT_ERROR,
- msvcrt.CRT_ASSERT):
- msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
-
- return count
-
-
class SaveSignals:
"""
Save and restore signal handlers.
@@ -2726,24 +2176,6 @@ def with_pymalloc():
return _testcapi.WITH_PYMALLOC
-class FakePath:
- """Simple implementing of the path protocol.
- """
- def __init__(self, path):
- self.path = path
-
- def __repr__(self):
- return f'<FakePath {self.path!r}>'
-
- def __fspath__(self):
- if (isinstance(self.path, BaseException) or
- isinstance(self.path, type) and
- issubclass(self.path, BaseException)):
- raise self.path
- else:
- return self.path
-
-
class _ALWAYS_EQ:
"""
Object that is equal to anything.
diff --git a/Lib/test/support/os_helper.py b/Lib/test/support/os_helper.py
new file mode 100644
index 0000000000000..d3347027cf204
--- /dev/null
+++ b/Lib/test/support/os_helper.py
@@ -0,0 +1,611 @@
+import collections.abc
+import contextlib
+import errno
+import os
+import re
+import stat
+import sys
+import time
+import unittest
+import warnings
+
+
+# Filename used for testing
+if os.name == 'java':
+ # Jython disallows @ in module names
+ TESTFN = '$test'
+else:
+ TESTFN = '@test'
+
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+# TESTFN_UNICODE is a non-ascii filename
+TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
+if sys.platform == 'darwin':
+ # In Mac OS X's VFS API file names are, by definition, canonically
+ # decomposed Unicode, encoded using UTF-8. See QA1173:
+ # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
+ import unicodedata
+ TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
+
+# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
+# encoded by the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such filename.
+TESTFN_UNENCODABLE = None
+if os.name == 'nt':
+ # skip win32s (0) or Windows 9x/ME (1)
+ if sys.getwindowsversion().platform >= 2:
+ # Different kinds of characters from various languages to minimize the
+ # probability that the whole name is encodable to MBCS (issue #9819)
+ TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
+ try:
+ TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
+ except UnicodeEncodeError:
+ pass
+ else:
+ print('WARNING: The filename %r CAN be encoded by the filesystem '
+ 'encoding (%s). Unicode filename tests may not be effective'
+ % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
+ TESTFN_UNENCODABLE = None
+# Mac OS X denies unencodable filenames (invalid utf-8)
+elif sys.platform != 'darwin':
+ try:
+ # ascii and utf-8 cannot encode the byte 0xff
+ b'\xff'.decode(sys.getfilesystemencoding())
+ except UnicodeDecodeError:
+ # 0xff will be encoded using the surrogate character u+DCFF
+ TESTFN_UNENCODABLE = TESTFN \
+ + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
+ else:
+ # File system encoding (eg. ISO-8859-* encodings) can encode
+ # the byte 0xff. Skip some unicode filename tests.
+ pass
+
+# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
+# or None if there is no such character.
+FS_NONASCII = None
+for character in (
+ # First try printable and common characters to have a readable filename.
+ # For each character, the encoding list are just example of encodings able
+ # to encode the character (the list is not exhaustive).
+
+ # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
+ '\u00E6',
+ # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
+ '\u0130',
+ # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
+ '\u0141',
+ # U+03C6 (Greek Small Letter Phi): cp1253
+ '\u03C6',
+ # U+041A (Cyrillic Capital Letter Ka): cp1251
+ '\u041A',
+ # U+05D0 (Hebrew Letter Alef): Encodable to cp424
+ '\u05D0',
+ # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
+ '\u060C',
+ # U+062A (Arabic Letter Teh): cp720
+ '\u062A',
+ # U+0E01 (Thai Character Ko Kai): cp874
+ '\u0E01',
+
+ # Then try more "special" characters. "special" because they may be
+ # interpreted or displayed differently depending on the exact locale
+ # encoding and the font.
+
+ # U+00A0 (No-Break Space)
+ '\u00A0',
+ # U+20AC (Euro Sign)
+ '\u20AC',
+):
+ try:
+ # If Python is set up to use the legacy 'mbcs' in Windows,
+ # 'replace' error mode is used, and encode() returns b'?'
+ # for characters missing in the ANSI codepage
+ if os.fsdecode(os.fsencode(character)) != character:
+ raise UnicodeError
+ except UnicodeError:
+ pass
+ else:
+ FS_NONASCII = character
+ break
+
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+
+# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
+# decoded from the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such filename (ex: the latin1 encoding can decode any byte
+# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
+# to the surrogateescape error handler (PEP 383), but not from the filesystem
+# encoding in strict mode.
+TESTFN_UNDECODABLE = None
+for name in (
+ # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
+ # accepts it to create a file or a directory, or don't accept to enter to
+ # such directory (when the bytes name is used). So test b'\xe7' first:
+ # it is not decodable from cp932.
+ b'\xe7w\xf0',
+ # undecodable from ASCII, UTF-8
+ b'\xff',
+ # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
+ # and cp857
+ b'\xae\xd5'
+ # undecodable from UTF-8 (UNIX and Mac OS X)
+ b'\xed\xb2\x80', b'\xed\xb4\x80',
+ # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
+ # cp1253, cp1254, cp1255, cp1257, cp1258
+ b'\x81\x98',
+):
+ try:
+ name.decode(sys.getfilesystemencoding())
+ except UnicodeDecodeError:
+ TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
+ break
+
+if FS_NONASCII:
+ TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
+else:
+ TESTFN_NONASCII = None
+
+
+def make_bad_fd():
+ """
+ Create an invalid file descriptor by opening and closing a file and return
+ its fd.
+ """
+ file = open(TESTFN, "wb")
+ try:
+ return file.fileno()
+ finally:
+ file.close()
+ unlink(TESTFN)
+
+
+_can_symlink = None
+
+
+def can_symlink():
+ global _can_symlink
+ if _can_symlink is not None:
+ return _can_symlink
+ symlink_path = TESTFN + "can_symlink"
+ try:
+ os.symlink(TESTFN, symlink_path)
+ can = True
+ except (OSError, NotImplementedError, AttributeError):
+ can = False
+ else:
+ os.remove(symlink_path)
+ _can_symlink = can
+ return can
+
+
+def skip_unless_symlink(test):
+ """Skip decorator for tests that require functional symlink"""
+ ok = can_symlink()
+ msg = "Requires functional symlink implementation"
+ return test if ok else unittest.skip(msg)(test)
+
+
+_can_xattr = None
+
+
+def can_xattr():
+ import tempfile
+ global _can_xattr
+ if _can_xattr is not None:
+ return _can_xattr
+ if not hasattr(os, "setxattr"):
+ can = False
+ else:
+ import platform
+ tmp_dir = tempfile.mkdtemp()
+ tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
+ try:
+ with open(TESTFN, "wb") as fp:
+ try:
+ # TESTFN & tempfile may use different file systems with
+ # different capabilities
+ os.setxattr(tmp_fp, b"user.test", b"")
+ os.setxattr(tmp_name, b"trusted.foo", b"42")
+ os.setxattr(fp.fileno(), b"user.test", b"")
+ # Kernels < 2.6.39 don't respect setxattr flags.
+ kernel_version = platform.release()
+ m = re.match(r"2.6.(\d{1,2})", kernel_version)
+ can = m is None or int(m.group(1)) >= 39
+ except OSError:
+ can = False
+ finally:
+ unlink(TESTFN)
+ unlink(tmp_name)
+ rmdir(tmp_dir)
+ _can_xattr = can
+ return can
+
+
+def skip_unless_xattr(test):
+ """Skip decorator for tests that require functional extended attributes"""
+ ok = can_xattr()
+ msg = "no non-broken extended attribute support"
+ return test if ok else unittest.skip(msg)(test)
+
+
+def unlink(filename):
+ try:
+ _unlink(filename)
+ except (FileNotFoundError, NotADirectoryError):
+ pass
+
+
+if sys.platform.startswith("win"):
+ def _waitfor(func, pathname, waitall=False):
+ # Perform the operation
+ func(pathname)
+ # Now setup the wait loop
+ if waitall:
+ dirname = pathname
+ else:
+ dirname, name = os.path.split(pathname)
+ dirname = dirname or '.'
+ # Check for `pathname` to be removed from the filesystem.
+ # The exponential backoff of the timeout amounts to a total
+ # of ~1 second after which the deletion is probably an error
+ # anyway.
+ # Testing on an i7 at 4.3GHz shows that usually only 1 iteration is
+ # required when contention occurs.
+ timeout = 0.001
+ while timeout < 1.0:
+ # Note we are only testing for the existence of the file(s) in
+ # the contents of the directory regardless of any security or
+ # access rights. If we have made it this far, we have sufficient
+ # permissions to do that much using Python's equivalent of the
+ # Windows API FindFirstFile.
+ # Other Windows APIs can fail or give incorrect results when
+ # dealing with files that are pending deletion.
+ L = os.listdir(dirname)
+ if not (L if waitall else name in L):
+ return
+ # Increase the timeout and try again
+ time.sleep(timeout)
+ timeout *= 2
+ warnings.warn('tests may fail, delete still pending for ' + pathname,
+ RuntimeWarning, stacklevel=4)
+
+ def _unlink(filename):
+ _waitfor(os.unlink, filename)
+
+ def _rmdir(dirname):
+ _waitfor(os.rmdir, dirname)
+
+ def _rmtree(path):
+ from test.support import _force_run
+
+ def _rmtree_inner(path):
+ for name in _force_run(path, os.listdir, path):
+ fullname = os.path.join(path, name)
+ try:
+ mode = os.lstat(fullname).st_mode
+ except OSError as exc:
+ print("support.rmtree(): os.lstat(%r) failed with %s"
+ % (fullname, exc),
+ file=sys.__stderr__)
+ mode = 0
+ if stat.S_ISDIR(mode):
+ _waitfor(_rmtree_inner, fullname, waitall=True)
+ _force_run(fullname, os.rmdir, fullname)
+ else:
+ _force_run(fullname, os.unlink, fullname)
+ _waitfor(_rmtree_inner, path, waitall=True)
+ _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
+
+ def _longpath(path):
+ try:
+ import ctypes
+ except ImportError:
+ # No ctypes means we can't expands paths.
+ pass
+ else:
+ buffer = ctypes.create_unicode_buffer(len(path) * 2)
+ length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
+ len(buffer))
+ if length:
+ return buffer[:length]
+ return path
+else:
+ _unlink = os.unlink
+ _rmdir = os.rmdir
+
+ def _rmtree(path):
+ import shutil
+ try:
+ shutil.rmtree(path)
+ return
+ except OSError:
+ pass
+
+ def _rmtree_inner(path):
+ from test.support import _force_run
+ for name in _force_run(path, os.listdir, path):
+ fullname = os.path.join(path, name)
+ try:
+ mode = os.lstat(fullname).st_mode
+ except OSError:
+ mode = 0
+ if stat.S_ISDIR(mode):
+ _rmtree_inner(fullname)
+ _force_run(path, os.rmdir, fullname)
+ else:
+ _force_run(path, os.unlink, fullname)
+ _rmtree_inner(path)
+ os.rmdir(path)
+
+ def _longpath(path):
+ return path
+
+
+def rmdir(dirname):
+ try:
+ _rmdir(dirname)
+ except FileNotFoundError:
+ pass
+
+
+def rmtree(path):
+ try:
+ _rmtree(path)
+ except FileNotFoundError:
+ pass
+
+
+ at contextlib.contextmanager
+def temp_dir(path=None, quiet=False):
+ """Return a context manager that creates a temporary directory.
+
+ Arguments:
+
+ path: the directory to create temporarily. If omitted or None,
+ defaults to creating a temporary directory using tempfile.mkdtemp.
+
+ quiet: if False (the default), the context manager raises an exception
+ on error. Otherwise, if the path is specified and cannot be
+ created, only a warning is issued.
+
+ """
+ import tempfile
+ dir_created = False
+ if path is None:
+ path = tempfile.mkdtemp()
+ dir_created = True
+ path = os.path.realpath(path)
+ else:
+ try:
+ os.mkdir(path)
+ dir_created = True
+ except OSError as exc:
+ if not quiet:
+ raise
+ warnings.warn(f'tests may fail, unable to create '
+ f'temporary directory {path!r}: {exc}',
+ RuntimeWarning, stacklevel=3)
+ if dir_created:
+ pid = os.getpid()
+ try:
+ yield path
+ finally:
+ # In case the process forks, let only the parent remove the
+ # directory. The child has a different process id. (bpo-30028)
+ if dir_created and pid == os.getpid():
+ rmtree(path)
+
+
+ at contextlib.contextmanager
+def change_cwd(path, quiet=False):
+ """Return a context manager that changes the current working directory.
+
+ Arguments:
+
+ path: the directory to use as the temporary current working directory.
+
+ quiet: if False (the default), the context manager raises an exception
+ on error. Otherwise, it issues only a warning and keeps the current
+ working directory the same.
+
+ """
+ saved_dir = os.getcwd()
+ try:
+ os.chdir(os.path.realpath(path))
+ except OSError as exc:
+ if not quiet:
+ raise
+ warnings.warn(f'tests may fail, unable to change the current working '
+ f'directory to {path!r}: {exc}',
+ RuntimeWarning, stacklevel=3)
+ try:
+ yield os.getcwd()
+ finally:
+ os.chdir(saved_dir)
+
+
+ at contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False):
+ """
+ Context manager that temporarily creates and changes the CWD.
+
+ The function temporarily changes the current working directory
+ after creating a temporary directory in the current directory with
+ name *name*. If *name* is None, the temporary directory is
+ created using tempfile.mkdtemp.
+
+ If *quiet* is False (default) and it is not possible to
+ create or change the CWD, an error is raised. If *quiet* is True,
+ only a warning is raised and the original CWD is used.
+
+ """
+ with temp_dir(path=name, quiet=quiet) as temp_path:
+ with change_cwd(temp_path, quiet=quiet) as cwd_dir:
+ yield cwd_dir
+
+
+def create_empty_file(filename):
+ """Create an empty file. If the file already exists, truncate it."""
+ fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
+ os.close(fd)
+
+
+def fs_is_case_insensitive(directory):
+ """Detects if the file system for the specified directory
+ is case-insensitive."""
+ import tempfile
+ with tempfile.NamedTemporaryFile(dir=directory) as base:
+ base_path = base.name
+ case_path = base_path.upper()
+ if case_path == base_path:
+ case_path = base_path.lower()
+ try:
+ return os.path.samefile(base_path, case_path)
+ except FileNotFoundError:
+ return False
+
+
+class FakePath:
+ """Simple implementing of the path protocol.
+ """
+ def __init__(self, path):
+ self.path = path
+
+ def __repr__(self):
+ return f'<FakePath {self.path!r}>'
+
+ def __fspath__(self):
+ if (isinstance(self.path, BaseException) or
+ isinstance(self.path, type) and
+ issubclass(self.path, BaseException)):
+ raise self.path
+ else:
+ return self.path
+
+
+def fd_count():
+ """Count the number of open file descriptors.
+ """
+ if sys.platform.startswith(('linux', 'freebsd')):
+ try:
+ names = os.listdir("/proc/self/fd")
+ # Subtract one because listdir() internally opens a file
+ # descriptor to list the content of the /proc/self/fd/ directory.
+ return len(names) - 1
+ except FileNotFoundError:
+ pass
+
+ MAXFD = 256
+ if hasattr(os, 'sysconf'):
+ try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+ except OSError:
+ pass
+
+ old_modes = None
+ if sys.platform == 'win32':
+ # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
+ # on invalid file descriptor if Python is compiled in debug mode
+ try:
+ import msvcrt
+ msvcrt.CrtSetReportMode
+ except (AttributeError, ImportError):
+ # no msvcrt or a release build
+ pass
+ else:
+ old_modes = {}
+ for report_type in (msvcrt.CRT_WARN,
+ msvcrt.CRT_ERROR,
+ msvcrt.CRT_ASSERT):
+ old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
+ 0)
+
+ try:
+ count = 0
+ for fd in range(MAXFD):
+ try:
+ # Prefer dup() over fstat(). fstat() can require input/output
+ # whereas dup() doesn't.
+ fd2 = os.dup(fd)
+ except OSError as e:
+ if e.errno != errno.EBADF:
+ raise
+ else:
+ os.close(fd2)
+ count += 1
+ finally:
+ if old_modes is not None:
+ for report_type in (msvcrt.CRT_WARN,
+ msvcrt.CRT_ERROR,
+ msvcrt.CRT_ASSERT):
+ msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
+
+ return count
+
+
+if hasattr(os, "umask"):
+ @contextlib.contextmanager
+ def temp_umask(umask):
+ """Context manager that temporarily sets the process umask."""
+ oldmask = os.umask(umask)
+ try:
+ yield
+ finally:
+ os.umask(oldmask)
+
+
+class EnvironmentVarGuard(collections.abc.MutableMapping):
+
+ """Class to help protect the environment variable properly. Can be used as
+ a context manager."""
+
+ def __init__(self):
+ self._environ = os.environ
+ self._changed = {}
+
+ def __getitem__(self, envvar):
+ return self._environ[envvar]
+
+ def __setitem__(self, envvar, value):
+ # Remember the initial value on the first access
+ if envvar not in self._changed:
+ self._changed[envvar] = self._environ.get(envvar)
+ self._environ[envvar] = value
+
+ def __delitem__(self, envvar):
+ # Remember the initial value on the first access
+ if envvar not in self._changed:
+ self._changed[envvar] = self._environ.get(envvar)
+ if envvar in self._environ:
+ del self._environ[envvar]
+
+ def keys(self):
+ return self._environ.keys()
+
+ def __iter__(self):
+ return iter(self._environ)
+
+ def __len__(self):
+ return len(self._environ)
+
+ def set(self, envvar, value):
+ self[envvar] = value
+
+ def unset(self, envvar):
+ del self[envvar]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ for (k, v) in self._changed.items():
+ if v is None:
+ if k in self._environ:
+ del self._environ[k]
+ else:
+ self._environ[k] = v
+ os.environ = self._environ
More information about the Python-checkins
mailing list