[pypy-commit] pypy default: merge branch pytest-25: Update our py.test to 2.5.2
rlamy
noreply at buildbot.pypy.org
Sun Aug 10 20:56:17 CEST 2014
Author: Ronan Lamy <ronan.lamy at gmail.com>
Branch:
Changeset: r72746:c1f4835ed3c3
Date: 2014-08-10 19:55 +0100
http://bitbucket.org/pypy/pypy/changeset/c1f4835ed3c3/
Log: merge branch pytest-25: Update our py.test to 2.5.2
diff too long, truncating to 2000 out of 10234 lines
diff --git a/_pytest/__init__.py b/_pytest/__init__.py
--- a/_pytest/__init__.py
+++ b/_pytest/__init__.py
@@ -1,2 +1,2 @@
#
-__version__ = '2.2.4.dev2'
+__version__ = '2.5.2'
diff --git a/_pytest/_argcomplete.py b/_pytest/_argcomplete.py
new file mode 100644
--- /dev/null
+++ b/_pytest/_argcomplete.py
@@ -0,0 +1,104 @@
+
+"""allow bash-completion for argparse with argcomplete if installed
+needs argcomplete>=0.5.6 for python 3.2/3.3 (older versions fail
+to find the magic string, so _ARGCOMPLETE env. var is never set, and
+this does not need special code.
+
+argcomplete does not support python 2.5 (although the changes for that
+are minor).
+
+Function try_argcomplete(parser) should be called directly before
+the call to ArgumentParser.parse_args().
+
+The filescompleter is what you normally would use on the positional
+arguments specification, in order to get "dirname/" after "dirn<TAB>"
+instead of the default "dirname ":
+
+ optparser.add_argument(Config._file_or_dir, nargs='*'
+ ).completer=filescompleter
+
+Other, application specific, completers should go in the file
+doing the add_argument calls as they need to be specified as .completer
+attributes as well. (If argcomplete is not installed, the function the
+attribute points to will not be used).
+
+SPEEDUP
+=======
+The generic argcomplete script for bash-completion
+(/etc/bash_completion.d/python-argcomplete.sh )
+uses a python program to determine startup script generated by pip.
+You can speed up completion somewhat by changing this script to include
+ # PYTHON_ARGCOMPLETE_OK
+so the the python-argcomplete-check-easy-install-script does not
+need to be called to find the entry point of the code and see if that is
+marked with PYTHON_ARGCOMPLETE_OK
+
+INSTALL/DEBUGGING
+=================
+To include this support in another application that has setup.py generated
+scripts:
+- add the line:
+ # PYTHON_ARGCOMPLETE_OK
+ near the top of the main python entry point
+- include in the file calling parse_args():
+ from _argcomplete import try_argcomplete, filescompleter
+ , call try_argcomplete just before parse_args(), and optionally add
+ filescompleter to the positional arguments' add_argument()
+If things do not work right away:
+- switch on argcomplete debugging with (also helpful when doing custom
+ completers):
+ export _ARC_DEBUG=1
+- run:
+ python-argcomplete-check-easy-install-script $(which appname)
+ echo $?
+ will echo 0 if the magic line has been found, 1 if not
+- sometimes it helps to find early on errors using:
+ _ARGCOMPLETE=1 _ARC_DEBUG=1 appname
+ which should throw a KeyError: 'COMPLINE' (which is properly set by the
+ global argcomplete script).
+"""
+
+import sys
+import os
+from glob import glob
+
+class FastFilesCompleter:
+ 'Fast file completer class'
+ def __init__(self, directories=True):
+ self.directories = directories
+
+ def __call__(self, prefix, **kwargs):
+ """only called on non option completions"""
+ if os.path.sep in prefix[1:]: #
+ prefix_dir = len(os.path.dirname(prefix) + os.path.sep)
+ else:
+ prefix_dir = 0
+ completion = []
+ globbed = []
+ if '*' not in prefix and '?' not in prefix:
+ if prefix[-1] == os.path.sep: # we are on unix, otherwise no bash
+ globbed.extend(glob(prefix + '.*'))
+ prefix += '*'
+ globbed.extend(glob(prefix))
+ for x in sorted(globbed):
+ if os.path.isdir(x):
+ x += '/'
+ # append stripping the prefix (like bash, not like compgen)
+ completion.append(x[prefix_dir:])
+ return completion
+
+if os.environ.get('_ARGCOMPLETE'):
+ # argcomplete 0.5.6 is not compatible with python 2.5.6: print/with/format
+ if sys.version_info[:2] < (2, 6):
+ sys.exit(1)
+ try:
+ import argcomplete.completers
+ except ImportError:
+ sys.exit(-1)
+ filescompleter = FastFilesCompleter()
+
+ def try_argcomplete(parser):
+ argcomplete.autocomplete(parser)
+else:
+ def try_argcomplete(parser): pass
+ filescompleter = None
diff --git a/_pytest/assertion/__init__.py b/_pytest/assertion/__init__.py
--- a/_pytest/assertion/__init__.py
+++ b/_pytest/assertion/__init__.py
@@ -3,7 +3,6 @@
"""
import py
import sys
-import pytest
from _pytest.monkeypatch import monkeypatch
from _pytest.assertion import util
@@ -19,8 +18,8 @@
to provide assert expression information. """)
group.addoption('--no-assert', action="store_true", default=False,
dest="noassert", help="DEPRECATED equivalent to --assert=plain")
- group.addoption('--nomagic', action="store_true", default=False,
- dest="nomagic", help="DEPRECATED equivalent to --assert=plain")
+ group.addoption('--nomagic', '--no-magic', action="store_true",
+ default=False, help="DEPRECATED equivalent to --assert=plain")
class AssertionState:
"""State for the assertion plugin."""
@@ -35,22 +34,25 @@
mode = "plain"
if mode == "rewrite":
try:
- import ast
+ import ast # noqa
except ImportError:
mode = "reinterp"
else:
- if sys.platform.startswith('java'):
+ # Both Jython and CPython 2.6.0 have AST bugs that make the
+ # assertion rewriting hook malfunction.
+ if (sys.platform.startswith('java') or
+ sys.version_info[:3] == (2, 6, 0)):
mode = "reinterp"
if mode != "plain":
_load_modules(mode)
m = monkeypatch()
config._cleanup.append(m.undo)
m.setattr(py.builtin.builtins, 'AssertionError',
- reinterpret.AssertionError)
+ reinterpret.AssertionError) # noqa
hook = None
if mode == "rewrite":
- hook = rewrite.AssertionRewritingHook()
- sys.meta_path.append(hook)
+ hook = rewrite.AssertionRewritingHook() # noqa
+ sys.meta_path.insert(0, hook)
warn_about_missing_assertion(mode)
config._assertstate = AssertionState(config, mode)
config._assertstate.hook = hook
@@ -73,9 +75,16 @@
def callbinrepr(op, left, right):
hook_result = item.ihook.pytest_assertrepr_compare(
config=item.config, op=op, left=left, right=right)
+
for new_expl in hook_result:
if new_expl:
- res = '\n~'.join(new_expl)
+ # Don't include pageloads of data unless we are very
+ # verbose (-vv)
+ if (sum(len(p) for p in new_expl[1:]) > 80*8
+ and item.config.option.verbose < 2):
+ new_expl[1:] = [py.builtin._totext(
+ 'Detailed information truncated, use "-vv" to show')]
+ res = py.builtin._totext('\n~').join(new_expl)
if item.config.getvalue("assertmode") == "rewrite":
# The result will be fed back a python % formatting
# operation, which will fail if there are extraneous
@@ -95,9 +104,9 @@
def _load_modules(mode):
"""Lazily import assertion related code."""
global rewrite, reinterpret
- from _pytest.assertion import reinterpret
+ from _pytest.assertion import reinterpret # noqa
if mode == "rewrite":
- from _pytest.assertion import rewrite
+ from _pytest.assertion import rewrite # noqa
def warn_about_missing_assertion(mode):
try:
diff --git a/_pytest/assertion/newinterpret.py b/_pytest/assertion/newinterpret.py
--- a/_pytest/assertion/newinterpret.py
+++ b/_pytest/assertion/newinterpret.py
@@ -11,7 +11,7 @@
from _pytest.assertion.reinterpret import BuiltinAssertionError
-if sys.platform.startswith("java") and sys.version_info < (2, 5, 2):
+if sys.platform.startswith("java"):
# See http://bugs.jython.org/issue1497
_exprs = ("BoolOp", "BinOp", "UnaryOp", "Lambda", "IfExp", "Dict",
"ListComp", "GeneratorExp", "Yield", "Compare", "Call",
diff --git a/_pytest/assertion/oldinterpret.py b/_pytest/assertion/oldinterpret.py
--- a/_pytest/assertion/oldinterpret.py
+++ b/_pytest/assertion/oldinterpret.py
@@ -526,10 +526,13 @@
# example:
def f():
return 5
+
def g():
return 3
+
def h(x):
return 'never'
+
check("f() * g() == 5")
check("not f()")
check("not (f() and g() or 0)")
diff --git a/_pytest/assertion/reinterpret.py b/_pytest/assertion/reinterpret.py
--- a/_pytest/assertion/reinterpret.py
+++ b/_pytest/assertion/reinterpret.py
@@ -1,18 +1,26 @@
import sys
import py
from _pytest.assertion.util import BuiltinAssertionError
+u = py.builtin._totext
+
class AssertionError(BuiltinAssertionError):
def __init__(self, *args):
BuiltinAssertionError.__init__(self, *args)
if args:
+ # on Python2.6 we get len(args)==2 for: assert 0, (x,y)
+ # on Python2.7 and above we always get len(args) == 1
+ # with args[0] being the (x,y) tuple.
+ if len(args) > 1:
+ toprint = args
+ else:
+ toprint = args[0]
try:
- self.msg = str(args[0])
- except py.builtin._sysex:
- raise
- except:
- self.msg = "<[broken __repr__] %s at %0xd>" %(
- args[0].__class__, id(args[0]))
+ self.msg = u(toprint)
+ except Exception:
+ self.msg = u(
+ "<[broken __repr__] %s at %0xd>"
+ % (toprint.__class__, id(toprint)))
else:
f = py.code.Frame(sys._getframe(1))
try:
@@ -44,4 +52,3 @@
from _pytest.assertion.newinterpret import interpret as reinterpret
else:
reinterpret = reinterpret_old
-
diff --git a/_pytest/assertion/rewrite.py b/_pytest/assertion/rewrite.py
--- a/_pytest/assertion/rewrite.py
+++ b/_pytest/assertion/rewrite.py
@@ -6,6 +6,7 @@
import imp
import marshal
import os
+import re
import struct
import sys
import types
@@ -14,13 +15,7 @@
from _pytest.assertion import util
-# Windows gives ENOENT in places *nix gives ENOTDIR.
-if sys.platform.startswith("win"):
- PATH_COMPONENT_NOT_DIR = errno.ENOENT
-else:
- PATH_COMPONENT_NOT_DIR = errno.ENOTDIR
-
-# py.test caches rewritten pycs in __pycache__.
+# pytest caches rewritten pycs in __pycache__.
if hasattr(imp, "get_tag"):
PYTEST_TAG = imp.get_tag() + "-PYTEST"
else:
@@ -34,17 +29,19 @@
PYTEST_TAG = "%s-%s%s-PYTEST" % (impl, ver[0], ver[1])
del ver, impl
-PYC_EXT = ".py" + "c" if __debug__ else "o"
+PYC_EXT = ".py" + (__debug__ and "c" or "o")
PYC_TAIL = "." + PYTEST_TAG + PYC_EXT
REWRITE_NEWLINES = sys.version_info[:2] != (2, 7) and sys.version_info < (3, 2)
+ASCII_IS_DEFAULT_ENCODING = sys.version_info[0] < 3
class AssertionRewritingHook(object):
- """Import hook which rewrites asserts."""
+ """PEP302 Import hook which rewrites asserts."""
def __init__(self):
self.session = None
self.modules = {}
+ self._register_with_pkg_resources()
def set_session(self, session):
self.fnpats = session.config.getini("python_files")
@@ -59,8 +56,12 @@
names = name.rsplit(".", 1)
lastname = names[-1]
pth = None
- if path is not None and len(path) == 1:
- pth = path[0]
+ if path is not None:
+ # Starting with Python 3.3, path is a _NamespacePath(), which
+ # causes problems if not converted to list.
+ path = list(path)
+ if len(path) == 1:
+ pth = path[0]
if pth is None:
try:
fd, fn, desc = imp.find_module(lastname, path)
@@ -95,12 +96,13 @@
finally:
self.session = sess
else:
- state.trace("matched test file (was specified on cmdline): %r" % (fn,))
+ state.trace("matched test file (was specified on cmdline): %r" %
+ (fn,))
# The requested module looks like a test file, so rewrite it. This is
# the most magical part of the process: load the source, rewrite the
# asserts, and load the rewritten source. We also cache the rewritten
# module code in a special pyc. We must be aware of the possibility of
- # concurrent py.test processes rewriting and loading pycs. To avoid
+ # concurrent pytest processes rewriting and loading pycs. To avoid
# tricky race conditions, we maintain the following invariant: The
# cached pyc is always a complete, valid pyc. Operations on it must be
# atomic. POSIX's atomic rename comes in handy.
@@ -116,19 +118,19 @@
# common case) or it's blocked by a non-dir node. In the
# latter case, we'll ignore it in _write_pyc.
pass
- elif e == PATH_COMPONENT_NOT_DIR:
+ elif e in [errno.ENOENT, errno.ENOTDIR]:
# One of the path components was not a directory, likely
# because we're in a zip file.
write = False
elif e == errno.EACCES:
- state.trace("read only directory: %r" % (fn_pypath.dirname,))
+ state.trace("read only directory: %r" % fn_pypath.dirname)
write = False
else:
raise
cache_name = fn_pypath.basename[:-3] + PYC_TAIL
pyc = os.path.join(cache_dir, cache_name)
- # Notice that even if we're in a read-only directory, I'm going to check
- # for a cached pyc. This may not be optimal...
+ # Notice that even if we're in a read-only directory, I'm going
+ # to check for a cached pyc. This may not be optimal...
co = _read_pyc(fn_pypath, pyc)
if co is None:
state.trace("rewriting %r" % (fn,))
@@ -153,27 +155,59 @@
mod.__file__ = co.co_filename
# Normally, this attribute is 3.2+.
mod.__cached__ = pyc
+ mod.__loader__ = self
py.builtin.exec_(co, mod.__dict__)
except:
del sys.modules[name]
raise
return sys.modules[name]
-def _write_pyc(co, source_path, pyc):
- # Technically, we don't have to have the same pyc format as (C)Python, since
- # these "pycs" should never be seen by builtin import. However, there's
- # little reason deviate, and I hope sometime to be able to use
- # imp.load_compiled to load them. (See the comment in load_module above.)
+
+
+ def is_package(self, name):
+ try:
+ fd, fn, desc = imp.find_module(name)
+ except ImportError:
+ return False
+ if fd is not None:
+ fd.close()
+ tp = desc[2]
+ return tp == imp.PKG_DIRECTORY
+
+ @classmethod
+ def _register_with_pkg_resources(cls):
+ """
+ Ensure package resources can be loaded from this loader. May be called
+ multiple times, as the operation is idempotent.
+ """
+ try:
+ import pkg_resources
+ # access an attribute in case a deferred importer is present
+ pkg_resources.__name__
+ except ImportError:
+ return
+
+ # Since pytest tests are always located in the file system, the
+ # DefaultProvider is appropriate.
+ pkg_resources.register_loader_type(cls, pkg_resources.DefaultProvider)
+
+
+def _write_pyc(state, co, source_path, pyc):
+ # Technically, we don't have to have the same pyc format as
+ # (C)Python, since these "pycs" should never be seen by builtin
+ # import. However, there's little reason deviate, and I hope
+ # sometime to be able to use imp.load_compiled to load them. (See
+ # the comment in load_module above.)
mtime = int(source_path.mtime())
try:
fp = open(pyc, "wb")
except IOError:
err = sys.exc_info()[1].errno
- if err == PATH_COMPONENT_NOT_DIR:
- # This happens when we get a EEXIST in find_module creating the
- # __pycache__ directory and __pycache__ is by some non-dir node.
- return False
- raise
+ state.trace("error writing pyc file at %s: errno=%s" %(pyc, err))
+ # we ignore any failure to write the cache file
+ # there are many reasons, permission-denied, __pycache__ being a
+ # file etc.
+ return False
try:
fp.write(imp.get_magic())
fp.write(struct.pack("<l", mtime))
@@ -185,12 +219,43 @@
RN = "\r\n".encode("utf-8")
N = "\n".encode("utf-8")
+cookie_re = re.compile(r"^[ \t\f]*#.*coding[:=][ \t]*[-\w.]+")
+BOM_UTF8 = '\xef\xbb\xbf'
+
def _rewrite_test(state, fn):
"""Try to read and rewrite *fn* and return the code object."""
try:
source = fn.read("rb")
except EnvironmentError:
return None
+ if ASCII_IS_DEFAULT_ENCODING:
+ # ASCII is the default encoding in Python 2. Without a coding
+ # declaration, Python 2 will complain about any bytes in the file
+ # outside the ASCII range. Sadly, this behavior does not extend to
+ # compile() or ast.parse(), which prefer to interpret the bytes as
+ # latin-1. (At least they properly handle explicit coding cookies.) To
+ # preserve this error behavior, we could force ast.parse() to use ASCII
+ # as the encoding by inserting a coding cookie. Unfortunately, that
+ # messes up line numbers. Thus, we have to check ourselves if anything
+ # is outside the ASCII range in the case no encoding is explicitly
+ # declared. For more context, see issue #269. Yay for Python 3 which
+ # gets this right.
+ end1 = source.find("\n")
+ end2 = source.find("\n", end1 + 1)
+ if (not source.startswith(BOM_UTF8) and
+ cookie_re.match(source[0:end1]) is None and
+ cookie_re.match(source[end1 + 1:end2]) is None):
+ if hasattr(state, "_indecode"):
+ return None # encodings imported us again, we don't rewrite
+ state._indecode = True
+ try:
+ try:
+ source.decode("ascii")
+ except UnicodeDecodeError:
+ # Let it fail in real import.
+ return None
+ finally:
+ del state._indecode
# On Python versions which are not 2.7 and less than or equal to 3.1, the
# parser expects *nix newlines.
if REWRITE_NEWLINES:
@@ -216,16 +281,16 @@
if sys.platform.startswith("win"):
# Windows grants exclusive access to open files and doesn't have atomic
# rename, so just write into the final file.
- _write_pyc(co, fn, pyc)
+ _write_pyc(state, co, fn, pyc)
else:
# When not on windows, assume rename is atomic. Dump the code object
# into a file specific to this process and atomically replace it.
proc_pyc = pyc + "." + str(os.getpid())
- if _write_pyc(co, fn, proc_pyc):
+ if _write_pyc(state, co, fn, proc_pyc):
os.rename(proc_pyc, pyc)
def _read_pyc(source, pyc):
- """Possibly read a py.test pyc containing rewritten code.
+ """Possibly read a pytest pyc containing rewritten code.
Return rewritten code if successful or None if not.
"""
@@ -240,9 +305,8 @@
except EnvironmentError:
return None
# Check for invalid or out of date pyc file.
- if (len(data) != 8 or
- data[:4] != imp.get_magic() or
- struct.unpack("<l", data[4:])[0] != mtime):
+ if (len(data) != 8 or data[:4] != imp.get_magic() or
+ struct.unpack("<l", data[4:])[0] != mtime):
return None
co = marshal.load(fp)
if not isinstance(co, types.CodeType):
@@ -259,7 +323,10 @@
_saferepr = py.io.saferepr
-from _pytest.assertion.util import format_explanation as _format_explanation
+from _pytest.assertion.util import format_explanation as _format_explanation # noqa
+
+def _should_repr_global_name(obj):
+ return not hasattr(obj, "__name__") and not py.builtin.callable(obj)
def _format_boolop(explanations, is_or):
return "(" + (is_or and " or " or " and ").join(explanations) + ")"
@@ -280,35 +347,35 @@
unary_map = {
- ast.Not : "not %s",
- ast.Invert : "~%s",
- ast.USub : "-%s",
- ast.UAdd : "+%s"
+ ast.Not: "not %s",
+ ast.Invert: "~%s",
+ ast.USub: "-%s",
+ ast.UAdd: "+%s"
}
binop_map = {
- ast.BitOr : "|",
- ast.BitXor : "^",
- ast.BitAnd : "&",
- ast.LShift : "<<",
- ast.RShift : ">>",
- ast.Add : "+",
- ast.Sub : "-",
- ast.Mult : "*",
- ast.Div : "/",
- ast.FloorDiv : "//",
- ast.Mod : "%",
- ast.Eq : "==",
- ast.NotEq : "!=",
- ast.Lt : "<",
- ast.LtE : "<=",
- ast.Gt : ">",
- ast.GtE : ">=",
- ast.Pow : "**",
- ast.Is : "is",
- ast.IsNot : "is not",
- ast.In : "in",
- ast.NotIn : "not in"
+ ast.BitOr: "|",
+ ast.BitXor: "^",
+ ast.BitAnd: "&",
+ ast.LShift: "<<",
+ ast.RShift: ">>",
+ ast.Add: "+",
+ ast.Sub: "-",
+ ast.Mult: "*",
+ ast.Div: "/",
+ ast.FloorDiv: "//",
+ ast.Mod: "%%", # escaped for string formatting
+ ast.Eq: "==",
+ ast.NotEq: "!=",
+ ast.Lt: "<",
+ ast.LtE: "<=",
+ ast.Gt: ">",
+ ast.GtE: ">=",
+ ast.Pow: "**",
+ ast.Is: "is",
+ ast.IsNot: "is not",
+ ast.In: "in",
+ ast.NotIn: "not in"
}
@@ -341,7 +408,7 @@
lineno = 0
for item in mod.body:
if (expect_docstring and isinstance(item, ast.Expr) and
- isinstance(item.value, ast.Str)):
+ isinstance(item.value, ast.Str)):
doc = item.value.s
if "PYTEST_DONT_REWRITE" in doc:
# The module has disabled assertion rewriting.
@@ -462,7 +529,8 @@
body.append(raise_)
# Clear temporary variables by setting them to None.
if self.variables:
- variables = [ast.Name(name, ast.Store()) for name in self.variables]
+ variables = [ast.Name(name, ast.Store())
+ for name in self.variables]
clear = ast.Assign(variables, ast.Name("None", ast.Load()))
self.statements.append(clear)
# Fix line numbers.
@@ -471,11 +539,12 @@
return self.statements
def visit_Name(self, name):
- # Check if the name is local or not.
+ # Display the repr of the name if it's a local variable or
+ # _should_repr_global_name() thinks it's acceptable.
locs = ast.Call(self.builtin("locals"), [], [], None, None)
- globs = ast.Call(self.builtin("globals"), [], [], None, None)
- ops = [ast.In(), ast.IsNot()]
- test = ast.Compare(ast.Str(name.id), ops, [locs, globs])
+ inlocs = ast.Compare(ast.Str(name.id), [ast.In()], [locs])
+ dorepr = self.helper("should_repr_global_name", name)
+ test = ast.BoolOp(ast.Or(), [inlocs, dorepr])
expr = ast.IfExp(test, self.display(name), ast.Str(name.id))
return name, self.explanation_param(expr)
@@ -492,7 +561,8 @@
for i, v in enumerate(boolop.values):
if i:
fail_inner = []
- self.on_failure.append(ast.If(cond, fail_inner, []))
+ # cond is set in a prior loop iteration below
+ self.on_failure.append(ast.If(cond, fail_inner, [])) # noqa
self.on_failure = fail_inner
self.push_format_context()
res, expl = self.visit(v)
@@ -548,7 +618,8 @@
new_kwarg, expl = self.visit(call.kwargs)
arg_expls.append("**" + expl)
expl = "%s(%s)" % (func_expl, ', '.join(arg_expls))
- new_call = ast.Call(new_func, new_args, new_kwargs, new_star, new_kwarg)
+ new_call = ast.Call(new_func, new_args, new_kwargs,
+ new_star, new_kwarg)
res = self.assign(new_call)
res_expl = self.explanation_param(self.display(res))
outer_expl = "%s\n{%s = %s\n}" % (res_expl, res_expl, expl)
@@ -584,7 +655,7 @@
res_expr = ast.Compare(left_res, [op], [next_res])
self.statements.append(ast.Assign([store_names[i]], res_expr))
left_res, left_expl = next_res, next_expl
- # Use py.code._reprcompare if that's available.
+ # Use pytest.assertion.util._reprcompare if that's available.
expl_call = self.helper("call_reprcompare",
ast.Tuple(syms, ast.Load()),
ast.Tuple(load_names, ast.Load()),
diff --git a/_pytest/assertion/util.py b/_pytest/assertion/util.py
--- a/_pytest/assertion/util.py
+++ b/_pytest/assertion/util.py
@@ -1,8 +1,13 @@
"""Utilities for assertion debugging"""
import py
+try:
+ from collections import Sequence
+except ImportError:
+ Sequence = list
BuiltinAssertionError = py.builtin.builtins.AssertionError
+u = py.builtin._totext
# The _reprcompare attribute on the util module is used by the new assertion
# interpretation code and assertion rewriter to detect this plugin was
@@ -10,6 +15,7 @@
# DebugInterpreter.
_reprcompare = None
+
def format_explanation(explanation):
"""This formats an explanation
@@ -20,7 +26,18 @@
for when one explanation needs to span multiple lines, e.g. when
displaying diffs.
"""
- # simplify 'assert False where False = ...'
+ explanation = _collapse_false(explanation)
+ lines = _split_explanation(explanation)
+ result = _format_lines(lines)
+ return u('\n').join(result)
+
+
+def _collapse_false(explanation):
+ """Collapse expansions of False
+
+ So this strips out any "assert False\n{where False = ...\n}"
+ blocks.
+ """
where = 0
while True:
start = where = explanation.find("False\n{False = ", where)
@@ -42,28 +59,48 @@
explanation = (explanation[:start] + explanation[start+15:end-1] +
explanation[end+1:])
where -= 17
- raw_lines = (explanation or '').split('\n')
- # escape newlines not followed by {, } and ~
+ return explanation
+
+
+def _split_explanation(explanation):
+ """Return a list of individual lines in the explanation
+
+ This will return a list of lines split on '\n{', '\n}' and '\n~'.
+ Any other newlines will be escaped and appear in the line as the
+ literal '\n' characters.
+ """
+ raw_lines = (explanation or u('')).split('\n')
lines = [raw_lines[0]]
for l in raw_lines[1:]:
if l.startswith('{') or l.startswith('}') or l.startswith('~'):
lines.append(l)
else:
lines[-1] += '\\n' + l
+ return lines
+
+def _format_lines(lines):
+ """Format the individual lines
+
+ This will replace the '{', '}' and '~' characters of our mini
+ formatting language with the proper 'where ...', 'and ...' and ' +
+ ...' text, taking care of indentation along the way.
+
+ Return a list of formatted lines.
+ """
result = lines[:1]
stack = [0]
stackcnt = [0]
for line in lines[1:]:
if line.startswith('{'):
if stackcnt[-1]:
- s = 'and '
+ s = u('and ')
else:
- s = 'where '
+ s = u('where ')
stack.append(len(result))
stackcnt[-1] += 1
stackcnt.append(0)
- result.append(' +' + ' '*(len(stack)-1) + s + line[1:])
+ result.append(u(' +') + u(' ')*(len(stack)-1) + s + line[1:])
elif line.startswith('}'):
assert line.startswith('}')
stack.pop()
@@ -71,9 +108,9 @@
result[stack[-1]] += line[1:]
else:
assert line.startswith('~')
- result.append(' '*len(stack) + line[1:])
+ result.append(u(' ')*len(stack) + line[1:])
assert len(stack) == 1
- return '\n'.join(result)
+ return result
# Provide basestring in python3
@@ -83,132 +120,163 @@
basestring = str
-def assertrepr_compare(op, left, right):
- """return specialised explanations for some operators/operands"""
- width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op
+def assertrepr_compare(config, op, left, right):
+ """Return specialised explanations for some operators/operands"""
+ width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op
left_repr = py.io.saferepr(left, maxsize=int(width/2))
right_repr = py.io.saferepr(right, maxsize=width-len(left_repr))
- summary = '%s %s %s' % (left_repr, op, right_repr)
+ summary = u('%s %s %s') % (left_repr, op, right_repr)
- issequence = lambda x: isinstance(x, (list, tuple))
+ issequence = lambda x: (isinstance(x, (list, tuple, Sequence))
+ and not isinstance(x, basestring))
istext = lambda x: isinstance(x, basestring)
isdict = lambda x: isinstance(x, dict)
- isset = lambda x: isinstance(x, set)
+ isset = lambda x: isinstance(x, (set, frozenset))
+ verbose = config.getoption('verbose')
explanation = None
try:
if op == '==':
if istext(left) and istext(right):
- explanation = _diff_text(left, right)
+ explanation = _diff_text(left, right, verbose)
elif issequence(left) and issequence(right):
- explanation = _compare_eq_sequence(left, right)
+ explanation = _compare_eq_sequence(left, right, verbose)
elif isset(left) and isset(right):
- explanation = _compare_eq_set(left, right)
+ explanation = _compare_eq_set(left, right, verbose)
elif isdict(left) and isdict(right):
- explanation = _diff_text(py.std.pprint.pformat(left),
- py.std.pprint.pformat(right))
+ explanation = _compare_eq_dict(left, right, verbose)
elif op == 'not in':
if istext(left) and istext(right):
- explanation = _notin_text(left, right)
- except py.builtin._sysex:
- raise
- except:
+ explanation = _notin_text(left, right, verbose)
+ except Exception:
excinfo = py.code.ExceptionInfo()
- explanation = ['(pytest_assertion plugin: representation of '
- 'details failed. Probably an object has a faulty __repr__.)',
- str(excinfo)
- ]
-
+ explanation = [
+ u('(pytest_assertion plugin: representation of details failed. '
+ 'Probably an object has a faulty __repr__.)'),
+ u(excinfo)]
if not explanation:
return None
- # Don't include pageloads of data, should be configurable
- if len(''.join(explanation)) > 80*8:
- explanation = ['Detailed information too verbose, truncated']
-
return [summary] + explanation
-def _diff_text(left, right):
- """Return the explanation for the diff between text
+def _diff_text(left, right, verbose=False):
+ """Return the explanation for the diff between text or bytes
- This will skip leading and trailing characters which are
- identical to keep the diff minimal.
+ Unless --verbose is used this will skip leading and trailing
+ characters which are identical to keep the diff minimal.
+
+ If the input are bytes they will be safely converted to text.
"""
explanation = []
- i = 0 # just in case left or right has zero length
- for i in range(min(len(left), len(right))):
- if left[i] != right[i]:
- break
- if i > 42:
- i -= 10 # Provide some context
- explanation = ['Skipping %s identical '
- 'leading characters in diff' % i]
- left = left[i:]
- right = right[i:]
- if len(left) == len(right):
- for i in range(len(left)):
- if left[-i] != right[-i]:
+ if isinstance(left, py.builtin.bytes):
+ left = u(repr(left)[1:-1]).replace(r'\n', '\n')
+ if isinstance(right, py.builtin.bytes):
+ right = u(repr(right)[1:-1]).replace(r'\n', '\n')
+ if not verbose:
+ i = 0 # just in case left or right has zero length
+ for i in range(min(len(left), len(right))):
+ if left[i] != right[i]:
break
if i > 42:
- i -= 10 # Provide some context
- explanation += ['Skipping %s identical '
- 'trailing characters in diff' % i]
- left = left[:-i]
- right = right[:-i]
+ i -= 10 # Provide some context
+ explanation = [u('Skipping %s identical leading '
+ 'characters in diff, use -v to show') % i]
+ left = left[i:]
+ right = right[i:]
+ if len(left) == len(right):
+ for i in range(len(left)):
+ if left[-i] != right[-i]:
+ break
+ if i > 42:
+ i -= 10 # Provide some context
+ explanation += [u('Skipping %s identical trailing '
+ 'characters in diff, use -v to show') % i]
+ left = left[:-i]
+ right = right[:-i]
explanation += [line.strip('\n')
for line in py.std.difflib.ndiff(left.splitlines(),
right.splitlines())]
return explanation
-def _compare_eq_sequence(left, right):
+def _compare_eq_sequence(left, right, verbose=False):
explanation = []
for i in range(min(len(left), len(right))):
if left[i] != right[i]:
- explanation += ['At index %s diff: %r != %r' %
- (i, left[i], right[i])]
+ explanation += [u('At index %s diff: %r != %r')
+ % (i, left[i], right[i])]
break
if len(left) > len(right):
- explanation += ['Left contains more items, '
- 'first extra item: %s' % py.io.saferepr(left[len(right)],)]
+ explanation += [u('Left contains more items, first extra item: %s')
+ % py.io.saferepr(left[len(right)],)]
elif len(left) < len(right):
- explanation += ['Right contains more items, '
- 'first extra item: %s' % py.io.saferepr(right[len(left)],)]
- return explanation # + _diff_text(py.std.pprint.pformat(left),
- # py.std.pprint.pformat(right))
+ explanation += [
+ u('Right contains more items, first extra item: %s') %
+ py.io.saferepr(right[len(left)],)]
+ return explanation # + _diff_text(py.std.pprint.pformat(left),
+ # py.std.pprint.pformat(right))
-def _compare_eq_set(left, right):
+def _compare_eq_set(left, right, verbose=False):
explanation = []
diff_left = left - right
diff_right = right - left
if diff_left:
- explanation.append('Extra items in the left set:')
+ explanation.append(u('Extra items in the left set:'))
for item in diff_left:
explanation.append(py.io.saferepr(item))
if diff_right:
- explanation.append('Extra items in the right set:')
+ explanation.append(u('Extra items in the right set:'))
for item in diff_right:
explanation.append(py.io.saferepr(item))
return explanation
-def _notin_text(term, text):
+def _compare_eq_dict(left, right, verbose=False):
+ explanation = []
+ common = set(left).intersection(set(right))
+ same = dict((k, left[k]) for k in common if left[k] == right[k])
+ if same and not verbose:
+ explanation += [u('Omitting %s identical items, use -v to show') %
+ len(same)]
+ elif same:
+ explanation += [u('Common items:')]
+ explanation += py.std.pprint.pformat(same).splitlines()
+ diff = set(k for k in common if left[k] != right[k])
+ if diff:
+ explanation += [u('Differing items:')]
+ for k in diff:
+ explanation += [py.io.saferepr({k: left[k]}) + ' != ' +
+ py.io.saferepr({k: right[k]})]
+ extra_left = set(left) - set(right)
+ if extra_left:
+ explanation.append(u('Left contains more items:'))
+ explanation.extend(py.std.pprint.pformat(
+ dict((k, left[k]) for k in extra_left)).splitlines())
+ extra_right = set(right) - set(left)
+ if extra_right:
+ explanation.append(u('Right contains more items:'))
+ explanation.extend(py.std.pprint.pformat(
+ dict((k, right[k]) for k in extra_right)).splitlines())
+ return explanation
+
+
+def _notin_text(term, text, verbose=False):
index = text.find(term)
head = text[:index]
tail = text[index+len(term):]
correct_text = head + tail
- diff = _diff_text(correct_text, text)
- newdiff = ['%s is contained here:' % py.io.saferepr(term, maxsize=42)]
+ diff = _diff_text(correct_text, text, verbose)
+ newdiff = [u('%s is contained here:') % py.io.saferepr(term, maxsize=42)]
for line in diff:
- if line.startswith('Skipping'):
+ if line.startswith(u('Skipping')):
continue
- if line.startswith('- '):
+ if line.startswith(u('- ')):
continue
- if line.startswith('+ '):
- newdiff.append(' ' + line[2:])
+ if line.startswith(u('+ ')):
+ newdiff.append(u(' ') + line[2:])
else:
newdiff.append(line)
return newdiff
diff --git a/_pytest/capture.py b/_pytest/capture.py
--- a/_pytest/capture.py
+++ b/_pytest/capture.py
@@ -1,43 +1,114 @@
-""" per-test stdout/stderr capturing mechanisms, ``capsys`` and ``capfd`` function arguments. """
+"""
+ per-test stdout/stderr capturing mechanisms,
+ ``capsys`` and ``capfd`` function arguments.
+"""
+# note: py.io capture was where copied from
+# pylib 1.4.20.dev2 (rev 13d9af95547e)
+import sys
+import os
+import tempfile
-import pytest, py
-import os
+import py
+import pytest
+
+try:
+ from io import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+try:
+ from io import BytesIO
+except ImportError:
+ class BytesIO(StringIO):
+ def write(self, data):
+ if isinstance(data, unicode):
+ raise TypeError("not a byte value: %r" % (data,))
+ StringIO.write(self, data)
+
+if sys.version_info < (3, 0):
+ class TextIO(StringIO):
+ def write(self, data):
+ if not isinstance(data, unicode):
+ enc = getattr(self, '_encoding', 'UTF-8')
+ data = unicode(data, enc, 'replace')
+ StringIO.write(self, data)
+else:
+ TextIO = StringIO
+
+
+patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'}
+
def pytest_addoption(parser):
group = parser.getgroup("general")
- group._addoption('--capture', action="store", default=None,
- metavar="method", type="choice", choices=['fd', 'sys', 'no'],
+ group._addoption(
+ '--capture', action="store", default=None,
+ metavar="method", choices=['fd', 'sys', 'no'],
help="per-test capturing method: one of fd (default)|sys|no.")
- group._addoption('-s', action="store_const", const="no", dest="capture",
+ group._addoption(
+ '-s', action="store_const", const="no", dest="capture",
help="shortcut for --capture=no.")
+
@pytest.mark.tryfirst
-def pytest_cmdline_parse(pluginmanager, args):
- # we want to perform capturing already for plugin/conftest loading
- if '-s' in args or "--capture=no" in args:
- method = "no"
- elif hasattr(os, 'dup') and '--capture=sys' not in args:
+def pytest_load_initial_conftests(early_config, parser, args, __multicall__):
+ ns = parser.parse_known_args(args)
+ method = ns.capture
+ if not method:
method = "fd"
- else:
+ if method == "fd" and not hasattr(os, "dup"):
method = "sys"
capman = CaptureManager(method)
- pluginmanager.register(capman, "capturemanager")
+ early_config.pluginmanager.register(capman, "capturemanager")
+
+ # make sure that capturemanager is properly reset at final shutdown
+ def teardown():
+ try:
+ capman.reset_capturings()
+ except ValueError:
+ pass
+
+ early_config.pluginmanager.add_shutdown(teardown)
+
+ # make sure logging does not raise exceptions at the end
+ def silence_logging_at_shutdown():
+ if "logging" in sys.modules:
+ sys.modules["logging"].raiseExceptions = False
+ early_config.pluginmanager.add_shutdown(silence_logging_at_shutdown)
+
+ # finally trigger conftest loading but while capturing (issue93)
+ capman.resumecapture()
+ try:
+ try:
+ return __multicall__.execute()
+ finally:
+ out, err = capman.suspendcapture()
+ except:
+ sys.stdout.write(out)
+ sys.stderr.write(err)
+ raise
+
def addouterr(rep, outerr):
for secname, content in zip(["out", "err"], outerr):
if content:
rep.sections.append(("Captured std%s" % secname, content))
+
class NoCapture:
def startall(self):
pass
+
def resume(self):
pass
+
def reset(self):
pass
+
def suspend(self):
return "", ""
+
class CaptureManager:
def __init__(self, defaultmethod=None):
self._method2capture = {}
@@ -45,21 +116,23 @@
def _maketempfile(self):
f = py.std.tempfile.TemporaryFile()
- newf = py.io.dupfile(f, encoding="UTF-8")
+ newf = dupfile(f, encoding="UTF-8")
f.close()
return newf
def _makestringio(self):
- return py.io.TextIO()
+ return TextIO()
def _getcapture(self, method):
if method == "fd":
- return py.io.StdCaptureFD(now=False,
- out=self._maketempfile(), err=self._maketempfile()
+ return StdCaptureFD(
+ out=self._maketempfile(),
+ err=self._maketempfile(),
)
elif method == "sys":
- return py.io.StdCapture(now=False,
- out=self._makestringio(), err=self._makestringio()
+ return StdCapture(
+ out=self._makestringio(),
+ err=self._makestringio(),
)
elif method == "no":
return NoCapture()
@@ -74,23 +147,24 @@
method = config._conftest.rget("option_capture", path=fspath)
except KeyError:
method = "fd"
- if method == "fd" and not hasattr(os, 'dup'): # e.g. jython
+ if method == "fd" and not hasattr(os, 'dup'): # e.g. jython
method = "sys"
return method
def reset_capturings(self):
- for name, cap in self._method2capture.items():
+ for cap in self._method2capture.values():
cap.reset()
def resumecapture_item(self, item):
method = self._getmethod(item.config, item.fspath)
if not hasattr(item, 'outerr'):
- item.outerr = ('', '') # we accumulate outerr on the item
+ item.outerr = ('', '') # we accumulate outerr on the item
return self.resumecapture(method)
def resumecapture(self, method=None):
if hasattr(self, '_capturing'):
- raise ValueError("cannot resume, already capturing with %r" %
+ raise ValueError(
+ "cannot resume, already capturing with %r" %
(self._capturing,))
if method is None:
method = self._defaultmethod
@@ -119,30 +193,29 @@
return "", ""
def activate_funcargs(self, pyfuncitem):
- if not hasattr(pyfuncitem, 'funcargs'):
- return
- assert not hasattr(self, '_capturing_funcargs')
- self._capturing_funcargs = capturing_funcargs = []
- for name, capfuncarg in pyfuncitem.funcargs.items():
- if name in ('capsys', 'capfd'):
- capturing_funcargs.append(capfuncarg)
- capfuncarg._start()
+ funcargs = getattr(pyfuncitem, "funcargs", None)
+ if funcargs is not None:
+ for name, capfuncarg in funcargs.items():
+ if name in ('capsys', 'capfd'):
+ assert not hasattr(self, '_capturing_funcarg')
+ self._capturing_funcarg = capfuncarg
+ capfuncarg._start()
def deactivate_funcargs(self):
- capturing_funcargs = getattr(self, '_capturing_funcargs', None)
- if capturing_funcargs is not None:
- while capturing_funcargs:
- capfuncarg = capturing_funcargs.pop()
- capfuncarg._finalize()
- del self._capturing_funcargs
+ capturing_funcarg = getattr(self, '_capturing_funcarg', None)
+ if capturing_funcarg:
+ outerr = capturing_funcarg._finalize()
+ del self._capturing_funcarg
+ return outerr
def pytest_make_collect_report(self, __multicall__, collector):
method = self._getmethod(collector.config, collector.fspath)
try:
self.resumecapture(method)
except ValueError:
- return # recursive collect, XXX refactor capturing
- # to allow for more lightweight recursive capturing
+ # recursive collect, XXX refactor capturing
+ # to allow for more lightweight recursive capturing
+ return
try:
rep = __multicall__.execute()
finally:
@@ -169,46 +242,371 @@
@pytest.mark.tryfirst
def pytest_runtest_makereport(self, __multicall__, item, call):
- self.deactivate_funcargs()
+ funcarg_outerr = self.deactivate_funcargs()
rep = __multicall__.execute()
outerr = self.suspendcapture(item)
- if not rep.passed:
- addouterr(rep, outerr)
+ if funcarg_outerr is not None:
+ outerr = (outerr[0] + funcarg_outerr[0],
+ outerr[1] + funcarg_outerr[1])
+ addouterr(rep, outerr)
if not rep.passed or rep.when == "teardown":
outerr = ('', '')
item.outerr = outerr
return rep
+error_capsysfderror = "cannot use capsys and capfd at the same time"
+
+
def pytest_funcarg__capsys(request):
"""enables capturing of writes to sys.stdout/sys.stderr and makes
captured output available via ``capsys.readouterr()`` method calls
which return a ``(out, err)`` tuple.
"""
- return CaptureFuncarg(py.io.StdCapture)
+ if "capfd" in request._funcargs:
+ raise request.raiseerror(error_capsysfderror)
+ return CaptureFixture(StdCapture)
+
def pytest_funcarg__capfd(request):
"""enables capturing of writes to file descriptors 1 and 2 and makes
captured output available via ``capsys.readouterr()`` method calls
which return a ``(out, err)`` tuple.
"""
+ if "capsys" in request._funcargs:
+ request.raiseerror(error_capsysfderror)
if not hasattr(os, 'dup'):
- py.test.skip("capfd funcarg needs os.dup")
- return CaptureFuncarg(py.io.StdCaptureFD)
+ pytest.skip("capfd funcarg needs os.dup")
+ return CaptureFixture(StdCaptureFD)
-class CaptureFuncarg:
+
+class CaptureFixture:
def __init__(self, captureclass):
- self.capture = captureclass(now=False)
+ self._capture = captureclass()
def _start(self):
- self.capture.startall()
+ self._capture.startall()
def _finalize(self):
- if hasattr(self, 'capture'):
- self.capture.reset()
- del self.capture
+ if hasattr(self, '_capture'):
+ outerr = self._outerr = self._capture.reset()
+ del self._capture
+ return outerr
def readouterr(self):
- return self.capture.readouterr()
+ try:
+ return self._capture.readouterr()
+ except AttributeError:
+ return self._outerr
def close(self):
self._finalize()
+
+
+class FDCapture:
+ """ Capture IO to/from a given os-level filedescriptor. """
+
+ def __init__(self, targetfd, tmpfile=None, patchsys=False):
+ """ save targetfd descriptor, and open a new
+ temporary file there. If no tmpfile is
+ specified a tempfile.Tempfile() will be opened
+ in text mode.
+ """
+ self.targetfd = targetfd
+ if tmpfile is None and targetfd != 0:
+ f = tempfile.TemporaryFile('wb+')
+ tmpfile = dupfile(f, encoding="UTF-8")
+ f.close()
+ self.tmpfile = tmpfile
+ self._savefd = os.dup(self.targetfd)
+ if patchsys:
+ self._oldsys = getattr(sys, patchsysdict[targetfd])
+
+ def start(self):
+ try:
+ os.fstat(self._savefd)
+ except OSError:
+ raise ValueError(
+ "saved filedescriptor not valid, "
+ "did you call start() twice?")
+ if self.targetfd == 0 and not self.tmpfile:
+ fd = os.open(os.devnull, os.O_RDONLY)
+ os.dup2(fd, 0)
+ os.close(fd)
+ if hasattr(self, '_oldsys'):
+ setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
+ else:
+ os.dup2(self.tmpfile.fileno(), self.targetfd)
+ if hasattr(self, '_oldsys'):
+ setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
+
+ def done(self):
+ """ unpatch and clean up, returns the self.tmpfile (file object)
+ """
+ os.dup2(self._savefd, self.targetfd)
+ os.close(self._savefd)
+ if self.targetfd != 0:
+ self.tmpfile.seek(0)
+ if hasattr(self, '_oldsys'):
+ setattr(sys, patchsysdict[self.targetfd], self._oldsys)
+ return self.tmpfile
+
+ def writeorg(self, data):
+ """ write a string to the original file descriptor
+ """
+ tempfp = tempfile.TemporaryFile()
+ try:
+ os.dup2(self._savefd, tempfp.fileno())
+ tempfp.write(data)
+ finally:
+ tempfp.close()
+
+
+def dupfile(f, mode=None, buffering=0, raising=False, encoding=None):
+ """ return a new open file object that's a duplicate of f
+
+ mode is duplicated if not given, 'buffering' controls
+ buffer size (defaulting to no buffering) and 'raising'
+ defines whether an exception is raised when an incompatible
+ file object is passed in (if raising is False, the file
+ object itself will be returned)
+ """
+ try:
+ fd = f.fileno()
+ mode = mode or f.mode
+ except AttributeError:
+ if raising:
+ raise
+ return f
+ newfd = os.dup(fd)
+ if sys.version_info >= (3, 0):
+ if encoding is not None:
+ mode = mode.replace("b", "")
+ buffering = True
+ return os.fdopen(newfd, mode, buffering, encoding, closefd=True)
+ else:
+ f = os.fdopen(newfd, mode, buffering)
+ if encoding is not None:
+ return EncodedFile(f, encoding)
+ return f
+
+
+class EncodedFile(object):
+ def __init__(self, _stream, encoding):
+ self._stream = _stream
+ self.encoding = encoding
+
+ def write(self, obj):
+ if isinstance(obj, unicode):
+ obj = obj.encode(self.encoding)
+ self._stream.write(obj)
+
+ def writelines(self, linelist):
+ data = ''.join(linelist)
+ self.write(data)
+
+ def __getattr__(self, name):
+ return getattr(self._stream, name)
+
+
+class Capture(object):
+ def reset(self):
+ """ reset sys.stdout/stderr and return captured output as strings. """
+ if hasattr(self, '_reset'):
+ raise ValueError("was already reset")
+ self._reset = True
+ outfile, errfile = self.done(save=False)
+ out, err = "", ""
+ if outfile and not outfile.closed:
+ out = outfile.read()
+ outfile.close()
+ if errfile and errfile != outfile and not errfile.closed:
+ err = errfile.read()
+ errfile.close()
+ return out, err
+
+ def suspend(self):
+ """ return current snapshot captures, memorize tempfiles. """
+ outerr = self.readouterr()
+ outfile, errfile = self.done()
+ return outerr
+
+
+class StdCaptureFD(Capture):
+ """ This class allows to capture writes to FD1 and FD2
+ and may connect a NULL file to FD0 (and prevent
+ reads from sys.stdin). If any of the 0,1,2 file descriptors
+ is invalid it will not be captured.
+ """
+ def __init__(self, out=True, err=True, in_=True, patchsys=True):
+ self._options = {
+ "out": out,
+ "err": err,
+ "in_": in_,
+ "patchsys": patchsys,
+ }
+ self._save()
+
+ def _save(self):
+ in_ = self._options['in_']
+ out = self._options['out']
+ err = self._options['err']
+ patchsys = self._options['patchsys']
+ if in_:
+ try:
+ self.in_ = FDCapture(
+ 0, tmpfile=None,
+ patchsys=patchsys)
+ except OSError:
+ pass
+ if out:
+ tmpfile = None
+ if hasattr(out, 'write'):
+ tmpfile = out
+ try:
+ self.out = FDCapture(
+ 1, tmpfile=tmpfile,
+ patchsys=patchsys)
+ self._options['out'] = self.out.tmpfile
+ except OSError:
+ pass
+ if err:
+ if hasattr(err, 'write'):
+ tmpfile = err
+ else:
+ tmpfile = None
+ try:
+ self.err = FDCapture(
+ 2, tmpfile=tmpfile,
+ patchsys=patchsys)
+ self._options['err'] = self.err.tmpfile
+ except OSError:
+ pass
+
+ def startall(self):
+ if hasattr(self, 'in_'):
+ self.in_.start()
+ if hasattr(self, 'out'):
+ self.out.start()
+ if hasattr(self, 'err'):
+ self.err.start()
+
+ def resume(self):
+ """ resume capturing with original temp files. """
+ self.startall()
+
+ def done(self, save=True):
+ """ return (outfile, errfile) and stop capturing. """
+ outfile = errfile = None
+ if hasattr(self, 'out') and not self.out.tmpfile.closed:
+ outfile = self.out.done()
+ if hasattr(self, 'err') and not self.err.tmpfile.closed:
+ errfile = self.err.done()
+ if hasattr(self, 'in_'):
+ self.in_.done()
+ if save:
+ self._save()
+ return outfile, errfile
+
+ def readouterr(self):
+ """ return snapshot value of stdout/stderr capturings. """
+ out = self._readsnapshot('out')
+ err = self._readsnapshot('err')
+ return out, err
+
+ def _readsnapshot(self, name):
+ if hasattr(self, name):
+ f = getattr(self, name).tmpfile
+ else:
+ return ''
+
+ f.seek(0)
+ res = f.read()
+ enc = getattr(f, "encoding", None)
+ if enc:
+ res = py.builtin._totext(res, enc, "replace")
+ f.truncate(0)
+ f.seek(0)
+ return res
+
+
+class StdCapture(Capture):
+ """ This class allows to capture writes to sys.stdout|stderr "in-memory"
+ and will raise errors on tries to read from sys.stdin. It only
+ modifies sys.stdout|stderr|stdin attributes and does not
+ touch underlying File Descriptors (use StdCaptureFD for that).
+ """
+ def __init__(self, out=True, err=True, in_=True):
+ self._oldout = sys.stdout
+ self._olderr = sys.stderr
+ self._oldin = sys.stdin
+ if out and not hasattr(out, 'file'):
+ out = TextIO()
+ self.out = out
+ if err:
+ if not hasattr(err, 'write'):
+ err = TextIO()
+ self.err = err
+ self.in_ = in_
+
+ def startall(self):
+ if self.out:
+ sys.stdout = self.out
+ if self.err:
+ sys.stderr = self.err
+ if self.in_:
+ sys.stdin = self.in_ = DontReadFromInput()
+
+ def done(self, save=True):
+ """ return (outfile, errfile) and stop capturing. """
+ outfile = errfile = None
+ if self.out and not self.out.closed:
+ sys.stdout = self._oldout
+ outfile = self.out
+ outfile.seek(0)
+ if self.err and not self.err.closed:
+ sys.stderr = self._olderr
+ errfile = self.err
+ errfile.seek(0)
+ if self.in_:
+ sys.stdin = self._oldin
+ return outfile, errfile
+
+ def resume(self):
+ """ resume capturing with original temp files. """
+ self.startall()
+
+ def readouterr(self):
+ """ return snapshot value of stdout/stderr capturings. """
+ out = err = ""
+ if self.out:
+ out = self.out.getvalue()
+ self.out.truncate(0)
+ self.out.seek(0)
+ if self.err:
+ err = self.err.getvalue()
+ self.err.truncate(0)
+ self.err.seek(0)
+ return out, err
+
+
+class DontReadFromInput:
+ """Temporary stub class. Ideally when stdin is accessed, the
+ capturing should be turned off, with possibly all data captured
+ so far sent to the screen. This should be configurable, though,
+ because in automated test runs it is better to crash than
+ hang indefinitely.
+ """
+ def read(self, *args):
+ raise IOError("reading from stdin while output is captured")
+ readline = read
+ readlines = read
+ __iter__ = read
+
+ def fileno(self):
+ raise ValueError("redirected Stdin is pseudofile, has no fileno()")
+
+ def isatty(self):
+ return False
+
+ def close(self):
+ pass
diff --git a/_pytest/config.py b/_pytest/config.py
--- a/_pytest/config.py
+++ b/_pytest/config.py
@@ -1,25 +1,91 @@
""" command line options, ini-file and conftest.py processing. """
import py
+# DON't import pytest here because it causes import cycle troubles
import sys, os
+from _pytest import hookspec # the extension point definitions
from _pytest.core import PluginManager
-import pytest
-def pytest_cmdline_parse(pluginmanager, args):
- config = Config(pluginmanager)
- config.parse(args)
- return config
+# pytest startup
-def pytest_unconfigure(config):
- while 1:
- try:
- fin = config._cleanup.pop()
- except IndexError:
- break
- fin()
+def main(args=None, plugins=None):
+ """ return exit code, after performing an in-process test run.
+
+ :arg args: list of command line arguments.
+
+ :arg plugins: list of plugin objects to be auto-registered during
+ initialization.
+ """
+ config = _prepareconfig(args, plugins)
+ return config.hook.pytest_cmdline_main(config=config)
+
+class cmdline: # compatibility namespace
+ main = staticmethod(main)
+
+class UsageError(Exception):
+ """ error in pytest usage or invocation"""
+
+_preinit = []
+
+default_plugins = (
+ "mark main terminal runner python pdb unittest capture skipping "
+ "tmpdir monkeypatch recwarn pastebin helpconfig nose assertion genscript "
+ "junitxml resultlog doctest").split()
+
+def _preloadplugins():
+ assert not _preinit
+ _preinit.append(get_plugin_manager())
+
+def get_plugin_manager():
+ if _preinit:
+ return _preinit.pop(0)
+ # subsequent calls to main will create a fresh instance
+ pluginmanager = PytestPluginManager()
+ pluginmanager.config = Config(pluginmanager) # XXX attr needed?
+ for spec in default_plugins:
+ pluginmanager.import_plugin(spec)
+ return pluginmanager
+
+def _prepareconfig(args=None, plugins=None):
+ if args is None:
+ args = sys.argv[1:]
+ elif isinstance(args, py.path.local):
+ args = [str(args)]
+ elif not isinstance(args, (tuple, list)):
+ if not isinstance(args, str):
+ raise ValueError("not a string or argument list: %r" % (args,))
+ args = py.std.shlex.split(args)
+ pluginmanager = get_plugin_manager()
+ if plugins:
+ for plugin in plugins:
+ pluginmanager.register(plugin)
+ return pluginmanager.hook.pytest_cmdline_parse(
+ pluginmanager=pluginmanager, args=args)
+
+class PytestPluginManager(PluginManager):
+ def __init__(self, hookspecs=[hookspec]):
+ super(PytestPluginManager, self).__init__(hookspecs=hookspecs)
+ self.register(self)
+ if os.environ.get('PYTEST_DEBUG'):
+ err = sys.stderr
+ encoding = getattr(err, 'encoding', 'utf8')
+ try:
+ err = py.io.dupfile(err, encoding=encoding)
+ except Exception:
+ pass
+ self.trace.root.setwriter(err.write)
+
+ def pytest_configure(self, config):
+ config.addinivalue_line("markers",
+ "tryfirst: mark a hook implementation function such that the "
+ "plugin machinery will try to call it first/as early as possible.")
+ config.addinivalue_line("markers",
+ "trylast: mark a hook implementation function such that the "
+ "plugin machinery will try to call it last/as late as possible.")
+
class Parser:
- """ Parser for command line arguments. """
+ """ Parser for command line arguments and ini-file values. """
def __init__(self, usage=None, processopt=None):
self._anonymous = OptionGroup("custom options", parser=self)
@@ -35,15 +101,17 @@
if option.dest:
self._processopt(option)
- def addnote(self, note):
- self._notes.append(note)
-
def getgroup(self, name, description="", after=None):
""" get (or create) a named option Group.
- :name: unique name of the option group.
+ :name: name of the option group.
:description: long description for --help output.
:after: name of other group, used for ordering --help output.
+
+ The returned group object has an ``addoption`` method with the same
+ signature as :py:func:`parser.addoption
+ <_pytest.config.Parser.addoption>` but will be shown in the
+ respective group in the output of ``pytest. --help``.
"""
for group in self._groups:
if group.name == name:
@@ -57,33 +125,222 @@
return group
def addoption(self, *opts, **attrs):
- """ add an optparse-style option. """
+ """ register a command line option.
+
+ :opts: option names, can be short or long options.
+ :attrs: same attributes which the ``add_option()`` function of the
+ `argparse library
+ <http://docs.python.org/2/library/argparse.html>`_
+ accepts.
+
+ After command line parsing options are available on the pytest config
+ object via ``config.option.NAME`` where ``NAME`` is usually set
+ by passing a ``dest`` attribute, for example
+ ``addoption("--long", dest="NAME", ...)``.
+ """
self._anonymous.addoption(*opts, **attrs)
def parse(self, args):
- self.optparser = optparser = MyOptionParser(self)
+ from _pytest._argcomplete import try_argcomplete
+ self.optparser = self._getparser()
+ try_argcomplete(self.optparser)
+ return self.optparser.parse_args([str(x) for x in args])
+
+ def _getparser(self):
+ from _pytest._argcomplete import filescompleter
+ optparser = MyOptionParser(self)
groups = self._groups + [self._anonymous]
for group in groups:
if group.options:
desc = group.description or group.name
- optgroup = py.std.optparse.OptionGroup(optparser, desc)
- optgroup.add_options(group.options)
- optparser.add_option_group(optgroup)
- return self.optparser.parse_args([str(x) for x in args])
+ arggroup = optparser.add_argument_group(desc)
+ for option in group.options:
+ n = option.names()
+ a = option.attrs()
+ arggroup.add_argument(*n, **a)
+ # bash like autocompletion for dirs (appending '/')
+ optparser.add_argument(FILE_OR_DIR, nargs='*'
+ ).completer=filescompleter
+ return optparser
def parse_setoption(self, args, option):
- parsedoption, args = self.parse(args)
+ parsedoption = self.parse(args)
for name, value in parsedoption.__dict__.items():
setattr(option, name, value)
- return args
+ return getattr(parsedoption, FILE_OR_DIR)
+
+ def parse_known_args(self, args):
+ optparser = self._getparser()
+ args = [str(x) for x in args]
+ return optparser.parse_known_args(args)[0]
def addini(self, name, help, type=None, default=None):
- """ add an ini-file option with the given name and description. """
+ """ register an ini-file option.
+
+ :name: name of the ini-variable
+ :type: type of the variable, can be ``pathlist``, ``args`` or ``linelist``.
+ :default: default value if no ini-file option exists but is queried.
+
+ The value of ini-variables can be retrieved via a call to
+ :py:func:`config.getini(name) <_pytest.config.Config.getini>`.
+ """
assert type in (None, "pathlist", "args", "linelist")
self._inidict[name] = (help, type, default)
self._ininames.append(name)
+class ArgumentError(Exception):
+ """
+ Raised if an Argument instance is created with invalid or
+ inconsistent arguments.
+ """
+
+ def __init__(self, msg, option):
+ self.msg = msg
+ self.option_id = str(option)
+
+ def __str__(self):
+ if self.option_id:
+ return "option %s: %s" % (self.option_id, self.msg)
+ else:
+ return self.msg
+
+
+class Argument:
+ """class that mimics the necessary behaviour of py.std.optparse.Option """
+ _typ_map = {
+ 'int': int,
+ 'string': str,
+ }
+ # enable after some grace period for plugin writers
+ TYPE_WARN = False
+
+ def __init__(self, *names, **attrs):
+ """store parms in private vars for use in add_argument"""
+ self._attrs = attrs
+ self._short_opts = []
+ self._long_opts = []
+ self.dest = attrs.get('dest')
+ if self.TYPE_WARN:
+ try:
+ help = attrs['help']
+ if '%default' in help:
+ py.std.warnings.warn(
+ 'pytest now uses argparse. "%default" should be'
+ ' changed to "%(default)s" ',
+ FutureWarning,
+ stacklevel=3)
+ except KeyError:
+ pass
+ try:
+ typ = attrs['type']
+ except KeyError:
+ pass
+ else:
+ # this might raise a keyerror as well, don't want to catch that
+ if isinstance(typ, py.builtin._basestring):
+ if typ == 'choice':
+ if self.TYPE_WARN:
+ py.std.warnings.warn(
+ 'type argument to addoption() is a string %r.'
+ ' For parsearg this is optional and when supplied '
+ ' should be a type.'
+ ' (options: %s)' % (typ, names),
+ FutureWarning,
+ stacklevel=3)
+ # argparse expects a type here take it from
+ # the type of the first element
+ attrs['type'] = type(attrs['choices'][0])
+ else:
+ if self.TYPE_WARN:
+ py.std.warnings.warn(
+ 'type argument to addoption() is a string %r.'
+ ' For parsearg this should be a type.'
+ ' (options: %s)' % (typ, names),
+ FutureWarning,
+ stacklevel=3)
+ attrs['type'] = Argument._typ_map[typ]
+ # used in test_parseopt -> test_parse_defaultgetter
+ self.type = attrs['type']
+ else:
+ self.type = typ
+ try:
+ # attribute existence is tested in Config._processopt
+ self.default = attrs['default']
+ except KeyError:
+ pass
+ self._set_opt_strings(names)
+ if not self.dest:
+ if self._long_opts:
+ self.dest = self._long_opts[0][2:].replace('-', '_')
+ else:
+ try:
+ self.dest = self._short_opts[0][1:]
+ except IndexError:
+ raise ArgumentError(
+ 'need a long or short option', self)
+
+ def names(self):
+ return self._short_opts + self._long_opts
+
+ def attrs(self):
+ # update any attributes set by processopt
+ attrs = 'default dest help'.split()
+ if self.dest:
+ attrs.append(self.dest)
+ for attr in attrs:
+ try:
+ self._attrs[attr] = getattr(self, attr)
+ except AttributeError:
+ pass
+ if self._attrs.get('help'):
+ a = self._attrs['help']
+ a = a.replace('%default', '%(default)s')
+ #a = a.replace('%prog', '%(prog)s')
+ self._attrs['help'] = a
+ return self._attrs
+
+ def _set_opt_strings(self, opts):
+ """directly from optparse
+
+ might not be necessary as this is passed to argparse later on"""
+ for opt in opts:
+ if len(opt) < 2:
+ raise ArgumentError(
+ "invalid option string %r: "
+ "must be at least two characters long" % opt, self)
+ elif len(opt) == 2:
+ if not (opt[0] == "-" and opt[1] != "-"):
+ raise ArgumentError(
+ "invalid short option string %r: "
+ "must be of the form -x, (x any non-dash char)" % opt,
+ self)
+ self._short_opts.append(opt)
+ else:
+ if not (opt[0:2] == "--" and opt[2] != "-"):
+ raise ArgumentError(
+ "invalid long option string %r: "
+ "must start with --, followed by non-dash" % opt,
+ self)
+ self._long_opts.append(opt)
+
+ def __repr__(self):
+ retval = 'Argument('
+ if self._short_opts:
+ retval += '_short_opts: ' + repr(self._short_opts) + ', '
+ if self._long_opts:
+ retval += '_long_opts: ' + repr(self._long_opts) + ', '
+ retval += 'dest: ' + repr(self.dest) + ', '
+ if hasattr(self, 'type'):
+ retval += 'type: ' + repr(self.type) + ', '
+ if hasattr(self, 'default'):
+ retval += 'default: ' + repr(self.default) + ', '
+ if retval[-2:] == ', ': # always long enough to test ("Argument(" )
+ retval = retval[:-2]
+ retval += ')'
+ return retval
+
+
class OptionGroup:
def __init__(self, name, description="", parser=None):
self.name = name
@@ -92,12 +349,18 @@
self.parser = parser
def addoption(self, *optnames, **attrs):
- """ add an option to this group. """
- option = py.std.optparse.Option(*optnames, **attrs)
+ """ add an option to this group.
+
+ if a shortened version of a long option is specified it will
+ be suppressed in the help. addoption('--twowords', '--two-words')
+ results in help showing '--two-words' only, but --twowords gets
+ accepted **and** the automatic destination is in args.twowords
+ """
+ option = Argument(*optnames, **attrs)
self._addoption_instance(option, shortupper=False)
def _addoption(self, *optnames, **attrs):
- option = py.std.optparse.Option(*optnames, **attrs)
+ option = Argument(*optnames, **attrs)
self._addoption_instance(option, shortupper=True)
More information about the pypy-commit
mailing list