[pypy-svn] r77030 - pypy/branch/fast-forward/lib-python/modified-2.7.0/test

afa at codespeak.net afa at codespeak.net
Mon Sep 13 00:59:14 CEST 2010


Author: afa
Date: Mon Sep 13 00:59:12 2010
New Revision: 77030

Added:
   pypy/branch/fast-forward/lib-python/modified-2.7.0/test/__init__.py   (contents, props changed)
   pypy/branch/fast-forward/lib-python/modified-2.7.0/test/test_support.py   (contents, props changed)
Log:
Let the modified tests run instead of the initial ones


Added: pypy/branch/fast-forward/lib-python/modified-2.7.0/test/__init__.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/lib-python/modified-2.7.0/test/__init__.py	Mon Sep 13 00:59:12 2010
@@ -0,0 +1 @@
+# Dummy file to make this directory a package.

Added: pypy/branch/fast-forward/lib-python/modified-2.7.0/test/test_support.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/lib-python/modified-2.7.0/test/test_support.py	Mon Sep 13 00:59:12 2010
@@ -0,0 +1,1201 @@
+"""Supporting definitions for the Python regression tests."""
+
+if __name__ != 'test.test_support':
+    raise ImportError('test_support must be imported from the test package')
+
+import contextlib
+import errno
+import functools
+import gc
+import socket
+import sys
+import os
+import platform
+import shutil
+import warnings
+import unittest
+import importlib
+import UserDict
+import re
+import time
+try:
+    import thread
+except ImportError:
+    thread = None
+
+__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
+           "verbose", "use_resources", "max_memuse", "record_original_stdout",
+           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
+           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
+           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
+           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
+           "open_urlresource", "check_warnings", "check_py3k_warnings",
+           "CleanImport", "EnvironmentVarGuard", "captured_output",
+           "captured_stdout", "TransientResource", "transient_internet",
+           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
+           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
+           "threading_cleanup", "reap_children", "cpython_only",
+           "check_impl_detail", "get_attribute", "py3k_bytes"]
+
+
+class Error(Exception):
+    """Base class for regression test exceptions.
+
+    TestFailed derives from this class."""
+
+class TestFailed(Error):
+    """Test failed.
+
+    Helpers in this module raise it themselves, for example bind_port()
+    when a forbidden socket option is set and open_urlresource() when a
+    downloaded resource does not pass its check."""
+
+class ResourceDenied(unittest.SkipTest):
+    """Test skipped because it requested a disallowed resource.
+
+    This is raised when a test calls requires() for a resource that
+    has not been enabled.  It is used to distinguish between expected
+    and unexpected skips.
+
+    Being a unittest.SkipTest subclass, it is handled as a skip by code
+    that catches SkipTest.
+    """
+
+ at contextlib.contextmanager
+def _ignore_deprecated_imports(ignore=True):
+    """Context manager to suppress package and module deprecation
+    warnings when importing them.
+
+    If ignore is False, this context manager has no effect."""
+    if ignore:
+        with warnings.catch_warnings():
+            warnings.filterwarnings("ignore", ".+ (module|package)",
+                                    DeprecationWarning)
+            yield
+    else:
+        yield
+
+
+def import_module(name, deprecated=False):
+    """Import and return the module to be tested, raising SkipTest if
+    it is not available.
+
+    If deprecated is True, any module or package deprecation messages
+    will be suppressed."""
+    with _ignore_deprecated_imports(deprecated):
+        try:
+            return importlib.import_module(name)
+        except ImportError, msg:
+            raise unittest.SkipTest(str(msg))
+
+
+def _save_and_remove_module(name, orig_modules):
+    """Helper function to save and remove a module from sys.modules
+
+       Return value is True if the module was in sys.modules and
+       False otherwise."""
+    saved = True
+    try:
+        orig_modules[name] = sys.modules[name]
+    except KeyError:
+        saved = False
+    else:
+        del sys.modules[name]
+    return saved
+
+
+def _save_and_block_module(name, orig_modules):
+    """Helper function to save and block a module in sys.modules
+
+       Return value is True if the module was in sys.modules and
+       False otherwise."""
+    saved = True
+    try:
+        orig_modules[name] = sys.modules[name]
+    except KeyError:
+        saved = False
+    sys.modules[name] = None
+    return saved
+
+
+def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
+    """Imports and returns a module, deliberately bypassing the sys.modules cache
+    and importing a fresh copy of the module. Once the import is complete,
+    the sys.modules cache is restored to its original state.
+
+    Modules named in fresh are also imported anew if needed by the import.
+
+    Importing of modules named in blocked is prevented while the fresh import
+    takes place.
+
+    If deprecated is True, any module or package deprecation messages
+    will be suppressed.
+
+    NOTE(review): the finally clause only puts back the entries that were
+    saved and deletes the blocking entries it added; a module that was not
+    in sys.modules before the call and is imported here is not removed
+    afterwards."""
+    # NOTE: test_heapq and test_warnings include extra sanity checks to make
+    # sure that this utility function is working as expected
+    with _ignore_deprecated_imports(deprecated):
+        # Keep track of modules saved for later restoration as well
+        # as those which just need a blocking entry removed
+        orig_modules = {}
+        names_to_remove = []
+        # Drop the cached copy of the target itself so the import below
+        # really executes the module again.
+        _save_and_remove_module(name, orig_modules)
+        try:
+            for fresh_name in fresh:
+                _save_and_remove_module(fresh_name, orig_modules)
+            for blocked_name in blocked:
+                # A blocked module that was not loaded before has no original
+                # to restore; its None placeholder must simply be deleted.
+                if not _save_and_block_module(blocked_name, orig_modules):
+                    names_to_remove.append(blocked_name)
+            fresh_module = importlib.import_module(name)
+        finally:
+            # Runs whether or not the import succeeded.
+            for orig_name, module in orig_modules.items():
+                sys.modules[orig_name] = module
+            for name_to_remove in names_to_remove:
+                del sys.modules[name_to_remove]
+        return fresh_module
+
+
+def get_attribute(obj, name):
+    """Get an attribute, raising SkipTest if AttributeError is raised."""
+    try:
+        attribute = getattr(obj, name)
+    except AttributeError:
+        raise unittest.SkipTest("module %s has no attribute %s" % (
+            obj.__name__, name))
+    else:
+        return attribute
+
+
+verbose = 1              # Flag set to 0 by regrtest.py
+use_resources = None     # Flag set to [] by regrtest.py; while it is None,
+                         # is_resource_enabled() reports every resource as
+                         # disabled.
+max_memuse = 0           # Disable bigmem tests (they will still be run with
+                         # small sizes, to make sure they work.)
+real_max_memuse = 0      # NOTE(review): presumably the limit requested by the
+                         # user before any clamping; the code that sets it is
+                         # not visible here -- confirm.
+
+# _original_stdout is meant to hold stdout at the time regrtest began.
+# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
+# The point is to have some flavor of stdout the user can actually see.
+_original_stdout = None
+def record_original_stdout(stdout):
+    """Store *stdout* in the module-level _original_stdout so that
+    get_original_stdout() can return it later."""
+    global _original_stdout
+    _original_stdout = stdout
+
+def get_original_stdout():
+    return _original_stdout or sys.stdout
+
+def unload(name):
+    try:
+        del sys.modules[name]
+    except KeyError:
+        pass
+
+def unlink(filename):
+    """Remove *filename* with os.unlink(), ignoring any OSError.
+
+    This is best-effort cleanup: a missing file, but also any other
+    OSError raised by os.unlink(), is swallowed."""
+    try:
+        os.unlink(filename)
+    except OSError:
+        pass
+
+def rmtree(path):
+    try:
+        shutil.rmtree(path)
+    except OSError, e:
+        # Unix returns ENOENT, Windows returns ESRCH.
+        if e.errno not in (errno.ENOENT, errno.ESRCH):
+            raise
+
+def forget(modname):
+    '''"Forget" a module was ever imported by removing it from sys.modules and
+    deleting any .pyc and .pyo files.'''
+    unload(modname)
+    for dirname in sys.path:
+        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
+        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
+        # the chance exists that there is no .pyc (and thus the 'try' statement
+        # is exited) but there is a .pyo file.
+        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
+
+def is_resource_enabled(resource):
+    """Test whether a resource is enabled.  Known resources are set by
+    regrtest.py."""
+    return use_resources is not None and resource in use_resources
+
+def requires(resource, msg=None):
+    """Raise ResourceDenied if the specified resource is not available.
+
+    If the caller's module is __main__ the resource is treated as enabled
+    and the function returns without checking anything.  Otherwise
+    ResourceDenied is raised, with msg (or a default message naming the
+    resource) as its text, unless is_resource_enabled(resource) is true.
+    The function never returns a value."""
+    # see if the caller's module is __main__ - if so, treat as if
+    # the resource was set
+    if sys._getframe(1).f_globals.get("__name__") == "__main__":
+        return
+    if not is_resource_enabled(resource):
+        if msg is None:
+            msg = "Use of the `%s' resource not enabled" % resource
+        raise ResourceDenied(msg)
+
+# Default host that bind_port() binds test sockets to.
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+    """Returns an unused port that should be suitable for binding.  This is
+    achieved by creating a temporary socket with the given 'family' and
+    'socktype' (default is AF_INET, SOCK_STREAM), and binding it through
+    bind_port() (which binds to HOST unless told otherwise) with the port
+    set to 0, eliciting an unused ephemeral port from the OS.  The temporary
+    socket is then closed and deleted, and the ephemeral port is returned.
+
+    Either this method or bind_port() should be used for any tests where a
+    server socket needs to be bound to a particular port for the duration of
+    the test.  Which one to use depends on whether the calling code is creating
+    a python socket, or if an unused port needs to be provided in a constructor
+    or passed to an external program (i.e. the -accept argument to openssl's
+    s_server mode).  Always prefer bind_port() over find_unused_port() where
+    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
+    socket is bound to a hard coded port, the ability to run multiple instances
+    of the test simultaneously on the same host is compromised, which makes the
+    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+    may simply manifest as a failed test, which can be recovered from without
+    intervention in most cases, but on Windows, the entire python process can
+    completely and utterly wedge, requiring someone to log in to the buildbot
+    and manually kill the affected process.
+
+    (This is easy to reproduce on Windows, unfortunately, and can be traced to
+    the SO_REUSEADDR socket option having different semantics on Windows versus
+    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+    listen and then accept connections on identical host/ports.  An EADDRINUSE
+    socket.error will be raised at some point (depending on the platform and
+    the order bind and listen were called on each socket).
+
+    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+    will ever be raised when attempting to bind two identical host/ports. When
+    accept() is called on each socket, the second caller's process will steal
+    the port from the first caller, leaving them both in an awkwardly wedged
+    state where they'll no longer respond to any signals or graceful kills, and
+    must be forcibly killed via OpenProcess()/TerminateProcess().
+
+    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+    instead of SO_REUSEADDR, which effectively affords the same semantics as
+    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
+    Source world compared to Windows ones, this is a common mistake.  A quick
+    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+    openssl.exe is called with the 's_server' option, for example. See
+    http://bugs.python.org/issue2550 for more info.  The following site also
+    has a very thorough description about the implications of both REUSEADDR
+    and EXCLUSIVEADDRUSE on Windows:
+    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+    XXX: although this approach is a vast improvement on previous attempts to
+    elicit unused ports, it rests heavily on the assumption that the ephemeral
+    port returned to us by the OS won't immediately be dished back out to some
+    other process when we close and delete our temporary socket but before our
+    calling code has a chance to bind the returned port.  We can deal with this
+    issue if/when we come across it."""
+    tempsock = socket.socket(family, socktype)
+    port = bind_port(tempsock)
+    # The socket only existed to learn the port number; release it right away.
+    tempsock.close()
+    del tempsock
+    return port
+
+def bind_port(sock, host=HOST):
+    """Bind the socket to a free port and return the port number.  Relies on
+    ephemeral ports in order to ensure we are using an unbound port.  This is
+    important as many tests may be running simultaneously, especially in a
+    buildbot environment.  This method raises an exception if the sock.family
+    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
+    for TCP/IP sockets.  The only case for setting these options is testing
+    multicasting via multiple UDP sockets.
+
+    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+    on Windows), it will be set on the socket.  This will prevent anyone else
+    from bind()'ing to our host/port for the duration of the test.
+    """
+    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+        if hasattr(socket, 'SO_REUSEADDR'):
+            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+                raise TestFailed("tests should never set the SO_REUSEADDR "   \
+                                 "socket option on TCP/IP sockets!")
+        if hasattr(socket, 'SO_REUSEPORT'):
+            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+                raise TestFailed("tests should never set the SO_REUSEPORT "   \
+                                 "socket option on TCP/IP sockets!")
+        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+    sock.bind((host, 0))
+    port = sock.getsockname()[1]
+    return port
+
+FUZZ = 1e-6
+
+def fcmp(x, y): # fuzzy comparison function
+    if isinstance(x, float) or isinstance(y, float):
+        try:
+            fuzz = (abs(x) + abs(y)) * FUZZ
+            if abs(x-y) <= fuzz:
+                return 0
+        except:
+            pass
+    elif type(x) == type(y) and isinstance(x, (tuple, list)):
+        for i in range(min(len(x), len(y))):
+            outcome = fcmp(x[i], y[i])
+            if outcome != 0:
+                return outcome
+        return (len(x) > len(y)) - (len(x) < len(y))
+    return (x > y) - (x < y)
+
+# have_unicode records whether the interpreter provides the unicode type.
+try:
+    unicode
+    have_unicode = True
+except NameError:
+    have_unicode = False
+
+is_jython = sys.platform.startswith('java')
+
+# Filename used for testing
+if os.name == 'java':
+    # Jython disallows @ in module names
+    TESTFN = '$test'
+elif os.name == 'riscos':
+    TESTFN = 'testfile'
+else:
+    TESTFN = '@test'
+    # Unicode name only used if TESTFN_ENCODING exists for the platform.
+    if have_unicode:
+        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
+        # TESTFN_UNICODE is a filename that can be encoded using the
+        # file system encoding, but *not* with the default (ascii) encoding
+        if isinstance('', unicode):
+            # python -U
+            # XXX perhaps unicode() should accept Unicode strings?
+            TESTFN_UNICODE = "@test-\xe0\xf2"
+        else:
+            # 2 latin characters.
+            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
+        TESTFN_ENCODING = sys.getfilesystemencoding()
+        # TESTFN_UNENCODABLE is a filename that should *not* be
+        # able to be encoded by *either* the default or filesystem encoding.
+        # This test really only makes sense on Windows NT platforms
+        # which have special Unicode support in posixmodule.
+        if (not hasattr(sys, "getwindowsversion") or
+                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
+            TESTFN_UNENCODABLE = None
+        else:
+            # Japanese characters (I think - from bug 846133)
+            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
+            try:
+                # XXX - Note - should be using TESTFN_ENCODING here - but for
+                # Windows, "mbcs" currently always operates as if in
+                # 'errors=ignore' mode - hence we get '?' characters rather than
+                # the exception.  'Latin1' operates as we expect - ie, fails.
+                # See [ 850997 ] mbcs encoding ignores errors
+                TESTFN_UNENCODABLE.encode("Latin1")
+            except UnicodeEncodeError:
+                pass
+            else:
+                print \
+                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
+                'Unicode filename tests may not be effective' \
+                % TESTFN_UNENCODABLE
+
+
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.  The process id is appended, so the final value has the form
+# "<base>_<pid>_tmp".
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+
+ at contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False):
+    """
+    Context manager that creates a temporary directory and set it as CWD.
+
+    The new CWD is created in the current directory and it's named *name*.
+    If *quiet* is False (default) and it's not possible to create or change
+    the CWD, an error is raised.  If it's True, only a warning is raised
+    and the original CWD is used.
+    """
+    if isinstance(name, unicode):
+        try:
+            name = name.encode(sys.getfilesystemencoding() or 'ascii')
+        except UnicodeEncodeError:
+            if not quiet:
+                raise unittest.SkipTest('unable to encode the cwd name with '
+                                        'the filesystem encoding.')
+    saved_dir = os.getcwd()
+    is_temporary = False
+    try:
+        os.mkdir(name)
+        os.chdir(name)
+        is_temporary = True
+    except OSError:
+        if not quiet:
+            raise
+        warnings.warn('tests may fail, unable to change the CWD to ' + name,
+                      RuntimeWarning, stacklevel=3)
+    try:
+        yield os.getcwd()
+    finally:
+        os.chdir(saved_dir)
+        if is_temporary:
+            rmtree(name)
+
+
+def findfile(file, here=__file__, subdir=None):
+    """Try to find a file on sys.path and the working directory.  If it is not
+    found the argument passed to the function is returned (this does not
+    necessarily signal failure; could still be the legitimate path)."""
+    if os.path.isabs(file):
+        return file
+    if subdir is not None:
+        file = os.path.join(subdir, file)
+    path = sys.path
+    path = [os.path.dirname(here)] + path
+    for dn in path:
+        fn = os.path.join(dn, file)
+        if os.path.exists(fn): return fn
+    return file
+
+def sortdict(dict):
+    "Like repr(dict), but in sorted order."
+    items = dict.items()
+    items.sort()
+    reprpairs = ["%r: %r" % pair for pair in items]
+    withcommas = ", ".join(reprpairs)
+    return "{%s}" % withcommas
+
+def make_bad_fd():
+    """
+    Create an invalid file descriptor by opening and closing a file and return
+    its fd.
+    """
+    file = open(TESTFN, "wb")
+    try:
+        return file.fileno()
+    finally:
+        file.close()
+        unlink(TESTFN)
+
+def check_syntax_error(testcase, statement):
+    """Assert, through testcase.assertRaises, that compiling *statement*
+    in 'exec' mode raises SyntaxError."""
+    testcase.assertRaises(SyntaxError, compile, statement,
+                          '<test string>', 'exec')
+
+def open_urlresource(url, check=None):
+    """Return an open file for the resource at *url*, downloading it if needed.
+
+    The file is cached in the "data" directory next to this module, under
+    the last path component of the URL.  A cached copy is reused when
+    *check* is None or check(fileobj) is true; otherwise it is deleted and
+    fetched again, which requires the 'urlfetch' resource (ResourceDenied
+    is raised by requires() if it is not enabled).  TestFailed is raised
+    when the freshly downloaded file does not pass *check* either."""
+    import urlparse, urllib2
+
+    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
+
+    fn = os.path.join(os.path.dirname(__file__), "data", filename)
+
+    def check_valid_file(fn):
+        # Returns the open file if it is acceptable, otherwise closes it and
+        # (implicitly) returns None.
+        f = open(fn)
+        if check is None:
+            return f
+        elif check(f):
+            # check() may have consumed the file; rewind for the caller.
+            f.seek(0)
+            return f
+        f.close()
+
+    if os.path.exists(fn):
+        f = check_valid_file(fn)
+        if f is not None:
+            return f
+        # Stale or corrupt cached copy: remove it and download again.
+        unlink(fn)
+
+    # Verify the requirement before downloading the file
+    requires('urlfetch')
+
+    print >> get_original_stdout(), '\tfetching %s ...' % url
+    f = urllib2.urlopen(url, timeout=15)
+    try:
+        with open(fn, "wb") as out:
+            s = f.read()
+            while s:
+                out.write(s)
+                s = f.read()
+    finally:
+        f.close()
+
+    f = check_valid_file(fn)
+    if f is not None:
+        return f
+    raise TestFailed('invalid resource "%s"' % fn)
+
+
+class WarningsRecorder(object):
+    """Convenience wrapper for the warnings list returned on
+       entry to the warnings.catch_warnings() context manager.
+    """
+    def __init__(self, warnings_list):
+        # The list is shared with catch_warnings(), so it keeps growing as
+        # warnings are recorded.
+        self._warnings = warnings_list
+        # Index of the first warning that reset() has not discarded yet.
+        self._last = 0
+
+    def __getattr__(self, attr):
+        """Delegate unknown attributes to the most recent warning.
+
+        If at least one warning was recorded since the last reset(), the
+        attribute is read from the last recorded warning.  Otherwise the
+        names listed in warnings.WarningMessage._WARNING_DETAILS yield
+        None, and anything else raises AttributeError."""
+        if len(self._warnings) > self._last:
+            return getattr(self._warnings[-1], attr)
+        elif attr in warnings.WarningMessage._WARNING_DETAILS:
+            return None
+        raise AttributeError("%r has no attribute %r" % (self, attr))
+
+    @property
+    def warnings(self):
+        """The warnings recorded since the last reset(), as a new list."""
+        return self._warnings[self._last:]
+
+    def reset(self):
+        """Forget every warning recorded so far."""
+        self._last = len(self._warnings)
+
+
+def _filterwarnings(filters, quiet=False):
+    """Catch the warnings, then check if all the expected
+    warnings have been raised and re-raise unexpected warnings.
+    If 'quiet' is True, only re-raise the unexpected warnings.
+
+    This is a generator that yields exactly once, handing out a
+    WarningsRecorder.  *filters* is an iterable of
+    (message regexp, warning category) pairs.  After the yield,
+    AssertionError is raised for the first recorded warning that no filter
+    matched or, unless *quiet* is true, for the first filter that matched
+    nothing.
+    """
+    # Clear the warning registry of the calling module
+    # in order to re-raise the warnings.
+    # NOTE(review): the depth of 2 assumes this generator is driven through
+    # the context-manager wrapper around check_warnings() /
+    # check_py3k_warnings(), so that frame 2 is the code executing the
+    # 'with' statement -- confirm before calling it any other way.
+    frame = sys._getframe(2)
+    registry = frame.f_globals.get('__warningregistry__')
+    if registry:
+        registry.clear()
+    with warnings.catch_warnings(record=True) as w:
+        # Set filter "always" to record all warnings.  Because
+        # test_warnings swap the module, we need to look up in
+        # the sys.modules dictionary.
+        sys.modules['warnings'].simplefilter("always")
+        yield WarningsRecorder(w)
+    # Filter the recorded warnings
+    reraise = [warning.message for warning in w]
+    missing = []
+    for msg, cat in filters:
+        seen = False
+        # Iterate over a copy because matches are removed from reraise.
+        for exc in reraise[:]:
+            message = str(exc)
+            # Filter out the matching messages
+            if (re.match(msg, message, re.I) and
+                issubclass(exc.__class__, cat)):
+                seen = True
+                reraise.remove(exc)
+        if not seen and not quiet:
+            # This filter caught nothing
+            missing.append((msg, cat.__name__))
+    if reraise:
+        raise AssertionError("unhandled warning %r" % reraise[0])
+    if missing:
+        raise AssertionError("filter (%r, %s) did not catch any warning" %
+                             missing[0])
+
+
+ at contextlib.contextmanager
+def check_warnings(*filters, **kwargs):
+    """Context manager to silence warnings.
+
+    Accept 2-tuples as positional arguments:
+        ("message regexp", WarningCategory)
+
+    Optional argument:
+     - if 'quiet' is True, it does not fail if a filter catches nothing
+        (default True without argument,
+         default False if some filters are defined)
+
+    Without argument, it defaults to:
+        check_warnings(("", Warning), quiet=True)
+    """
+    quiet = kwargs.get('quiet')
+    if not filters:
+        filters = (("", Warning),)
+        # Preserve backward compatibility
+        if quiet is None:
+            quiet = True
+    return _filterwarnings(filters, quiet)
+
+
+ at contextlib.contextmanager
+def check_py3k_warnings(*filters, **kwargs):
+    """Context manager to silence py3k warnings.
+
+    Accept 2-tuples as positional arguments:
+        ("message regexp", WarningCategory)
+
+    Optional argument:
+     - if 'quiet' is True, it does not fail if a filter catches nothing
+        (default False)
+
+    Without argument, it defaults to:
+        check_py3k_warnings(("", DeprecationWarning), quiet=False)
+    """
+    if sys.py3kwarning:
+        if not filters:
+            filters = (("", DeprecationWarning),)
+    else:
+        # It should not raise any py3k warning
+        filters = ()
+    return _filterwarnings(filters, kwargs.get('quiet'))
+
+
+class CleanImport(object):
+    """Context manager to force import to return a new module reference.
+
+    This is useful for testing module-level behaviours, such as
+    the emission of a DeprecationWarning on import.
+
+    Use like this:
+
+        with CleanImport("foo"):
+            importlib.import_module("foo") # new reference
+
+    Note that the named modules are removed from sys.modules when the
+    object is constructed, not when the 'with' block is entered.
+    """
+
+    def __init__(self, *module_names):
+        # Snapshot taken before anything is removed; __exit__ writes it back.
+        self.original_modules = sys.modules.copy()
+        for module_name in module_names:
+            if module_name in sys.modules:
+                module = sys.modules[module_name]
+                # It is possible that module_name is just an alias for
+                # another module (e.g. stub for modules renamed in 3.x).
+                # In that case, we also need delete the real module to clear
+                # the import cache.
+                if module.__name__ != module_name:
+                    del sys.modules[module.__name__]
+                del sys.modules[module_name]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        # update() puts every snapshotted entry back; entries that were
+        # added to sys.modules in the meantime and are not in the snapshot
+        # are left in place.
+        sys.modules.update(self.original_modules)
+
+
+class EnvironmentVarGuard(UserDict.DictMixin):
+
+    """Class to help protect the environment variable properly.  Can be used as
+    a context manager."""
+
+    def __init__(self):
+        self._environ = os.environ
+        self._changed = {}
+
+    def __getitem__(self, envvar):
+        return self._environ[envvar]
+
+    def __setitem__(self, envvar, value):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        self._environ[envvar] = value
+
+    def __delitem__(self, envvar):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        if envvar in self._environ:
+            del self._environ[envvar]
+
+    def keys(self):
+        return self._environ.keys()
+
+    def set(self, envvar, value):
+        self[envvar] = value
+
+    def unset(self, envvar):
+        del self[envvar]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        for (k, v) in self._changed.items():
+            if v is None:
+                if k in self._environ:
+                    del self._environ[k]
+            else:
+                self._environ[k] = v
+        os.environ = self._environ
+
+
+class DirsOnSysPath(object):
+    """Context manager to temporarily add directories to sys.path.
+
+    This makes a copy of sys.path, appends any directories given
+    as positional arguments, then reverts sys.path to the copied
+    settings when the context ends.
+
+    Note that *all* sys.path modifications in the body of the
+    context manager, including replacement of the object,
+    will be reverted at the end of the block.
+    """
+
+    def __init__(self, *paths):
+        # Keep both the contents and the list object itself so that __exit__
+        # can restore the identity of sys.path as well as its entries.
+        self.original_value = sys.path[:]
+        self.original_object = sys.path
+        sys.path.extend(paths)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        # Rebind first (the body may have replaced sys.path), then reset the
+        # contents in place.
+        sys.path = self.original_object
+        sys.path[:] = self.original_value
+
+
+class TransientResource(object):
+
+    """Raise ResourceDenied if an exception is raised while the context manager
+    is in effect that matches the specified exception and attributes."""
+
+    def __init__(self, exc, **kwargs):
+        self.exc = exc
+        self.attrs = kwargs
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_=None, value=None, traceback=None):
+        """If type_ is a subclass of self.exc and value has attributes matching
+        self.attrs, raise ResourceDenied.  Otherwise let the exception
+        propagate (if any)."""
+        if type_ is not None and issubclass(self.exc, type_):
+            for attr, attr_value in self.attrs.iteritems():
+                if not hasattr(value, attr):
+                    break
+                if getattr(value, attr) != attr_value:
+                    break
+            else:
+                raise ResourceDenied("an optional resource is not available")
+
+
+ at contextlib.contextmanager
+def transient_internet(resource_name, timeout=30.0, errnos=()):
+    """Return a context manager that raises ResourceDenied when various issues
+    with the Internet connection manifest themselves as exceptions."""
+    default_errnos = [
+        ('ECONNREFUSED', 111),
+        ('ECONNRESET', 104),
+        ('ENETUNREACH', 101),
+        ('ETIMEDOUT', 110),
+    ]
+    default_gai_errnos = [
+        ('EAI_NONAME', -2),
+        ('EAI_NODATA', -5),
+    ]
+
+    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
+    captured_errnos = errnos
+    gai_errnos = []
+    if not captured_errnos:
+        captured_errnos = [getattr(errno, name, num)
+                           for (name, num) in default_errnos]
+        gai_errnos = [getattr(socket, name, num)
+                      for (name, num) in default_gai_errnos]
+
+    def filter_error(err):
+        n = getattr(err, 'errno', None)
+        if (isinstance(err, socket.timeout) or
+            (isinstance(err, socket.gaierror) and n in gai_errnos) or
+            n in captured_errnos):
+            if not verbose:
+                sys.stderr.write(denied.args[0] + "\n")
+            raise denied
+
+    old_timeout = socket.getdefaulttimeout()
+    try:
+        if timeout is not None:
+            socket.setdefaulttimeout(timeout)
+        yield
+    except IOError as err:
+        # urllib can wrap original socket errors multiple times (!), we must
+        # unwrap to get at the original error.
+        while True:
+            a = err.args
+            if len(a) >= 1 and isinstance(a[0], IOError):
+                err = a[0]
+            # The error can also be wrapped as args[1]:
+            #    except socket.error as msg:
+            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
+            elif len(a) >= 2 and isinstance(a[1], IOError):
+                err = a[1]
+            else:
+                break
+        filter_error(err)
+        raise
+    # XXX should we catch generic exceptions and look for their
+    # __cause__ or __context__?
+    finally:
+        socket.setdefaulttimeout(old_timeout)
+
+
+@contextlib.contextmanager
+def captured_output(stream_name):
+    """Run the 'with' statement body with a StringIO object installed in
+    place of the named attribute of the sys module.
+
+    'stream_name' is the attribute name on sys (for example "stdout").
+    The replacement StringIO object is the value bound by the 'with'
+    statement.  The original attribute is put back when the body exits,
+    whether it finishes normally or raises.
+
+    Example use (with 'stream_name=stdout')::
+
+       with captured_stdout() as s:
+           print "hello"
+       assert s.getvalue() == "hello\\n"
+    """
+    import StringIO
+    saved_stream = getattr(sys, stream_name)
+    setattr(sys, stream_name, StringIO.StringIO())
+    try:
+        yield getattr(sys, stream_name)
+    finally:
+        # Always restore the real stream, even if the body raised.
+        setattr(sys, stream_name, saved_stream)
+
+def captured_stdout():
+    """Return the captured_output() context manager for sys.stdout."""
+    return captured_output(stream_name="stdout")
+
+def captured_stdin():
+    """Return the captured_output() context manager for sys.stdin."""
+    return captured_output(stream_name="stdin")
+
+def gc_collect():
+    """Force as many objects as possible to be collected.
+
+    Runs gc.collect() three times in total; on Jython it sleeps for 0.1
+    seconds after the first pass.
+
+    This is needed because non-CPython implementations do not guarantee
+    timely deallocation (and CPython does not either for reference
+    cycles), so __del__ methods may run later than expected and weakrefs
+    may stay alive longer than expected.
+    """
+    gc.collect()
+    if is_jython:
+        time.sleep(0.1)
+    # Two further passes, as in the original sequence of calls.
+    for _pass in range(2):
+        gc.collect()
+
+
+#=======================================================================
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+
+def run_with_locale(catstr, *locales):
+    """Return a decorator that runs the wrapped function under another locale.
+
+    'catstr' is the name of a category attribute of the locale module (it
+    is looked up with getattr); 'locales' are candidate locale names that
+    are tried in order until one can be set.  After the wrapped function
+    returns or raises, the locale that was active before the call is set
+    again, provided it could be retrieved.
+
+    An invalid 'catstr' raises AttributeError when the decorated function
+    is called, not when the decorator is applied.
+    """
+    def decorator(func):
+        def inner(*args, **kwds):
+            try:
+                import locale
+                category = getattr(locale, catstr)
+                orig_locale = locale.setlocale(category)
+            except AttributeError:
+                # if the test author gives us an invalid category string
+                raise
+            except:
+                # cannot retrieve original locale, so do nothing
+                locale = orig_locale = None
+            else:
+                # Use the first candidate that can be set; if none can,
+                # the function simply runs under the unchanged locale.
+                for loc in locales:
+                    try:
+                        locale.setlocale(category, loc)
+                        break
+                    except:
+                        pass
+
+            # now run the function, resetting the locale on exceptions
+            try:
+                return func(*args, **kwds)
+            finally:
+                # Both names are None when the original locale could not
+                # be retrieved above, in which case nothing is restored.
+                if locale and orig_locale:
+                    locale.setlocale(category, orig_locale)
+        # Make the wrapper carry the wrapped function's name and docstring.
+        inner.func_name = func.func_name
+        inner.__doc__ = func.__doc__
+        return inner
+    return decorator
+
+#=======================================================================
+# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
+
+# Shorthand size constants.  The bigmem tests use them both as byte limits
+# and as size limits.
+_1M = 1024 ** 2
+_1G = 1024 ** 3
+_2G = 2 * _1G
+_4G = 4 * _1G
+
+MAX_Py_ssize_t = sys.maxsize
+
+def set_memlimit(limit):
+    """Set the memory budget for bigmem tests from a string such as '4G'.
+
+    'limit' is a decimal number immediately followed by a K, M, G or T
+    unit (case-insensitive) and an optional trailing 'b'.  On success two
+    module globals are updated: real_max_memuse holds the parsed byte
+    count and max_memuse holds the same value capped at MAX_Py_ssize_t.
+
+    Raises ValueError if 'limit' cannot be parsed or is below 2G - 1
+    bytes; in either case neither global is modified.
+    """
+    global max_memuse
+    global real_max_memuse
+    sizes = {
+        'k': 1024,
+        'm': _1M,
+        'g': _1G,
+        't': 1024*_1G,
+    }
+    # re.VERBOSE makes the literal space in the pattern insignificant, so
+    # no whitespace is accepted between the number and the unit.
+    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+                 re.IGNORECASE | re.VERBOSE)
+    if m is None:
+        raise ValueError('Invalid memory limit %r' % (limit,))
+    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
+    # Validate before touching the globals, so that a rejected limit does
+    # not leave real_max_memuse updated while max_memuse keeps its old
+    # value.  Capping at MAX_Py_ssize_t (always >= 2G - 1) cannot turn an
+    # acceptable limit into a rejected one, so checking first accepts
+    # exactly the same inputs as checking after the cap.
+    if memlimit < _2G - 1:
+        raise ValueError('Memory limit %r too low to be useful' % (limit,))
+    real_max_memuse = memlimit
+    max_memuse = min(memlimit, MAX_Py_ssize_t)
+
+def bigmemtest(minsize, memuse, overhead=5*_1M):
+    """Decorator for bigmem tests.
+
+    'minsize' is the smallest size (in test-interpreted units) at which
+    the test is useful.  'memuse' is the number of bytes needed per unit
+    of size, or a good estimate of it.  'overhead' is a fixed cost that
+    does not depend on the size; it defaults to 5Mb.
+
+    The wrapped test is called as f(self, size).  When no memory limit is
+    configured (max_memuse is 0) a small fixed size is used.  Otherwise
+    the size is derived from max_memuse, and the test returns without
+    running if that size would be below 'minsize'.
+    """
+    def decorator(f):
+        def wrapper(self):
+            if max_memuse:
+                maxsize = int((max_memuse - overhead) / memuse)
+                if maxsize < minsize:
+                    # Really ought to print 'test skipped' or something
+                    if verbose:
+                        sys.stderr.write("Skipping %s because of memory "
+                                         "constraint\n" % (f.__name__,))
+                    return
+                # Try to keep some breathing room in memory use
+                maxsize = max(maxsize - 50 * _1M, minsize)
+            else:
+                # max_memuse is 0 (the default): still run the test, but
+                # with a size of a few kb, and fail noisily if even that
+                # would use too much memory.
+                maxsize = 5147
+                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
+            return f(self, maxsize)
+        # Expose the decorator arguments on the wrapper.
+        wrapper.minsize = minsize
+        wrapper.memuse = memuse
+        wrapper.overhead = overhead
+        return wrapper
+    return decorator
+
+def precisionbigmemtest(size, memuse, overhead=5*_1M):
+    """Decorator for bigmem tests that need one exact size.
+
+    The wrapped test is called as f(self, size) when a memory limit is
+    configured (real_max_memuse is non-zero) and the limit is at least
+    size * memuse; if the limit is smaller the test returns without
+    running.  With no limit configured the test runs with a size of 5147.
+    """
+    def decorator(f):
+        def wrapper(self):
+            if real_max_memuse:
+                maxsize = size
+                if real_max_memuse < maxsize * memuse:
+                    if verbose:
+                        sys.stderr.write("Skipping %s because of memory "
+                                         "constraint\n" % (f.__name__,))
+                    return
+            else:
+                maxsize = 5147
+
+            return f(self, maxsize)
+        # Expose the decorator arguments on the wrapper.
+        wrapper.size = size
+        wrapper.memuse = memuse
+        wrapper.overhead = overhead
+        return wrapper
+    return decorator
+
+def bigaddrspacetest(f):
+    """Decorator for tests that fill the address space.
+
+    The wrapped test only runs when max_memuse is at least MAX_Py_ssize_t;
+    otherwise the wrapper returns None without calling it.
+    """
+    def wrapper(self):
+        if max_memuse >= MAX_Py_ssize_t:
+            return f(self)
+        if verbose:
+            sys.stderr.write("Skipping %s because of memory "
+                             "constraint\n" % (f.__name__,))
+    return wrapper
+
+#=======================================================================
+# unittest integration.
+
+class BasicTestRunner:
+    """Minimal test runner that collects outcomes without printing."""
+
+    def run(self, test):
+        """Call 'test' with a fresh unittest.TestResult and return it."""
+        outcome = unittest.TestResult()
+        test(outcome)
+        return outcome
+
+def _id(obj):
+    """Identity function: return 'obj' unchanged."""
+    return obj
+
+def requires_resource(resource):
+    """Return a decorator that skips a test unless 'resource' is enabled.
+
+    When the resource is enabled the identity decorator is returned, so
+    the test is left untouched; otherwise a unittest.skip() decorator is
+    returned whose message names the missing resource.
+    """
+    # The helper exported by this module is is_resource_enabled(); calling
+    # the non-existent name resource_is_enabled() would raise NameError.
+    if is_resource_enabled(resource):
+        return _id
+    else:
+        return unittest.skip("resource {0!r} is not enabled".format(resource))
+
+def cpython_only(test):
+    """
+    Decorator for tests only applicable on CPython.
+
+    Equivalent to applying impl_detail(cpython=True) to 'test'.
+    """
+    guard = impl_detail(cpython=True)
+    return guard(test)
+
+def impl_detail(msg=None, **guards):
+    """Return a decorator for tests that depend on implementation details.
+
+    'guards' are passed to check_impl_detail().  If that check passes, the
+    identity decorator is returned; otherwise a unittest.skip() decorator
+    is returned.  The skip reason is 'msg' if given, else a message built
+    from the guarded implementation names.
+    """
+    if check_impl_detail(**guards):
+        return _id
+    if msg is None:
+        guardnames, default = _parse_guards(guards)
+        template = ("implementation detail not available on {0}" if default
+                    else "implementation detail specific to {0}")
+        msg = template.format(' or '.join(sorted(guardnames.keys())))
+    return unittest.skip(msg)
+
+def _parse_guards(guards):
+    """Normalise a guards dict.
+
+    Returns a tuple ({platform_name: run_me}, default_value).  With no
+    guards the result is ({'cpython': True}, False).  Otherwise all guard
+    values must be equal, and the default is the negation of that value.
+    """
+    if not guards:
+        return ({'cpython': True}, False)
+    flags = guards.values()
+    is_true = flags[0]
+    assert flags == [is_true] * len(guards)   # all True or all False
+    return (guards, not is_true)
+
+# Use the following check to guard CPython's implementation-specific tests --
+# or to run them only on the implementation(s) guarded by the arguments.
+def check_impl_detail(**guards):
+    """Return the guard value for the running Python implementation.
+
+       The implementation name is platform.python_implementation() in
+       lower case; if it is not among the guards, the default computed by
+       _parse_guards() is returned.
+       Examples:
+          if check_impl_detail():               # only on CPython (default)
+          if check_impl_detail(jython=True):    # only on Jython
+          if check_impl_detail(cpython=False):  # everywhere except on CPython
+    """
+    guards, default = _parse_guards(guards)
+    implementation = platform.python_implementation().lower()
+    return guards.get(implementation, default)
+
+
+
+def _run_suite(suite):
+    """Run tests from a unittest.TestSuite-derived class.
+
+    Uses a verbose TextTestRunner on sys.stdout when 'verbose' is set and
+    a silent BasicTestRunner otherwise.  Raises TestFailed if the run was
+    not successful: with the recorded traceback text when there is exactly
+    one error or one failure, and with a generic message otherwise.
+    """
+    if verbose:
+        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
+    else:
+        runner = BasicTestRunner()
+
+    outcome = runner.run(suite)
+    if outcome.wasSuccessful():
+        return
+
+    errors, failures = outcome.errors, outcome.failures
+    if len(errors) == 1 and not failures:
+        message = errors[0][1]
+    elif len(failures) == 1 and not errors:
+        message = failures[0][1]
+    else:
+        message = "multiple errors occurred"
+        if not verbose:
+            message += "; run in verbose mode for details"
+    raise TestFailed(message)
+
+
+def run_unittest(*classes):
+    """Run tests from unittest.TestCase-derived classes.
+
+    Each argument may be a module name present in sys.modules (all of its
+    test cases are collected), a TestSuite or TestCase instance (added as
+    is), or anything else, which is handed to unittest.makeSuite().
+    Raises ValueError for a string that is not a key in sys.modules.
+    """
+    suite = unittest.TestSuite()
+    for item in classes:
+        if isinstance(item, str):
+            if item not in sys.modules:
+                raise ValueError("str arguments must be keys in sys.modules")
+            suite.addTest(unittest.findTestCases(sys.modules[item]))
+        elif isinstance(item, (unittest.TestSuite, unittest.TestCase)):
+            suite.addTest(item)
+        else:
+            suite.addTest(unittest.makeSuite(item))
+    _run_suite(suite)
+
+
+#=======================================================================
+# doctest driver.
+
+def run_doctest(module, verbosity=None):
+    """Run doctest on the given module.  Return (#failures, #tests).
+
+    If optional argument verbosity is not specified (or is None), pass
+    test_support's belief about verbosity on to doctest.  Else doctest's
+    usual behavior is used (it searches sys.argv for -v).
+
+    Raises TestFailed if any doctest fails.
+    """
+
+    import doctest
+
+    verbosity = verbose if verbosity is None else None
+
+    # Send doctest output (normally just errors) to the real stdout, since
+    # it shouldn't be compared by regrtest; put sys.stdout back afterwards.
+    saved_stdout = sys.stdout
+    sys.stdout = get_original_stdout()
+    try:
+        failures, tests = doctest.testmod(module, verbose=verbosity)
+        if failures:
+            raise TestFailed("%d of %d doctests failed" % (failures, tests))
+    finally:
+        sys.stdout = saved_stdout
+    if verbose:
+        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, tests)
+    return failures, tests
+
+#=======================================================================
+# Threading support to prevent reporting refleaks when running regrtest.py -R
+
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
+
+def threading_setup():
+    """Return a 1-tuple holding the current thread._count() value.
+
+    When the thread module is unavailable the tuple is (1,).
+    """
+    if not thread:
+        return (1,)
+    return (thread._count(),)
+
+def threading_cleanup(nb_threads):
+    """Wait for thread._count() to drop back to 'nb_threads'.
+
+    Polls up to 10 times, sleeping 0.1 seconds after each mismatch, and
+    gives up silently afterwards.  Does nothing when the thread module is
+    unavailable.
+    """
+    if not thread:
+        return
+
+    max_polls = 10
+    for _attempt in range(max_polls):
+        if thread._count() == nb_threads:
+            break
+        time.sleep(0.1)
+    # XXX print a warning in case of failure?
+
+def reap_threads(func):
+    """Decorator for tests that start threads.
+
+    It records the thread count before calling 'func' and waits for the
+    count to return to that value afterwards, even when the test fails.
+    If threading is unavailable 'func' is returned unchanged.
+    """
+    if not thread:
+        return func
+
+    @functools.wraps(func)
+    def wrapper(*args):
+        saved = threading_setup()
+        try:
+            return func(*args)
+        finally:
+            threading_cleanup(*saved)
+    return wrapper
+
+def reap_children():
+    """Use this function at the end of test_main() whenever sub-processes
+    are started.  This will help ensure that no extra children (zombies)
+    stick around to hog resources and create problems when looking
+    for refleaks.
+    """
+
+    # Nothing to do on platforms without os.waitpid.
+    if not hasattr(os, 'waitpid'):
+        return
+
+    # Reap all our dead child processes so we don't leave zombies around.
+    # These hog resources and might be causing some of the buildbots to die.
+    any_process = -1
+    while True:
+        try:
+            # This will raise an exception on Windows.  That's ok.
+            pid, status = os.waitpid(any_process, os.WNOHANG)
+        except:
+            break
+        # A pid of 0 ends the loop.
+        if pid == 0:
+            break
+
+def py3k_bytes(b):
+    """Emulate the py3k bytes() constructor.
+
+    Tries, in order: b.tobytes(), joining chr(x) for each x in b, and
+    finally bytes(b).
+
+    NOTE: This is only a best effort function.
+    """
+    # memoryview?
+    try:
+        return b.tobytes()
+    except AttributeError:
+        pass
+    # iterable of ints?
+    try:
+        return b"".join(chr(x) for x in b)
+    except TypeError:
+        return bytes(b)



More information about the Pypy-commit mailing list