[py-svn] py-trunk commit 3fee6fd40486: remove dist-testing and looponfail code from core. there remain some (pytest_runner particularly) tests that test both plain and dist modes which cannot be easily dis-entangled. food for thought.
commits-noreply at bitbucket.org
commits-noreply at bitbucket.org
Wed Jan 13 16:01:59 CET 2010
# HG changeset patch -- Bitbucket.org
# Project py-trunk
# URL http://bitbucket.org/hpk42/py-trunk/overview/
# User holger krekel <holger at merlinux.eu>
# Date 1263394833 -3600
# Node ID 3fee6fd40486fd5b234a5fc6bf61ef1aa03c1c6d
# Parent 73b1a73c0ca8f0c2d246daf5b33222c8f8bafad6
remove dist-testing and looponfail code from core. there remain some (pytest_runner particularly) tests that test both plain and dist modes which cannot be easily dis-entangled. food for thought.
--- a/doc/test/plugin/resultlog.txt
+++ b/doc/test/plugin/resultlog.txt
@@ -1,8 +1,7 @@
-
-pytest_resultlog plugin
-=======================
non-xml machine-readable logging of test results.
+=================================================
+
.. contents::
:local:
--- a/testing/pytest/looponfail/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-#
--- a/doc/test/index.txt
+++ b/doc/test/index.txt
@@ -13,8 +13,6 @@ funcargs_: powerful parametrized test fu
`plugins`_: list of available plugins with usage examples and feature details.
-`distributed testing`_: ad-hoc run tests on multiple CPUs and platforms
-
customize_: configuration, customization, extensions
changelog_: history of changes covering last releases
--- a/testing/pytest/test_config.py
+++ b/testing/pytest/test_config.py
@@ -180,21 +180,6 @@ class TestConfigApi_getinitialnodes:
for col in col.listchain():
assert col.config is config
-class TestOptionEffects:
- def test_boxed_option_default(self, testdir):
- tmpdir = testdir.tmpdir.ensure("subdir", dir=1)
- config = testdir.reparseconfig()
- config.initsession()
- assert not config.option.boxed
- py.test.importorskip("execnet")
- config = testdir.reparseconfig(['-d', tmpdir])
- config.initsession()
- assert not config.option.boxed
-
- def test_is_not_boxed_by_default(self, testdir):
- config = testdir.reparseconfig([testdir.tmpdir])
- assert not config.option.boxed
-
class TestConfig_gettopdir:
def test_gettopdir(self, testdir):
from py.impl.test.config import gettopdir
--- a/py/impl/test/looponfail/util.py
+++ /dev/null
@@ -1,53 +0,0 @@
-import py
-
-class StatRecorder:
- def __init__(self, rootdirlist):
- self.rootdirlist = rootdirlist
- self.statcache = {}
- self.check() # snapshot state
-
- def fil(self, p):
- return p.ext in ('.py', '.txt', '.c', '.h')
- def rec(self, p):
- return p.check(dotfile=0)
-
- def waitonchange(self, checkinterval=1.0):
- while 1:
- changed = self.check()
- if changed:
- return
- py.std.time.sleep(checkinterval)
-
- def check(self, removepycfiles=True):
- changed = False
- statcache = self.statcache
- newstat = {}
- for rootdir in self.rootdirlist:
- for path in rootdir.visit(self.fil, self.rec):
- oldstat = statcache.get(path, None)
- if oldstat is not None:
- del statcache[path]
- try:
- newstat[path] = curstat = path.stat()
- except py.error.ENOENT:
- if oldstat:
- del statcache[path]
- changed = True
- else:
- if oldstat:
- if oldstat.mtime != curstat.mtime or \
- oldstat.size != curstat.size:
- changed = True
- py.builtin.print_("# MODIFIED", path)
- if removepycfiles and path.ext == ".py":
- pycfile = path + "c"
- if pycfile.check():
- pycfile.remove()
-
- else:
- changed = True
- if statcache:
- changed = True
- self.statcache = newstat
- return changed
-
--- a/doc/test/plugin/helpconfig.txt
+++ b/doc/test/plugin/helpconfig.txt
@@ -1,8 +1,7 @@
-
-pytest_helpconfig plugin
-========================
provide version info, conftest/environment config names.
+========================================================
+
.. contents::
:local:
--- a/doc/test/plugin/terminal.txt
+++ b/doc/test/plugin/terminal.txt
@@ -1,8 +1,7 @@
-
-pytest_terminal plugin
-======================
Implements terminal reporting of the full testing process.
+==========================================================
+
.. contents::
:local:
--- a/doc/test/plugin/capture.txt
+++ b/doc/test/plugin/capture.txt
@@ -1,8 +1,7 @@
-
-pytest_capture plugin
-=====================
configurable per-test stdout/stderr capturing mechanisms.
+=========================================================
+
.. contents::
:local:
--- a/doc/test/plugin/hooklog.txt
+++ b/doc/test/plugin/hooklog.txt
@@ -1,8 +1,7 @@
-
-pytest_hooklog plugin
-=====================
log invocations of extension hooks to a file.
+=============================================
+
.. contents::
:local:
--- a/testing/test_py_imports.py
+++ b/testing/test_py_imports.py
@@ -30,7 +30,6 @@ def test_importall():
base.join('test', 'testing', 'data'),
base.join('path', 'gateway',),
base.join('code', 'oldmagic.py'),
- base.join('execnet', 'script'),
base.join('compat', 'testing'),
]
if sys.version_info >= (3,0):
@@ -41,11 +40,6 @@ def test_importall():
def recurse(p):
return p.check(dotfile=0) and p.basename != "attic"
- try:
- import execnet
- except ImportError:
- execnet = None
-
for p in base.visit('*.py', recurse):
if p.basename == '__init__.py':
continue
@@ -57,10 +51,6 @@ def test_importall():
else:
relpath = relpath.replace(base.sep, '.')
modpath = 'py.impl.%s' % relpath
- if modpath.startswith("py.impl.test.dist") or \
- modpath.startswith("py.impl.test.looponfail"):
- if not execnet:
- continue
check_import(modpath)
def check_import(modpath):
--- a/doc/test/plugin/pdb.txt
+++ b/doc/test/plugin/pdb.txt
@@ -1,8 +1,7 @@
-
-pytest_pdb plugin
-=================
interactive debugging with the Python Debugger.
+===============================================
+
.. contents::
:local:
--- a/py/plugin/pytest_default.py
+++ b/py/plugin/pytest_default.py
@@ -3,16 +3,6 @@
import sys
import py
-try:
- import execnet
- if not py.path.local(py.__file__).check():
- raise ImportError("")
-except ImportError:
- execnet = None
-else:
- if not hasattr(execnet, 'Group'):
- execnet = None
-
def pytest_pyfunc_call(__multicall__, pyfuncitem):
if not __multicall__.execute():
testfunction = pyfuncitem.obj
@@ -63,10 +53,6 @@ def pytest_addoption(parser):
"space separated keywords. precede a keyword with '-' to negate. "
"Terminate the expression with ':' to treat a match as a signal "
"to run all subsequent tests. ")
- if execnet:
- group._addoption('-f', '--looponfail',
- action="store_true", dest="looponfail", default=False,
- help="run tests, re-run failing test set until all pass.")
group = parser.getgroup("collect", "collection")
group.addoption('--collectonly',
@@ -82,60 +68,15 @@ def pytest_addoption(parser):
"test process debugging and configuration")
group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir",
help="base temporary directory for this test run.")
- if execnet:
- add_dist_options(parser)
- else:
- parser.hints.append(
- "'execnet>=1.0.0b4' required for --looponfailing / distributed testing."
- )
-
-def add_dist_options(parser):
- # see http://pytest.org/help/dist")
- group = parser.getgroup("dist", "distributed testing")
- group._addoption('--dist', metavar="distmode",
- action="store", choices=['load', 'each', 'no'],
- type="choice", dest="dist", default="no",
- help=("set mode for distributing tests to exec environments.\n\n"
- "each: send each test to each available environment.\n\n"
- "load: send each test to available environment.\n\n"
- "(default) no: run tests inprocess, don't distribute."))
- group._addoption('--tx', dest="tx", action="append", default=[], metavar="xspec",
- help=("add a test execution environment. some examples: "
- "--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 "
- "--tx ssh=user at codespeak.net//chdir=testcache"))
- group._addoption('-d',
- action="store_true", dest="distload", default=False,
- help="load-balance tests. shortcut for '--dist=load'")
- group._addoption('-n', dest="numprocesses", metavar="numprocesses",
- action="store", type="int",
- help="shortcut for '--dist=load --tx=NUM*popen'")
- group.addoption('--rsyncdir', action="append", default=[], metavar="dir1",
- help="add directory for rsyncing to remote tx nodes.")
def pytest_configure(config):
- fixoptions(config)
setsession(config)
-def fixoptions(config):
- if execnet:
- if config.option.numprocesses:
- config.option.dist = "load"
- config.option.tx = ['popen'] * int(config.option.numprocesses)
- if config.option.distload:
- config.option.dist = "load"
-
def setsession(config):
val = config.getvalue
if val("collectonly"):
from py.impl.test.session import Session
config.setsessionclass(Session)
- elif execnet:
- if val("looponfail"):
- from py.impl.test.looponfail.remote import LooponfailingSession
- config.setsessionclass(LooponfailingSession)
- elif val("dist") != "no":
- from py.impl.test.dist.dsession import DSession
- config.setsessionclass(DSession)
# pycollect related hooks and code, should move to pytest_pycollect.py
--- a/py/plugin/pytest_pdb.py
+++ b/py/plugin/pytest_pdb.py
@@ -4,10 +4,6 @@ interactive debugging with the Python De
import py
import pdb, sys, linecache
from py.impl.test.outcome import Skipped
-try:
- import execnet
-except ImportError:
- execnet = None
def pytest_addoption(parser):
group = parser.getgroup("general")
@@ -15,16 +11,9 @@ def pytest_addoption(parser):
action="store_true", dest="usepdb", default=False,
help="start the interactive Python debugger on errors.")
-
-def pytest_configure(__multicall__, config):
- if config.option.usepdb:
- if execnet:
- __multicall__.execute()
- if config.getvalue("looponfail"):
- raise config.Error("--pdb incompatible with --looponfail.")
- if config.option.dist != "no":
- raise config.Error("--pdb incompatible with distributing tests.")
- config.pluginmanager.register(PdbInvoke())
+def pytest_configure(config):
+ if config.getvalue("usepdb"):
+ config.pluginmanager.register(PdbInvoke(), 'pdb')
class PdbInvoke:
def pytest_runtest_makereport(self, item, call):
--- a/py/impl/test/pluginmanager.py
+++ b/py/impl/test/pluginmanager.py
@@ -61,6 +61,18 @@ class PluginManager(object):
def getplugins(self):
return list(self.registry)
+ def skipifmissing(self, name):
+ if not self.hasplugin(name):
+ py.test.skip("plugin %r is missing" % name)
+
+ def hasplugin(self, name):
+ try:
+ self.getplugin(name)
+ except KeyError:
+ return False
+ else:
+ return True
+
def getplugin(self, name):
try:
return self._name2plugin[name]
--- a/py/impl/test/dist/gwmanage.py
+++ /dev/null
@@ -1,99 +0,0 @@
-"""
- instantiating, managing and rsyncing to test hosts
-"""
-
-import py
-import sys, os.path
-import execnet
-from execnet.gateway_base import RemoteError
-
-class GatewayManager:
- RemoteError = RemoteError
- def __init__(self, specs, hook, defaultchdir="pyexecnetcache"):
- self.specs = []
- self.hook = hook
- self.group = execnet.Group()
- for spec in specs:
- if not isinstance(spec, execnet.XSpec):
- spec = execnet.XSpec(spec)
- if not spec.chdir and not spec.popen:
- spec.chdir = defaultchdir
- self.specs.append(spec)
-
- def makegateways(self):
- assert not list(self.group)
- for spec in self.specs:
- gw = self.group.makegateway(spec)
- self.hook.pytest_gwmanage_newgateway(
- gateway=gw, platinfo=gw._rinfo())
-
- def rsync(self, source, notify=None, verbose=False, ignores=None):
- """ perform rsync to all remote hosts.
- """
- rsync = HostRSync(source, verbose=verbose, ignores=ignores)
- seen = py.builtin.set()
- gateways = []
- for gateway in self.group:
- spec = gateway.spec
- if spec.popen and not spec.chdir:
- # XXX this assumes that sources are python-packages
- # and that adding the basedir does not hurt
- gateway.remote_exec("""
- import sys ; sys.path.insert(0, %r)
- """ % os.path.dirname(str(source))).waitclose()
- continue
- if spec not in seen:
- def finished():
- if notify:
- notify("rsyncrootready", spec, source)
- rsync.add_target_host(gateway, finished=finished)
- seen.add(spec)
- gateways.append(gateway)
- if seen:
- self.hook.pytest_gwmanage_rsyncstart(
- source=source,
- gateways=gateways,
- )
- rsync.send()
- self.hook.pytest_gwmanage_rsyncfinish(
- source=source,
- gateways=gateways,
- )
-
- def exit(self):
- self.group.terminate()
-
-class HostRSync(execnet.RSync):
- """ RSyncer that filters out common files
- """
- def __init__(self, sourcedir, *args, **kwargs):
- self._synced = {}
- ignores= None
- if 'ignores' in kwargs:
- ignores = kwargs.pop('ignores')
- self._ignores = ignores or []
- super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)
-
- def filter(self, path):
- path = py.path.local(path)
- if not path.ext in ('.pyc', '.pyo'):
- if not path.basename.endswith('~'):
- if path.check(dotfile=0):
- for x in self._ignores:
- if path == x:
- break
- else:
- return True
-
- def add_target_host(self, gateway, finished=None):
- remotepath = os.path.basename(self._sourcedir)
- super(HostRSync, self).add_target(gateway, remotepath,
- finishedcallback=finished,
- delete=True,)
-
- def _report_send_file(self, gateway, modified_rel_path):
- if self._verbose:
- path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
- remotepath = gateway.spec.chdir
- py.builtin.print_('%s:%s <= %s' %
- (gateway.spec, remotepath, path))
--- a/py/impl/test/config.py
+++ b/py/impl/test/config.py
@@ -198,8 +198,10 @@ class Config(object):
modpath = py.path.local(mod.__file__).dirpath()
l = []
for relroot in relroots:
- relroot = relroot.replace("/", py.path.local.sep)
- l.append(modpath.join(relroot, abs=True))
+ if not isinstance(relroot, py.path.local):
+ relroot = relroot.replace("/", py.path.local.sep)
+ relroot = modpath.join(relroot, abs=True)
+ l.append(relroot)
return l
def addoptions(self, groupname, *specs):
@@ -253,46 +255,10 @@ class Config(object):
self.trace("instantiated session %r" % session)
return session
- def getxspecs(self):
- xspeclist = []
- for xspec in self.getvalue("tx"):
- i = xspec.find("*")
- try:
- num = int(xspec[:i])
- except ValueError:
- xspeclist.append(xspec)
- else:
- xspeclist.extend([xspec[i+1:]] * num)
- if not xspeclist:
- raise self.Error("MISSING test execution (tx) nodes: please specify --tx")
- import execnet
- return [execnet.XSpec(x) for x in xspeclist]
-
- def getrsyncdirs(self):
- config = self
- candidates = [py._pydir] + config.option.rsyncdir
- conftestroots = config.getconftest_pathlist("rsyncdirs")
- if conftestroots:
- candidates.extend(conftestroots)
- roots = []
- for root in candidates:
- root = py.path.local(root).realpath()
- if not root.check():
- raise config.Error("rsyncdir doesn't exist: %r" %(root,))
- if root not in roots:
- roots.append(root)
- return roots
-
#
# helpers
#
-def checkmarshal(name, value):
- try:
- py.std.marshal.dumps(value)
- except ValueError:
- raise ValueError("%s=%r is not marshallable" %(name, value))
-
def gettopdir(args):
""" return the top directory for the given paths.
if the common base dir resides in a python package
--- a/testing/pytest/dist/acceptance_test.py
+++ /dev/null
@@ -1,119 +0,0 @@
-import py
-
-class TestDistribution:
- def test_manytests_to_one_popen(self, testdir):
- p1 = testdir.makepyfile("""
- import py
- def test_fail0():
- assert 0
- def test_fail1():
- raise ValueError()
- def test_ok():
- pass
- def test_skip():
- py.test.skip("hello")
- """,
- )
- result = testdir.runpytest(p1, '-d', '--tx=popen', '--tx=popen')
- result.stdout.fnmatch_lines([
- "*0*popen*Python*",
- "*1*popen*Python*",
- "*2 failed, 1 passed, 1 skipped*",
- ])
- assert result.ret == 1
-
- def test_dist_conftest_specified(self, testdir):
- p1 = testdir.makepyfile("""
- import py
- def test_fail0():
- assert 0
- def test_fail1():
- raise ValueError()
- def test_ok():
- pass
- def test_skip():
- py.test.skip("hello")
- """,
- )
- testdir.makeconftest("""
- option_tx = 'popen popen popen'.split()
- """)
- result = testdir.runpytest(p1, '-d')
- result.stdout.fnmatch_lines([
- "*0*popen*Python*",
- "*1*popen*Python*",
- "*2*popen*Python*",
- "*2 failed, 1 passed, 1 skipped*",
- ])
- assert result.ret == 1
-
- def test_dist_tests_with_crash(self, testdir):
- if not hasattr(py.std.os, 'kill'):
- py.test.skip("no os.kill")
-
- p1 = testdir.makepyfile("""
- import py
- def test_fail0():
- assert 0
- def test_fail1():
- raise ValueError()
- def test_ok():
- pass
- def test_skip():
- py.test.skip("hello")
- def test_crash():
- import time
- import os
- time.sleep(0.5)
- os.kill(os.getpid(), 15)
- """
- )
- result = testdir.runpytest(p1, '-d', '--tx=3*popen')
- result.stdout.fnmatch_lines([
- "*popen*Python*",
- "*popen*Python*",
- "*popen*Python*",
- "*node down*",
- "*3 failed, 1 passed, 1 skipped*"
- ])
- assert result.ret == 1
-
- def test_distribution_rsyncdirs_example(self, testdir):
- source = testdir.mkdir("source")
- dest = testdir.mkdir("dest")
- subdir = source.mkdir("example_pkg")
- subdir.ensure("__init__.py")
- p = subdir.join("test_one.py")
- p.write("def test_5(): assert not __file__.startswith(%r)" % str(p))
- result = testdir.runpytest("-d", "--rsyncdir=%(subdir)s" % locals(),
- "--tx=popen//chdir=%(dest)s" % locals(), p)
- assert result.ret == 0
- result.stdout.fnmatch_lines([
- "*0* *popen*platform*",
- #"RSyncStart: [G1]",
- #"RSyncFinished: [G1]",
- "*1 passed*"
- ])
- assert dest.join(subdir.basename).check(dir=1)
-
- def test_dist_each(self, testdir):
- interpreters = []
- for name in ("python2.4", "python2.5"):
- interp = py.path.local.sysfind(name)
- if interp is None:
- py.test.skip("%s not found" % name)
- interpreters.append(interp)
-
- testdir.makepyfile(__init__="", test_one="""
- import sys
- def test_hello():
- print("%s...%s" % sys.version_info[:2])
- assert 0
- """)
- args = ["--dist=each"]
- args += ["--tx", "popen//python=%s" % interpreters[0]]
- args += ["--tx", "popen//python=%s" % interpreters[1]]
- result = testdir.runpytest(*args)
- s = result.stdout.str()
- assert "2.4" in s
- assert "2.5" in s
--- a/testing/pytest/dist/test_gwmanage.py
+++ /dev/null
@@ -1,127 +0,0 @@
-"""
- tests for
- - gateway management
- - manage rsyncing of hosts
-
-"""
-
-import py
-import os
-from py.impl.test.dist.gwmanage import GatewayManager, HostRSync
-from py.impl.test.pluginmanager import HookRelay, Registry
-from py.plugin import hookspec
-import execnet
-
-def pytest_funcarg__hookrecorder(request):
- _pytest = request.getfuncargvalue('_pytest')
- hook = request.getfuncargvalue('hook')
- return _pytest.gethookrecorder(hook)
-
-def pytest_funcarg__hook(request):
- return HookRelay(hookspec, Registry())
-
-class TestGatewayManagerPopen:
- def test_popen_no_default_chdir(self, hook):
- gm = GatewayManager(["popen"], hook)
- assert gm.specs[0].chdir is None
-
- def test_default_chdir(self, hook):
- l = ["ssh=noco", "socket=xyz"]
- for spec in GatewayManager(l, hook).specs:
- assert spec.chdir == "pyexecnetcache"
- for spec in GatewayManager(l, hook, defaultchdir="abc").specs:
- assert spec.chdir == "abc"
-
- def test_popen_makegateway_events(self, hook, hookrecorder, _pytest):
- hm = GatewayManager(["popen"] * 2, hook)
- hm.makegateways()
- call = hookrecorder.popcall("pytest_gwmanage_newgateway")
- assert call.gateway.spec == execnet.XSpec("popen")
- assert call.gateway.id == "gw0"
- assert call.platinfo.executable == call.gateway._rinfo().executable
- call = hookrecorder.popcall("pytest_gwmanage_newgateway")
- assert call.gateway.id == "gw1"
- assert len(hm.group) == 2
- hm.exit()
- assert not len(hm.group)
-
- def test_popens_rsync(self, hook, mysetup):
- source = mysetup.source
- hm = GatewayManager(["popen"] * 2, hook)
- hm.makegateways()
- assert len(hm.group) == 2
- for gw in hm.group:
- class pseudoexec:
- args = []
- def __init__(self, *args):
- self.args.extend(args)
- def waitclose(self):
- pass
- gw.remote_exec = pseudoexec
- l = []
- hm.rsync(source, notify=lambda *args: l.append(args))
- assert not l
- hm.exit()
- assert not len(hm.group)
- assert "sys.path.insert" in gw.remote_exec.args[0]
-
- def test_rsync_popen_with_path(self, hook, mysetup):
- source, dest = mysetup.source, mysetup.dest
- hm = GatewayManager(["popen//chdir=%s" %dest] * 1, hook)
- hm.makegateways()
- source.ensure("dir1", "dir2", "hello")
- l = []
- hm.rsync(source, notify=lambda *args: l.append(args))
- assert len(l) == 1
- assert l[0] == ("rsyncrootready", hm.group['gw0'].spec, source)
- hm.exit()
- dest = dest.join(source.basename)
- assert dest.join("dir1").check()
- assert dest.join("dir1", "dir2").check()
- assert dest.join("dir1", "dir2", 'hello').check()
-
- def test_rsync_same_popen_twice(self, hook, mysetup, hookrecorder):
- source, dest = mysetup.source, mysetup.dest
- hm = GatewayManager(["popen//chdir=%s" %dest] * 2, hook)
- hm.makegateways()
- source.ensure("dir1", "dir2", "hello")
- hm.rsync(source)
- call = hookrecorder.popcall("pytest_gwmanage_rsyncstart")
- assert call.source == source
- assert len(call.gateways) == 1
- assert call.gateways[0] in hm.group
- call = hookrecorder.popcall("pytest_gwmanage_rsyncfinish")
-
-class pytest_funcarg__mysetup:
- def __init__(self, request):
- tmp = request.getfuncargvalue('tmpdir')
- self.source = tmp.mkdir("source")
- self.dest = tmp.mkdir("dest")
-
-class TestHRSync:
- def test_hrsync_filter(self, mysetup):
- source, dest = mysetup.source, mysetup.dest
- source.ensure("dir", "file.txt")
- source.ensure(".svn", "entries")
- source.ensure(".somedotfile", "moreentries")
- source.ensure("somedir", "editfile~")
- syncer = HostRSync(source)
- l = list(source.visit(rec=syncer.filter,
- fil=syncer.filter))
- assert len(l) == 3
- basenames = [x.basename for x in l]
- assert 'dir' in basenames
- assert 'file.txt' in basenames
- assert 'somedir' in basenames
-
- def test_hrsync_one_host(self, mysetup):
- source, dest = mysetup.source, mysetup.dest
- gw = execnet.makegateway("popen//chdir=%s" % dest)
- finished = []
- rsync = HostRSync(source)
- rsync.add_target_host(gw, finished=lambda: finished.append(1))
- source.join("hello.py").write("world")
- rsync.send()
- gw.exit()
- assert dest.join(source.basename, "hello.py").check()
- assert len(finished) == 1
--- a/py/impl/test/looponfail/remote.py
+++ /dev/null
@@ -1,165 +0,0 @@
-"""
- LooponfailingSession and Helpers.
-
- NOTE that one really has to avoid loading and depending on
- application modules within the controlling process
- (the one that starts repeatedly test processes)
- otherwise changes to source code can crash
- the controlling process which should never happen.
-"""
-import py
-import sys
-import execnet
-from py.impl.test.session import Session
-from py.impl.test.looponfail import util
-
-class LooponfailingSession(Session):
- def __init__(self, config):
- super(LooponfailingSession, self).__init__(config=config)
- self.rootdirs = [self.config.topdir] # xxx dist_rsync_roots?
- self.statrecorder = util.StatRecorder(self.rootdirs)
- self.remotecontrol = RemoteControl(self.config)
- self.out = py.io.TerminalWriter()
-
- def main(self, initialitems):
- try:
- self.loopstate = loopstate = LoopState([])
- self.remotecontrol.setup()
- while 1:
- self.loop_once(loopstate)
- if not loopstate.colitems and loopstate.wasfailing:
- continue # the last failures passed, let's rerun all
- self.statrecorder.waitonchange(checkinterval=2.0)
- except KeyboardInterrupt:
- print
-
- def loop_once(self, loopstate):
- colitems = loopstate.colitems
- loopstate.wasfailing = colitems and len(colitems)
- loopstate.colitems = self.remotecontrol.runsession(colitems or ())
- self.remotecontrol.setup()
-
-class LoopState:
- def __init__(self, colitems=None):
- self.colitems = colitems
-
-class RemoteControl(object):
- def __init__(self, config):
- self.config = config
-
- def trace(self, *args):
- if self.config.option.debug:
- msg = " ".join([str(x) for x in args])
- py.builtin.print_("RemoteControl:", msg)
-
- def initgateway(self):
- return execnet.makegateway("popen")
-
- def setup(self, out=None):
- if out is None:
- out = py.io.TerminalWriter()
- if hasattr(self, 'gateway'):
- raise ValueError("already have gateway %r" % self.gateway)
- self.trace("setting up slave session")
- self.gateway = self.initgateway()
- self.channel = channel = self.gateway.remote_exec("""
- import os
- import py
- chdir = channel.receive()
- outchannel = channel.gateway.newchannel()
- channel.send(outchannel)
- os.chdir(chdir) # unpickling config uses cwd as topdir
- config_state = channel.receive()
- fullwidth, hasmarkup = channel.receive()
- py.test.config.__setstate__(config_state)
-
- import sys
- sys.stdout = sys.stderr = outchannel.makefile('w')
-
- from py.impl.test.looponfail.remote import slave_runsession
- slave_runsession(channel, py.test.config, fullwidth, hasmarkup)
- """)
- channel.send(str(self.config.topdir))
- remote_outchannel = channel.receive()
- def write(s):
- out._file.write(s)
- out._file.flush()
- remote_outchannel.setcallback(write)
- channel.send(self.config.__getstate__())
- channel.send((out.fullwidth, out.hasmarkup))
- self.trace("set up of slave session complete")
-
- def ensure_teardown(self):
- if hasattr(self, 'channel'):
- if not self.channel.isclosed():
- self.trace("closing", self.channel)
- self.channel.close()
- del self.channel
- if hasattr(self, 'gateway'):
- self.trace("exiting", self.gateway)
- self.gateway.exit()
- del self.gateway
-
- def runsession(self, colitems=()):
- try:
- self.trace("sending", colitems)
- trails = colitems
- self.channel.send(trails)
- try:
- return self.channel.receive()
- except self.channel.RemoteError:
- e = sys.exc_info()[1]
- self.trace("ERROR", e)
- raise
- finally:
- self.ensure_teardown()
-
-def slave_runsession(channel, config, fullwidth, hasmarkup):
- """ we run this on the other side. """
- if config.option.debug:
- def DEBUG(*args):
- print(" ".join(map(str, args)))
- else:
- def DEBUG(*args): pass
-
- DEBUG("SLAVE: received configuration, using topdir:", config.topdir)
- #config.option.session = None
- config.option.looponfail = False
- config.option.usepdb = False
- trails = channel.receive()
- config.pluginmanager.do_configure(config)
- DEBUG("SLAVE: initsession()")
- session = config.initsession()
- # XXX configure the reporter object's terminal writer more directly
- # XXX and write a test for this remote-terminal setting logic
- config.pytest_terminal_hasmarkup = hasmarkup
- config.pytest_terminal_fullwidth = fullwidth
- if trails:
- colitems = []
- for trail in trails:
- try:
- colitem = config._rootcol.fromtrail(trail)
- except ValueError:
- #XXX send info for "test disappeared" or so
- continue
- colitems.append(colitem)
- else:
- colitems = config.getinitialnodes()
- session.shouldclose = channel.isclosed
-
- class Failures(list):
- def pytest_runtest_logreport(self, report):
- if report.failed:
- self.append(report)
- pytest_collectreport = pytest_runtest_logreport
-
- failreports = Failures()
- session.pluginmanager.register(failreports)
-
- DEBUG("SLAVE: starting session.main()")
- session.main(colitems)
- session.config.hook.pytest_looponfailinfo(
- failreports=list(failreports),
- rootdirs=[config.topdir])
- rootcol = session.config._rootcol
- channel.send([rootcol.totrail(rep.getnode()) for rep in failreports])
--- a/testing/plugin/test_pytest_terminal.py
+++ b/testing/plugin/test_pytest_terminal.py
@@ -3,10 +3,6 @@ terminal reporting of the full testing p
"""
import py
import sys
-try:
- import execnet
-except ImportError:
- execnet = None
# ===============================================================================
# plugin tests
@@ -45,12 +41,13 @@ def pytest_generate_tests(metafunc):
id="verbose",
funcargs={'option': Option(verbose=True)}
)
- nodist = getattr(metafunc.function, 'nodist', False)
- if execnet and not nodist:
- metafunc.addcall(
- id="verbose-dist",
- funcargs={'option': Option(dist='each', verbose=True)}
- )
+ if metafunc.config.pluginmanager.hasplugin("xdist"):
+ nodist = getattr(metafunc.function, 'nodist', False)
+ if not nodist:
+ metafunc.addcall(
+ id="verbose-dist",
+ funcargs={'option': Option(dist='each', verbose=True)}
+ )
class TestTerminal:
def test_pass_skip_fail(self, testdir, option):
@@ -545,7 +542,7 @@ class TestTerminalFunctional:
"y* = 'xxxxxx*"
])
- def test_verbose_reporting(self, testdir):
+ def test_verbose_reporting(self, testdir, pytestconfig):
p1 = testdir.makepyfile("""
import py
def test_fail():
@@ -568,12 +565,12 @@ class TestTerminalFunctional:
"*test_verbose_reporting.py:10: test_gen*FAIL*",
])
assert result.ret == 1
- if execnet:
- result = testdir.runpytest(p1, '-v', '-n 1')
- result.stdout.fnmatch_lines([
- "*FAIL*test_verbose_reporting.py:2: test_fail*",
- ])
- assert result.ret == 1
+ pytestconfig.pluginmanager.skipifmissing("xdist")
+ result = testdir.runpytest(p1, '-v', '-n 1')
+ result.stdout.fnmatch_lines([
+ "*FAIL*test_verbose_reporting.py:2: test_fail*",
+ ])
+ assert result.ret == 1
def test_getreportopt():
--- a/doc/test/plugin/figleaf.txt
+++ b/doc/test/plugin/figleaf.txt
@@ -1,17 +1,41 @@
-pytest_figleaf plugin
-=====================
-add options to drive and report python test coverage using the 'figleaf' package.
+report test coverage using the 'figleaf' package.
+=================================================
-Install the `pytest-figleaf`_ plugin to use figleaf coverage testing::
- easy_install pytest-figleaf
+.. contents::
+ :local:
-or::
-
- pip install pytest-figleaf
+Usage
+---------------
-This will make py.test have figleaf related options.
+After installing ``pytest-figleaf`` via pip or easy_install you can type::
-.. _`pytest-figleaf`: http://bitbucket.org/hpk42/pytest-figleaf/
+ py.test --figleaf [...]
+to enable figleaf coverage in your test run. A default ".figleaf" data file
+and "html" directory will be created. You can use ``--fig-data``
+and ``--fig-html`` to modify the paths.
+
+command line options
+--------------------
+
+
+``--figleaf``
+ trace python coverage with figleaf and write HTML for files below the current working dir
+``--fig-data=dir``
+ set tracing file, default: ".figleaf".
+``--fig-html=dir``
+ set html reporting dir, default "html".
+
+Start improving this plugin in 30 seconds
+=========================================
+
+
+1. Download `pytest_figleaf.py`_ plugin source code
+2. put it somewhere as ``pytest_figleaf.py`` into your import path
+3. a subsequent ``py.test`` run will use your local version
+
+Checkout customize_, other plugins_ or `get in contact`_.
+
+.. include:: links.txt
--- a/doc/test/plugin/genscript.txt
+++ b/doc/test/plugin/genscript.txt
@@ -1,8 +1,7 @@
-
-pytest_genscript plugin
-=======================
generate standalone test script to be distributed along with an application.
+============================================================================
+
.. contents::
:local:
--- a/doc/test/plugin/recwarn.txt
+++ b/doc/test/plugin/recwarn.txt
@@ -1,8 +1,7 @@
-
-pytest_recwarn plugin
-=====================
helpers for asserting deprecation and other warnings.
+=====================================================
+
.. contents::
:local:
--- a/doc/test/plugin/links.txt
+++ b/doc/test/plugin/links.txt
@@ -1,5 +1,5 @@
.. _`pytest_logxml.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_logxml.py
-.. _`helpconfig`: helpconfig.html
+.. _`terminal`: terminal.html
.. _`pytest_recwarn.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_recwarn.py
.. _`unittest`: unittest.html
.. _`pytest_monkeypatch.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_monkeypatch.py
@@ -15,11 +15,14 @@
.. _`pytest_nose.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_nose.py
.. _`pytest_restdoc.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_restdoc.py
.. _`restdoc`: restdoc.html
+.. _`xdist`: xdist.html
.. _`pytest_pastebin.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_pastebin.py
.. _`pytest_tmpdir.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_tmpdir.py
-.. _`terminal`: terminal.html
+.. _`pytest_figleaf.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_figleaf.py
.. _`pytest_hooklog.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_hooklog.py
.. _`logxml`: logxml.html
+.. _`helpconfig`: helpconfig.html
+.. _`plugin.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/plugin.py
.. _`pytest_skipping.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_skipping.py
.. _`checkout the py.test development version`: ../../install.html#checkout
.. _`pytest_helpconfig.py`: http://bitbucket.org/hpk42/py-trunk/raw/1.2.0a1/py/plugin/pytest_helpconfig.py
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Changes between 1.X and 1.1.1
=====================================
+- moved dist/looponfailing from py.test core into a new
+ separately released pytest-xdist plugin.
+
- new junitxml plugin: --xml=path will generate a junit style xml file
which is parseable e.g. by the hudson continous integration server.
--- a/doc/test/plugin/nose.txt
+++ b/doc/test/plugin/nose.txt
@@ -1,8 +1,7 @@
-
-pytest_nose plugin
-==================
nose-compatibility plugin: allow to run nose test suites natively.
+==================================================================
+
.. contents::
:local:
--- a/testing/pytest/test_pickling.py
+++ /dev/null
@@ -1,198 +0,0 @@
-import py
-import pickle
-
-def setglobals(request):
- oldconfig = py.test.config
- print("setting py.test.config to None")
- py.test.config = None
- def resetglobals():
- py.builtin.print_("setting py.test.config to", oldconfig)
- py.test.config = oldconfig
- request.addfinalizer(resetglobals)
-
-def pytest_funcarg__testdir(request):
- setglobals(request)
- return request.getfuncargvalue("testdir")
-
-class ImmutablePickleTransport:
- def __init__(self, request):
- from py.impl.test.dist.mypickle import ImmutablePickler
- self.p1 = ImmutablePickler(uneven=0)
- self.p2 = ImmutablePickler(uneven=1)
- setglobals(request)
-
- def p1_to_p2(self, obj):
- return self.p2.loads(self.p1.dumps(obj))
-
- def p2_to_p1(self, obj):
- return self.p1.loads(self.p2.dumps(obj))
-
- def unifyconfig(self, config):
- p2config = self.p1_to_p2(config)
- p2config._initafterpickle(config.topdir)
- return p2config
-
-pytest_funcarg__pickletransport = ImmutablePickleTransport
-
-class TestImmutablePickling:
- def test_pickle_config(self, testdir, pickletransport):
- config1 = testdir.parseconfig()
- assert config1.topdir == testdir.tmpdir
- testdir.chdir()
- p2config = pickletransport.p1_to_p2(config1)
- assert p2config.topdir.realpath() == config1.topdir.realpath()
- config_back = pickletransport.p2_to_p1(p2config)
- assert config_back is config1
-
- def test_pickle_modcol(self, testdir, pickletransport):
- modcol1 = testdir.getmodulecol("def test_one(): pass")
- modcol2a = pickletransport.p1_to_p2(modcol1)
- modcol2b = pickletransport.p1_to_p2(modcol1)
- assert modcol2a is modcol2b
-
- modcol1_back = pickletransport.p2_to_p1(modcol2a)
- assert modcol1_back
-
- def test_pickle_func(self, testdir, pickletransport):
- modcol1 = testdir.getmodulecol("def test_one(): pass")
- item = modcol1.collect_by_name("test_one")
- testdir.chdir()
- item2a = pickletransport.p1_to_p2(item)
- assert item is not item2a # of course
- assert item2a.name == item.name
- modback = pickletransport.p2_to_p1(item2a.parent)
- assert modback is modcol1
-
-
-class TestConfigPickling:
- def test_config_getstate_setstate(self, testdir):
- from py.impl.test.config import Config
- testdir.makepyfile(__init__="", conftest="x=1; y=2")
- hello = testdir.makepyfile(hello="")
- tmp = testdir.tmpdir
- testdir.chdir()
- config1 = testdir.parseconfig(hello)
- config2 = Config()
- config2.__setstate__(config1.__getstate__())
- assert config2.topdir == py.path.local()
- config2_relpaths = [py.path.local(x).relto(config2.topdir)
- for x in config2.args]
- config1_relpaths = [py.path.local(x).relto(config1.topdir)
- for x in config1.args]
-
- assert config2_relpaths == config1_relpaths
- for name, value in config1.option.__dict__.items():
- assert getattr(config2.option, name) == value
- assert config2.getvalue("x") == 1
-
- def test_config_pickling_customoption(self, testdir):
- testdir.makeconftest("""
- def pytest_addoption(parser):
- group = parser.getgroup("testing group")
- group.addoption('-G', '--glong', action="store", default=42,
- type="int", dest="gdest", help="g value.")
- """)
- config = testdir.parseconfig("-G", "11")
- assert config.option.gdest == 11
- repr = config.__getstate__()
-
- config = testdir.Config()
- py.test.raises(AttributeError, "config.option.gdest")
-
- config2 = testdir.Config()
- config2.__setstate__(repr)
- assert config2.option.gdest == 11
-
- def test_config_pickling_and_conftest_deprecated(self, testdir):
- tmp = testdir.tmpdir.ensure("w1", "w2", dir=1)
- tmp.ensure("__init__.py")
- tmp.join("conftest.py").write(py.code.Source("""
- def pytest_addoption(parser):
- group = parser.getgroup("testing group")
- group.addoption('-G', '--glong', action="store", default=42,
- type="int", dest="gdest", help="g value.")
- """))
- config = testdir.parseconfig(tmp, "-G", "11")
- assert config.option.gdest == 11
- repr = config.__getstate__()
-
- config = testdir.Config()
- py.test.raises(AttributeError, "config.option.gdest")
-
- config2 = testdir.Config()
- config2.__setstate__(repr)
- assert config2.option.gdest == 11
-
- option = config2.addoptions("testing group",
- config2.Option('-G', '--glong', action="store", default=42,
- type="int", dest="gdest", help="g value."))
- assert option.gdest == 11
-
- def test_config_picklability(self, testdir):
- config = testdir.parseconfig()
- s = pickle.dumps(config)
- newconfig = pickle.loads(s)
- assert hasattr(newconfig, "topdir")
- assert newconfig.topdir == py.path.local()
-
- def test_collector_implicit_config_pickling(self, testdir):
- tmpdir = testdir.tmpdir
- testdir.chdir()
- testdir.makepyfile(hello="def test_x(): pass")
- config = testdir.parseconfig(tmpdir)
- col = config.getnode(config.topdir)
- io = py.io.BytesIO()
- pickler = pickle.Pickler(io)
- pickler.dump(col)
- io.seek(0)
- unpickler = pickle.Unpickler(io)
- col2 = unpickler.load()
- assert col2.name == col.name
- assert col2.listnames() == col.listnames()
-
- def test_config_and_collector_pickling(self, testdir):
- tmpdir = testdir.tmpdir
- dir1 = tmpdir.ensure("somedir", dir=1)
- config = testdir.parseconfig()
- col = config.getnode(config.topdir)
- col1 = col.join(dir1.basename)
- assert col1.parent is col
- io = py.io.BytesIO()
- pickler = pickle.Pickler(io)
- pickler.dump(col)
- pickler.dump(col1)
- pickler.dump(col)
- io.seek(0)
- unpickler = pickle.Unpickler(io)
- topdir = tmpdir.ensure("newtopdir", dir=1)
- topdir.ensure("somedir", dir=1)
- old = topdir.chdir()
- try:
- newcol = unpickler.load()
- newcol2 = unpickler.load()
- newcol3 = unpickler.load()
- assert newcol2.config is newcol.config
- assert newcol2.parent == newcol
- assert newcol2.config.topdir.realpath() == topdir.realpath()
- assert newcol.fspath.realpath() == topdir.realpath()
- assert newcol2.fspath.basename == dir1.basename
- assert newcol2.fspath.relto(newcol2.config.topdir)
- finally:
- old.chdir()
-
-def test_config__setstate__wired_correctly_in_childprocess(testdir):
- execnet = py.test.importorskip("execnet")
- from py.impl.test.dist.mypickle import PickleChannel
- gw = execnet.makegateway()
- channel = gw.remote_exec("""
- import py
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- config = channel.receive()
- assert py.test.config == config
- """)
- channel = PickleChannel(channel)
- config = testdir.parseconfig()
- channel.send(config)
- channel.waitclose() # this will potentially raise
- gw.exit()
--- a/py/plugin/pytest_runner.py
+++ b/py/plugin/pytest_runner.py
@@ -8,12 +8,6 @@ from py.impl.test.outcome import Skipped
#
# pytest plugin hooks
-def pytest_addoption(parser):
- group = parser.getgroup("general")
- group.addoption('--boxed',
- action="store_true", dest="boxed", default=False,
- help="box each test run in a separate process (unix)")
-
# XXX move to pytest_sessionstart and fix py.test owns tests
def pytest_configure(config):
config._setupstate = SetupState()
@@ -36,12 +30,7 @@ def pytest_make_collect_report(collector
return CollectReport(collector, result, excinfo)
def pytest_runtest_protocol(item):
- if item.config.getvalue("boxed"):
- reports = forked_run_report(item)
- for rep in reports:
- item.ihook.pytest_runtest_logreport(report=rep)
- else:
- runtestprotocol(item)
+ runtestprotocol(item)
return True
def runtestprotocol(item, log=True):
@@ -116,38 +105,6 @@ class CallInfo:
status = "result: %r" % (self.result,)
return "<CallInfo when=%r %s>" % (self.when, status)
-def forked_run_report(item):
- # for now, we run setup/teardown in the subprocess
- # XXX optionally allow sharing of setup/teardown
- EXITSTATUS_TESTEXIT = 4
- from py.impl.test.dist.mypickle import ImmutablePickler
- ipickle = ImmutablePickler(uneven=0)
- ipickle.selfmemoize(item.config)
- # XXX workaround the issue that 2.6 cannot pickle
- # instances of classes defined in global conftest.py files
- ipickle.selfmemoize(item)
- def runforked():
- try:
- reports = runtestprotocol(item, log=False)
- except KeyboardInterrupt:
- py.std.os._exit(EXITSTATUS_TESTEXIT)
- return ipickle.dumps(reports)
-
- ff = py.process.ForkedFunc(runforked)
- result = ff.waitfinish()
- if result.retval is not None:
- return ipickle.loads(result.retval)
- else:
- if result.exitstatus == EXITSTATUS_TESTEXIT:
- py.test.exit("forked test item %s raised Exit" %(item,))
- return [report_process_crash(item, result)]
-
-def report_process_crash(item, result):
- path, lineno = item._getfslineno()
- info = "%s:%s: running the test CRASHED with signal %d" %(
- path, lineno, result.signal)
- return ItemTestReport(item, excinfo=info, when="???")
-
class BaseReport(object):
def __repr__(self):
l = ["%s=%s" %(key, value)
--- a/testing/plugin/test_pytest_genscript.py
+++ b/testing/plugin/test_pytest_genscript.py
@@ -25,13 +25,14 @@ def test_gen(testdir, anypython, standal
"*imported from*mypytest"
])
-def test_rundist(testdir, standalone):
+def test_rundist(testdir, pytestconfig, standalone):
+ pytestconfig.pluginmanager.skipifmissing("xdist")
testdir.makepyfile("""
def test_one():
pass
""")
result = standalone.run(sys.executable, testdir, '-n', '3')
- assert result.ret == 2
- result.stderr.fnmatch_lines([
- "*no such option*"
+ assert result.ret == 0
+ result.stdout.fnmatch_lines([
+ "*1 passed*",
])
--- a/testing/plugin/test_pytest_runner.py
+++ b/testing/plugin/test_pytest_runner.py
@@ -221,7 +221,9 @@ class TestExecutionForked(BaseFunctional
pytestmark = py.test.mark.skipif("not hasattr(os, 'fork')")
def getrunner(self):
- return runner.forked_run_report
+ # XXX re-arrange this test to live in pytest-xdist
+ xplugin = py.test.importorskip("xdist.plugin")
+ return xplugin.forked_run_report
def test_suicide(self, testdir):
reports = testdir.runitem("""
@@ -262,19 +264,6 @@ class TestCollectionReports:
assert not rep.passed
assert rep.skipped
- at py.test.mark.skipif("not hasattr(os, 'fork')")
-def test_functional_boxed(testdir):
- p1 = testdir.makepyfile("""
- import os
- def test_function():
- os.kill(os.getpid(), 15)
- """)
- result = testdir.runpytest(p1, "--boxed")
- assert result.stdout.fnmatch_lines([
- "*CRASHED*",
- "*1 failed*"
- ])
-
def test_callinfo():
ci = runner.CallInfo(lambda: 0, '123')
assert ci.when == "123"
--- a/py/impl/test/dist/nodemanage.py
+++ /dev/null
@@ -1,81 +0,0 @@
-import py
-import sys, os
-from py.impl.test.dist.txnode import TXNode
-from py.impl.test.dist.gwmanage import GatewayManager
-
-
-class NodeManager(object):
- def __init__(self, config, specs=None):
- self.config = config
- if specs is None:
- specs = self.config.getxspecs()
- self.roots = self.config.getrsyncdirs()
- self.gwmanager = GatewayManager(specs, config.hook)
- self.nodes = []
- self._nodesready = py.std.threading.Event()
-
- def trace(self, msg):
- self.config.hook.pytest_trace(category="nodemanage", msg=msg)
-
- def config_getignores(self):
- return self.config.getconftest_pathlist("rsyncignore")
-
- def rsync_roots(self):
- """ make sure that all remote gateways
- have the same set of roots in their
- current directory.
- """
- self.makegateways()
- options = {
- 'ignores': self.config_getignores(),
- 'verbose': self.config.option.verbose,
- }
- if self.roots:
- # send each rsync root
- for root in self.roots:
- self.gwmanager.rsync(root, **options)
- else:
- XXX # do we want to care for situations without explicit rsyncdirs?
- # we transfer our topdir as the root
- self.gwmanager.rsync(self.config.topdir, **options)
- # and cd into it
- self.gwmanager.multi_chdir(self.config.topdir.basename, inplacelocal=False)
-
- def makegateways(self):
- # we change to the topdir sot that
- # PopenGateways will have their cwd
- # such that unpickling configs will
- # pick it up as the right topdir
- # (for other gateways this chdir is irrelevant)
- self.trace("making gateways")
- old = self.config.topdir.chdir()
- try:
- self.gwmanager.makegateways()
- finally:
- old.chdir()
-
- def setup_nodes(self, putevent):
- self.rsync_roots()
- self.trace("setting up nodes")
- for gateway in self.gwmanager.group:
- node = TXNode(gateway, self.config, putevent, slaveready=self._slaveready)
- gateway.node = node # to keep node alive
- self.trace("started node %r" % node)
-
- def _slaveready(self, node):
- #assert node.gateway == node.gateway
- #assert node.gateway.node == node
- self.nodes.append(node)
- self.trace("%s slave node ready %r" % (node.gateway.id, node))
- if len(self.nodes) == len(list(self.gwmanager.group)):
- self._nodesready.set()
-
- def wait_nodesready(self, timeout=None):
- self._nodesready.wait(timeout)
- if not self._nodesready.isSet():
- raise IOError("nodes did not get ready for %r secs" % timeout)
-
- def teardown_nodes(self):
- # XXX do teardown nodes?
- self.gwmanager.exit()
-
--- a/doc/test/plugin/restdoc.txt
+++ b/doc/test/plugin/restdoc.txt
@@ -1,8 +1,7 @@
-
-pytest_restdoc plugin
-=====================
perform ReST syntax, local and remote reference tests on .rst/.txt files.
+=========================================================================
+
.. contents::
:local:
--- a/py/impl/test/looponfail/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-#
--- a/doc/test/plugin/unittest.txt
+++ b/doc/test/plugin/unittest.txt
@@ -1,8 +1,7 @@
-
-pytest_unittest plugin
-======================
automatically discover and run traditional "unittest.py" style tests.
+=====================================================================
+
.. contents::
:local:
--- a/py/impl/test/dist/txnode.py
+++ /dev/null
@@ -1,164 +0,0 @@
-"""
- Manage setup, running and local representation of remote nodes/processes.
-"""
-import py
-from py.impl.test.dist.mypickle import PickleChannel
-from py.impl.test import outcome
-
-class TXNode(object):
- """ Represents a Test Execution environment in the controlling process.
- - sets up a slave node through an execnet gateway
- - manages sending of test-items and receival of results and events
- - creates events when the remote side crashes
- """
- ENDMARK = -1
-
- def __init__(self, gateway, config, putevent, slaveready=None):
- self.config = config
- self.putevent = putevent
- self.gateway = gateway
- self.channel = install_slave(gateway, config)
- self._sendslaveready = slaveready
- self.channel.setcallback(self.callback, endmarker=self.ENDMARK)
- self._down = False
-
- def __repr__(self):
- id = self.gateway.id
- status = self._down and 'true' or 'false'
- return "<TXNode %r down=%s>" %(id, status)
-
- def notify(self, eventname, *args, **kwargs):
- assert not args
- self.putevent((eventname, args, kwargs))
-
- def callback(self, eventcall):
- """ this gets called for each object we receive from
- the other side and if the channel closes.
-
- Note that channel callbacks run in the receiver
- thread of execnet gateways - we need to
- avoid raising exceptions or doing heavy work.
- """
- try:
- if eventcall == self.ENDMARK:
- err = self.channel._getremoteerror()
- if not self._down:
- if not err or isinstance(err, EOFError):
- err = "Not properly terminated"
- self.notify("pytest_testnodedown", node=self, error=err)
- self._down = True
- return
- eventname, args, kwargs = eventcall
- if eventname == "slaveready":
- if self._sendslaveready:
- self._sendslaveready(self)
- self.notify("pytest_testnodeready", node=self)
- elif eventname == "slavefinished":
- self._down = True
- self.notify("pytest_testnodedown", error=None, node=self)
- elif eventname in ("pytest_runtest_logreport",
- "pytest__teardown_final_logerror"):
- kwargs['report'].node = self
- self.notify(eventname, **kwargs)
- else:
- self.notify(eventname, **kwargs)
- except KeyboardInterrupt:
- # should not land in receiver-thread
- raise
- except:
- excinfo = py.code.ExceptionInfo()
- py.builtin.print_("!" * 20, excinfo)
- self.config.pluginmanager.notify_exception(excinfo)
-
- def send(self, item):
- assert item is not None
- self.channel.send(item)
-
- def sendlist(self, itemlist):
- self.channel.send(itemlist)
-
- def shutdown(self):
- self.channel.send(None)
-
-# setting up slave code
-def install_slave(gateway, config):
- channel = gateway.remote_exec(source="""
- import os, sys
- sys.path.insert(0, os.getcwd())
- from py.impl.test.dist.mypickle import PickleChannel
- from py.impl.test.dist.txnode import SlaveNode
- channel.send("basicimport")
- channel = PickleChannel(channel)
- slavenode = SlaveNode(channel)
- slavenode.run()
- """)
- channel.receive()
- channel = PickleChannel(channel)
- basetemp = None
- if gateway.spec.popen:
- popenbase = config.ensuretemp("popen")
- basetemp = py.path.local.make_numbered_dir(prefix="slave-",
- keep=0, rootdir=popenbase)
- basetemp = str(basetemp)
- channel.send((config, basetemp, gateway.id))
- return channel
-
-class SlaveNode(object):
- def __init__(self, channel):
- self.channel = channel
-
- def __repr__(self):
- return "<%s channel=%s>" %(self.__class__.__name__, self.channel)
-
- def sendevent(self, eventname, *args, **kwargs):
- self.channel.send((eventname, args, kwargs))
-
- def pytest_runtest_logreport(self, report):
- self.sendevent("pytest_runtest_logreport", report=report)
-
- def pytest__teardown_final_logerror(self, report):
- self.sendevent("pytest__teardown_final_logerror", report=report)
-
- def run(self):
- channel = self.channel
- self.config, basetemp, self.nodeid = channel.receive()
- if basetemp:
- self.config.basetemp = py.path.local(basetemp)
- self.config.pluginmanager.do_configure(self.config)
- self.config.pluginmanager.register(self)
- self.runner = self.config.pluginmanager.getplugin("pytest_runner")
- self.sendevent("slaveready")
- try:
- self.config.hook.pytest_sessionstart(session=self)
- while 1:
- task = channel.receive()
- if task is None:
- break
- if isinstance(task, list):
- for item in task:
- self.run_single(item=item)
- else:
- self.run_single(item=task)
- self.config.hook.pytest_sessionfinish(
- session=self,
- exitstatus=outcome.EXIT_OK)
- except KeyboardInterrupt:
- raise
- except:
- er = py.code.ExceptionInfo().getrepr(funcargs=True, showlocals=True)
- self.sendevent("pytest_internalerror", excrepr=er)
- raise
- else:
- self.sendevent("slavefinished")
-
- def run_single(self, item):
- call = self.runner.CallInfo(item._checkcollectable, when='setup')
- if call.excinfo:
- # likely it is not collectable here because of
- # platform/import-dependency induced skips
- # we fake a setup-error report with the obtained exception
- # and do not care about capturing or non-runner hooks
- rep = self.runner.pytest_runtest_makereport(item=item, call=call)
- self.pytest_runtest_logreport(rep)
- return
- item.config.hook.pytest_runtest_protocol(item=item)
--- a/doc/test/plugin/pastebin.txt
+++ b/doc/test/plugin/pastebin.txt
@@ -1,8 +1,7 @@
-
-pytest_pastebin plugin
-======================
submit failure or test session information to a pastebin service.
+=================================================================
+
.. contents::
:local:
--- a/py/impl/test/dist/dsession.py
+++ /dev/null
@@ -1,280 +0,0 @@
-import py
-from py.impl.test.session import Session
-from py.impl.test import outcome
-from py.impl.test.dist.nodemanage import NodeManager
-queue = py.builtin._tryimport('queue', 'Queue')
-
-debug_file = None # open('/tmp/loop.log', 'w')
-def debug(*args):
- if debug_file is not None:
- s = " ".join(map(str, args))
- debug_file.write(s+"\n")
- debug_file.flush()
-
-class LoopState(object):
- def __init__(self, dsession, colitems):
- self.dsession = dsession
- self.colitems = colitems
- self.exitstatus = None
- # loopstate.dowork is False after reschedule events
- # because otherwise we might very busily loop
- # waiting for a host to become ready.
- self.dowork = True
- self.shuttingdown = False
- self.testsfailed = False
-
- def __repr__(self):
- return "<LoopState exitstatus=%r shuttingdown=%r len(colitems)=%d>" % (
- self.exitstatus, self.shuttingdown, len(self.colitems))
-
- def pytest_runtest_logreport(self, report):
- if report.item in self.dsession.item2nodes:
- if report.when != "teardown": # otherwise we already managed it
- self.dsession.removeitem(report.item, report.node)
- if report.failed:
- self.testsfailed = True
-
- def pytest_collectreport(self, report):
- if report.passed:
- self.colitems.extend(report.result)
-
- def pytest_testnodeready(self, node):
- self.dsession.addnode(node)
-
- def pytest_testnodedown(self, node, error=None):
- pending = self.dsession.removenode(node)
- if pending:
- if error:
- crashitem = pending[0]
- debug("determined crashitem", crashitem)
- self.dsession.handle_crashitem(crashitem, node)
- # XXX recovery handling for "each"?
- # currently pending items are not retried
- if self.dsession.config.option.dist == "load":
- self.colitems.extend(pending[1:])
-
- def pytest_rescheduleitems(self, items):
- self.colitems.extend(items)
- self.dowork = False # avoid busywait
-
-class DSession(Session):
- """
- Session drives the collection and running of tests
- and generates test events for reporters.
- """
- MAXITEMSPERHOST = 15
-
- def __init__(self, config):
- self.queue = queue.Queue()
- self.node2pending = {}
- self.item2nodes = {}
- super(DSession, self).__init__(config=config)
-
- #def pytest_configure(self, __multicall__, config):
- # __multicall__.execute()
- # try:
- # config.getxspecs()
- # except config.Error:
- # print
- # raise config.Error("dist mode %r needs test execution environments, "
- # "none found." %(config.option.dist))
-
- def main(self, colitems):
- self.sessionstarts()
- self.setup()
- exitstatus = self.loop(colitems)
- self.teardown()
- self.sessionfinishes(exitstatus=exitstatus)
- return exitstatus
-
- def loop_once(self, loopstate):
- if loopstate.shuttingdown:
- return self.loop_once_shutdown(loopstate)
- colitems = loopstate.colitems
- if loopstate.dowork and colitems:
- self.triggertesting(loopstate.colitems)
- colitems[:] = []
- # we use a timeout here so that control-C gets through
- while 1:
- try:
- eventcall = self.queue.get(timeout=2.0)
- break
- except queue.Empty:
- continue
- loopstate.dowork = True
-
- callname, args, kwargs = eventcall
- if callname is not None:
- call = getattr(self.config.hook, callname)
- assert not args
- call(**kwargs)
-
- # termination conditions
- if ((loopstate.testsfailed and self.config.option.exitfirst) or
- (not self.item2nodes and not colitems and not self.queue.qsize())):
- self.triggershutdown()
- loopstate.shuttingdown = True
- elif not self.node2pending:
- loopstate.exitstatus = outcome.EXIT_NOHOSTS
-
- def loop_once_shutdown(self, loopstate):
- # once we are in shutdown mode we dont send
- # events other than HostDown upstream
- eventname, args, kwargs = self.queue.get()
- if eventname == "pytest_testnodedown":
- self.config.hook.pytest_testnodedown(**kwargs)
- self.removenode(kwargs['node'])
- elif eventname == "pytest_runtest_logreport":
- # might be some teardown report
- self.config.hook.pytest_runtest_logreport(**kwargs)
- elif eventname == "pytest_internalerror":
- self.config.hook.pytest_internalerror(**kwargs)
- loopstate.exitstatus = outcome.EXIT_INTERNALERROR
- elif eventname == "pytest__teardown_final_logerror":
- self.config.hook.pytest__teardown_final_logerror(**kwargs)
- loopstate.exitstatus = outcome.EXIT_TESTSFAILED
- if not self.node2pending:
- # finished
- if loopstate.testsfailed:
- loopstate.exitstatus = outcome.EXIT_TESTSFAILED
- else:
- loopstate.exitstatus = outcome.EXIT_OK
- #self.config.pluginmanager.unregister(loopstate)
-
- def _initloopstate(self, colitems):
- loopstate = LoopState(self, colitems)
- self.config.pluginmanager.register(loopstate)
- return loopstate
-
- def loop(self, colitems):
- try:
- loopstate = self._initloopstate(colitems)
- loopstate.dowork = False # first receive at least one HostUp events
- while 1:
- self.loop_once(loopstate)
- if loopstate.exitstatus is not None:
- exitstatus = loopstate.exitstatus
- break
- except KeyboardInterrupt:
- excinfo = py.code.ExceptionInfo()
- self.config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
- exitstatus = outcome.EXIT_INTERRUPTED
- except:
- self.config.pluginmanager.notify_exception()
- exitstatus = outcome.EXIT_INTERNALERROR
- self.config.pluginmanager.unregister(loopstate)
- if exitstatus == 0 and self._testsfailed:
- exitstatus = outcome.EXIT_TESTSFAILED
- return exitstatus
-
- def triggershutdown(self):
- for node in self.node2pending:
- node.shutdown()
-
- def addnode(self, node):
- assert node not in self.node2pending
- self.node2pending[node] = []
-
- def removenode(self, node):
- try:
- pending = self.node2pending.pop(node)
- except KeyError:
- # this happens if we didn't receive a testnodeready event yet
- return []
- for item in pending:
- l = self.item2nodes[item]
- l.remove(node)
- if not l:
- del self.item2nodes[item]
- return pending
-
- def triggertesting(self, colitems):
- colitems = self.filteritems(colitems)
- senditems = []
- for next in colitems:
- if isinstance(next, py.test.collect.Item):
- senditems.append(next)
- else:
- self.config.hook.pytest_collectstart(collector=next)
- colrep = self.config.hook.pytest_make_collect_report(collector=next)
- self.queueevent("pytest_collectreport", report=colrep)
- if self.config.option.dist == "each":
- self.senditems_each(senditems)
- else:
- # XXX assert self.config.option.dist == "load"
- self.senditems_load(senditems)
-
- def queueevent(self, eventname, **kwargs):
- self.queue.put((eventname, (), kwargs))
-
- def senditems_each(self, tosend):
- if not tosend:
- return
- room = self.MAXITEMSPERHOST
- for node, pending in self.node2pending.items():
- room = min(self.MAXITEMSPERHOST - len(pending), room)
- sending = tosend[:room]
- if sending:
- for node, pending in self.node2pending.items():
- node.sendlist(sending)
- pending.extend(sending)
- for item in sending:
- nodes = self.item2nodes.setdefault(item, [])
- assert node not in nodes
- nodes.append(node)
- item.ihook.pytest_itemstart(item=item, node=node)
- tosend[:] = tosend[room:] # update inplace
- if tosend:
- # we have some left, give it to the main loop
- self.queueevent("pytest_rescheduleitems", items=tosend)
-
- def senditems_load(self, tosend):
- if not tosend:
- return
- for node, pending in self.node2pending.items():
- room = self.MAXITEMSPERHOST - len(pending)
- if room > 0:
- sending = tosend[:room]
- node.sendlist(sending)
- for item in sending:
- #assert item not in self.item2node, (
- # "sending same item %r to multiple "
- # "not implemented" %(item,))
- self.item2nodes.setdefault(item, []).append(node)
- item.ihook.pytest_itemstart(item=item, node=node)
- pending.extend(sending)
- tosend[:] = tosend[room:] # update inplace
- if not tosend:
- break
- if tosend:
- # we have some left, give it to the main loop
- self.queueevent("pytest_rescheduleitems", items=tosend)
-
- def removeitem(self, item, node):
- if item not in self.item2nodes:
- raise AssertionError(item, self.item2nodes)
- nodes = self.item2nodes[item]
- if node in nodes: # the node might have gone down already
- nodes.remove(node)
- if not nodes:
- del self.item2nodes[item]
- pending = self.node2pending[node]
- pending.remove(item)
-
- def handle_crashitem(self, item, node):
- runner = item.config.pluginmanager.getplugin("runner")
- info = "!!! Node %r crashed during running of test %r" %(node, item)
- rep = runner.ItemTestReport(item=item, excinfo=info, when="???")
- rep.node = node
- item.ihook.pytest_runtest_logreport(report=rep)
-
- def setup(self):
- """ setup any neccessary resources ahead of the test run. """
- self.nodemanager = NodeManager(self.config)
- self.nodemanager.setup_nodes(putevent=self.queue.put)
- if self.config.option.dist == "each":
- self.nodemanager.wait_nodesready(5.0)
-
- def teardown(self):
- """ teardown any resources after a test run. """
- self.nodemanager.teardown_nodes()
--- a/doc/test/plugin/tmpdir.txt
+++ b/doc/test/plugin/tmpdir.txt
@@ -1,8 +1,7 @@
-
-pytest_tmpdir plugin
-====================
provide temporary directories to test functions.
+================================================
+
.. contents::
:local:
--- a/testing/pytest/dist/test_dsession.py
+++ /dev/null
@@ -1,505 +0,0 @@
-from py.impl.test.dist.dsession import DSession
-from py.impl.test import outcome
-import py
-import execnet
-
-XSpec = execnet.XSpec
-
-def run(item, node, excinfo=None):
- runner = item.config.pluginmanager.getplugin("runner")
- rep = runner.ItemTestReport(item=item,
- excinfo=excinfo, when="call")
- rep.node = node
- return rep
-
-class MockNode:
- def __init__(self):
- self.sent = []
-
- def sendlist(self, items):
- self.sent.append(items)
-
- def shutdown(self):
- self._shutdown=True
-
-def dumpqueue(queue):
- while queue.qsize():
- print(queue.get())
-
-class TestDSession:
- def test_add_remove_node(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- node = MockNode()
- rep = run(item, node)
- session = DSession(item.config)
- assert not session.node2pending
- session.addnode(node)
- assert len(session.node2pending) == 1
- session.senditems_load([item])
- pending = session.removenode(node)
- assert pending == [item]
- assert item not in session.item2nodes
- l = session.removenode(node)
- assert not l
-
- def test_senditems_each_and_receive_with_two_nodes(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- node1 = MockNode()
- node2 = MockNode()
- session = DSession(item.config)
- session.addnode(node1)
- session.addnode(node2)
- session.senditems_each([item])
- assert session.node2pending[node1] == [item]
- assert session.node2pending[node2] == [item]
- assert node1 in session.item2nodes[item]
- assert node2 in session.item2nodes[item]
- session.removeitem(item, node1)
- assert session.item2nodes[item] == [node2]
- session.removeitem(item, node2)
- assert not session.node2pending[node1]
- assert not session.item2nodes
-
- def test_senditems_load_and_receive_one_node(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- node = MockNode()
- rep = run(item, node)
- session = DSession(item.config)
- session.addnode(node)
- session.senditems_load([item])
- assert session.node2pending[node] == [item]
- assert session.item2nodes[item] == [node]
- session.removeitem(item, node)
- assert not session.node2pending[node]
- assert not session.item2nodes
-
- def test_triggertesting_collect(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_func():
- pass
- """)
- session = DSession(modcol.config)
- session.triggertesting([modcol])
- name, args, kwargs = session.queue.get(block=False)
- assert name == 'pytest_collectreport'
- report = kwargs['report']
- assert len(report.result) == 1
-
- def test_triggertesting_item(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session = DSession(item.config)
- node1 = MockNode()
- node2 = MockNode()
- session.addnode(node1)
- session.addnode(node2)
- session.triggertesting([item] * (session.MAXITEMSPERHOST*2 + 1))
- sent1 = node1.sent[0]
- sent2 = node2.sent[0]
- assert sent1 == [item] * session.MAXITEMSPERHOST
- assert sent2 == [item] * session.MAXITEMSPERHOST
- assert session.node2pending[node1] == sent1
- assert session.node2pending[node2] == sent2
- name, args, kwargs = session.queue.get(block=False)
- assert name == "pytest_rescheduleitems"
- assert kwargs['items'] == [item]
-
- def test_keyboardinterrupt(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session = DSession(item.config)
- def raise_(timeout=None): raise KeyboardInterrupt()
- session.queue.get = raise_
- exitstatus = session.loop([])
- assert exitstatus == outcome.EXIT_INTERRUPTED
-
- def test_internalerror(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session = DSession(item.config)
- def raise_(): raise ValueError()
- session.queue.get = raise_
- exitstatus = session.loop([])
- assert exitstatus == outcome.EXIT_INTERNALERROR
-
- def test_rescheduleevent(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session = DSession(item.config)
- node = MockNode()
- session.addnode(node)
- loopstate = session._initloopstate([])
- session.queueevent("pytest_rescheduleitems", items=[item])
- session.loop_once(loopstate)
- # check that RescheduleEvents are not immediately
- # rescheduled if there are no nodes
- assert loopstate.dowork == False
- session.queueevent(None)
- session.loop_once(loopstate)
- session.queueevent(None)
- session.loop_once(loopstate)
- assert node.sent == [[item]]
- session.queueevent("pytest_runtest_logreport", report=run(item, node))
- session.loop_once(loopstate)
- assert loopstate.shuttingdown
- assert not loopstate.testsfailed
-
- def test_no_node_remaining_for_tests(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- # setup a session with one node
- session = DSession(item.config)
- node = MockNode()
- session.addnode(node)
-
- # setup a HostDown event
- session.queueevent("pytest_testnodedown", node=node, error=None)
-
- loopstate = session._initloopstate([item])
- loopstate.dowork = False
- session.loop_once(loopstate)
- dumpqueue(session.queue)
- assert loopstate.exitstatus == outcome.EXIT_NOHOSTS
-
- def test_removeitem_from_failing_teardown(self, testdir):
- # teardown reports only come in when they signal a failure
- # internal session-management should basically ignore them
- # XXX probably it'S best to invent a new error hook for
- # teardown/setup related failures
- modcol = testdir.getmodulecol("""
- def test_one():
- pass
- def teardown_function(function):
- assert 0
- """)
- item1, = modcol.collect()
-
- # setup a session with two nodes
- session = DSession(item1.config)
- node1, node2 = MockNode(), MockNode()
- session.addnode(node1)
- session.addnode(node2)
-
- # have one test pending for a node that goes down
- session.senditems_each([item1])
- nodes = session.item2nodes[item1]
- class rep:
- failed = True
- item = item1
- node = nodes[0]
- when = "call"
- session.queueevent("pytest_runtest_logreport", report=rep)
- reprec = testdir.getreportrecorder(session)
- print(session.item2nodes)
- loopstate = session._initloopstate([])
- assert len(session.item2nodes[item1]) == 2
- session.loop_once(loopstate)
- assert len(session.item2nodes[item1]) == 1
- rep.when = "teardown"
- session.queueevent("pytest_runtest_logreport", report=rep)
- session.loop_once(loopstate)
- assert len(session.item2nodes[item1]) == 1
-
- def test_testnodedown_causes_reschedule_pending(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_crash():
- assert 0
- def test_fail():
- x
- """)
- item1, item2 = modcol.collect()
-
- # setup a session with two nodes
- session = DSession(item1.config)
- node1, node2 = MockNode(), MockNode()
- session.addnode(node1)
- session.addnode(node2)
-
- # have one test pending for a node that goes down
- session.senditems_load([item1, item2])
- node = session.item2nodes[item1] [0]
- item1.config.option.dist = "load"
- session.queueevent("pytest_testnodedown", node=node, error="xyz")
- reprec = testdir.getreportrecorder(session)
- print(session.item2nodes)
- loopstate = session._initloopstate([])
- session.loop_once(loopstate)
-
- assert loopstate.colitems == [item2] # do not reschedule crash item
- rep = reprec.matchreport(names="pytest_runtest_logreport")
- assert rep.failed
- assert rep.item == item1
- assert str(rep.longrepr).find("crashed") != -1
- #assert str(testrep.longrepr).find(node.gateway.spec) != -1
-
- def test_testnodeready_adds_to_available(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- # setup a session with two nodes
- session = DSession(item.config)
- node1 = MockNode()
- session.queueevent("pytest_testnodeready", node=node1)
- loopstate = session._initloopstate([item])
- loopstate.dowork = False
- assert len(session.node2pending) == 0
- session.loop_once(loopstate)
- assert len(session.node2pending) == 1
-
- def runthrough(self, item, excinfo=None):
- session = DSession(item.config)
- node = MockNode()
- session.addnode(node)
- loopstate = session._initloopstate([item])
-
- session.queueevent(None)
- session.loop_once(loopstate)
-
- assert node.sent == [[item]]
- ev = run(item, node, excinfo=excinfo)
- session.queueevent("pytest_runtest_logreport", report=ev)
- session.loop_once(loopstate)
- assert loopstate.shuttingdown
- session.queueevent("pytest_testnodedown", node=node, error=None)
- session.loop_once(loopstate)
- dumpqueue(session.queue)
- return session, loopstate.exitstatus
-
- def test_exit_completed_tests_ok(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session, exitstatus = self.runthrough(item)
- assert exitstatus == outcome.EXIT_OK
-
- def test_exit_completed_tests_fail(self, testdir):
- item = testdir.getitem("def test_func(): 0/0")
- session, exitstatus = self.runthrough(item, excinfo="fail")
- assert exitstatus == outcome.EXIT_TESTSFAILED
-
- def test_exit_on_first_failing(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_fail():
- assert 0
- def test_pass():
- pass
- """)
- modcol.config.option.exitfirst = True
- session = DSession(modcol.config)
- node = MockNode()
- session.addnode(node)
- items = modcol.config.hook.pytest_make_collect_report(collector=modcol).result
-
- # trigger testing - this sends tests to the node
- session.triggertesting(items)
-
- # run tests ourselves and produce reports
- ev1 = run(items[0], node, "fail")
- ev2 = run(items[1], node, None)
- session.queueevent("pytest_runtest_logreport", report=ev1) # a failing one
- session.queueevent("pytest_runtest_logreport", report=ev2)
- # now call the loop
- loopstate = session._initloopstate(items)
- session.loop_once(loopstate)
- assert loopstate.testsfailed
- assert loopstate.shuttingdown
-
- def test_shuttingdown_filters(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session = DSession(item.config)
- node = MockNode()
- session.addnode(node)
- loopstate = session._initloopstate([])
- loopstate.shuttingdown = True
- reprec = testdir.getreportrecorder(session)
- session.queueevent("pytest_runtest_logreport", report=run(item, node))
- session.loop_once(loopstate)
- assert not reprec.getcalls("pytest_testnodedown")
- session.queueevent("pytest_testnodedown", node=node, error=None)
- session.loop_once(loopstate)
- assert reprec.getcall('pytest_testnodedown').node == node
-
- def test_filteritems(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_fail():
- assert 0
- def test_pass():
- pass
- """)
- session = DSession(modcol.config)
-
- modcol.config.option.keyword = "nothing"
- dsel = session.filteritems([modcol])
- assert dsel == [modcol]
- items = modcol.collect()
- hookrecorder = testdir.getreportrecorder(session).hookrecorder
- remaining = session.filteritems(items)
- assert remaining == []
-
- event = hookrecorder.getcalls("pytest_deselected")[-1]
- assert event.items == items
-
- modcol.config.option.keyword = "test_fail"
- remaining = session.filteritems(items)
- assert remaining == [items[0]]
-
- event = hookrecorder.getcalls("pytest_deselected")[-1]
- assert event.items == [items[1]]
-
- def test_testnodedown_shutdown_after_completion(self, testdir):
- item = testdir.getitem("def test_func(): pass")
- session = DSession(item.config)
-
- node = MockNode()
- session.addnode(node)
- session.senditems_load([item])
- session.queueevent("pytest_runtest_logreport", report=run(item, node))
- loopstate = session._initloopstate([])
- session.loop_once(loopstate)
- assert node._shutdown is True
- assert loopstate.exitstatus is None, "loop did not wait for testnodedown"
- assert loopstate.shuttingdown
- session.queueevent("pytest_testnodedown", node=node, error=None)
- session.loop_once(loopstate)
- assert loopstate.exitstatus == 0
-
- def test_nopending_but_collection_remains(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_fail():
- assert 0
- def test_pass():
- pass
- """)
- session = DSession(modcol.config)
- node = MockNode()
- session.addnode(node)
-
- colreport = modcol.config.hook.pytest_make_collect_report(collector=modcol)
- item1, item2 = colreport.result
- session.senditems_load([item1])
- # node2pending will become empty when the loop sees the report
- rep = run(item1, node)
- session.queueevent("pytest_runtest_logreport", report=run(item1, node))
-
- # but we have a collection pending
- session.queueevent("pytest_collectreport", report=colreport)
-
- loopstate = session._initloopstate([])
- session.loop_once(loopstate)
- assert loopstate.exitstatus is None, "loop did not care for collection report"
- assert not loopstate.colitems
- session.loop_once(loopstate)
- assert loopstate.colitems == colreport.result
- assert loopstate.exitstatus is None, "loop did not care for colitems"
-
- def test_dist_some_tests(self, testdir):
- p1 = testdir.makepyfile(test_one="""
- def test_1():
- pass
- def test_x():
- import py
- py.test.skip("aaa")
- def test_fail():
- assert 0
- """)
- config = testdir.parseconfig('-d', p1, '--tx=popen')
- dsession = DSession(config)
- hookrecorder = testdir.getreportrecorder(config).hookrecorder
- dsession.main([config.getnode(p1)])
- rep = hookrecorder.popcall("pytest_runtest_logreport").report
- assert rep.passed
- rep = hookrecorder.popcall("pytest_runtest_logreport").report
- assert rep.skipped
- rep = hookrecorder.popcall("pytest_runtest_logreport").report
- assert rep.failed
- # see that the node is really down
- node = hookrecorder.popcall("pytest_testnodedown").node
- assert node.gateway.spec.popen
- #XXX eq.geteventargs("pytest_sessionfinish")
-
-def test_collected_function_causes_remote_skip(testdir):
- sub = testdir.mkpydir("testing")
- sub.join("test_module.py").write(py.code.Source("""
- import py
- path = py.path.local(%r)
- if path.check():
- path.remove()
- else:
- py.test.skip("remote skip")
- def test_func():
- pass
- def test_func2():
- pass
- """ % str(sub.ensure("somefile"))))
- result = testdir.runpytest('-v', '--dist=each', '--tx=popen')
- result.stdout.fnmatch_lines([
- "*2 skipped*"
- ])
-
-def test_teardownfails_one_function(testdir):
- p = testdir.makepyfile("""
- def test_func():
- pass
- def teardown_function(function):
- assert 0
- """)
- result = testdir.runpytest(p, '--dist=each', '--tx=popen')
- result.stdout.fnmatch_lines([
- "*def teardown_function(function):*",
- "*1 passed*1 error*"
- ])
-
-@py.test.mark.xfail
-def test_terminate_on_hangingnode(testdir):
- p = testdir.makeconftest("""
- def pytest__teardown_final(session):
- if session.nodeid == "my": # running on slave
- import time
- time.sleep(3)
- """)
- result = testdir.runpytest(p, '--dist=each', '--tx=popen//id=my')
- assert result.duration < 2.0
- result.stdout.fnmatch_lines([
- "*killed*my*",
- ])
-
-
-
-def test_session_hooks(testdir):
- testdir.makeconftest("""
- import sys
- def pytest_sessionstart(session):
- sys.pytestsessionhooks = session
- def pytest_sessionfinish(session):
- f = open(session.nodeid or "master", 'w')
- f.write("xy")
- f.close()
- # let's fail on the slave
- if session.nodeid:
- raise ValueError(42)
- """)
- p = testdir.makepyfile("""
- import sys
- def test_hello():
- assert hasattr(sys, 'pytestsessionhooks')
- """)
- result = testdir.runpytest(p, "--dist=each", "--tx=popen//id=my1")
- result.stdout.fnmatch_lines([
- "*ValueError*",
- "*1 passed*",
- ])
- assert result.ret
- d = result.parseoutcomes()
- assert d['passed'] == 1
- assert testdir.tmpdir.join("my1").check()
- assert testdir.tmpdir.join("master").check()
-
-def test_funcarg_teardown_failure(testdir):
- p = testdir.makepyfile("""
- def pytest_funcarg__myarg(request):
- def teardown(val):
- raise ValueError(val)
- return request.cached_setup(setup=lambda: 42, teardown=teardown,
- scope="module")
- def test_hello(myarg):
- pass
- """)
- result = testdir.runpytest(p, "-n1")
- assert result.ret
- result.stdout.fnmatch_lines([
- "*ValueError*42*",
- "*1 passed*1 error*",
- ])
-
-
--- /dev/null
+++ b/doc/test/dist.html
@@ -0,0 +1,18 @@
+<html>
+ <head>
+ <meta http-equiv="refresh" content=" 1 ; URL=plugin/xdist.html" />
+ </head>
+
+ <body>
+<script type="text/javascript">
+var gaJsHost = (("https:" == document.location.protocol) ? "https://ssl." : "http://www.");
+document.write(unescape("%3Cscript src='" + gaJsHost + "google-analytics.com/ga.js' type='text/javascript'%3E%3C/script%3E"));
+</script>
+<script type="text/javascript">
+try {
+var pageTracker = _gat._getTracker("UA-7597274-3");
+pageTracker._trackPageview();
+} catch(err) {}</script>
+</body>
+</html>
+
--- a/testing/pytest/dist/conftest.py
+++ /dev/null
@@ -1,4 +0,0 @@
-try:
- import execnet
-except ImportError:
- collect_ignore = ['.']
--- a/testing/pytest/dist/test_nodemanage.py
+++ /dev/null
@@ -1,127 +0,0 @@
-import py
-from py.impl.test.dist.nodemanage import NodeManager
-
-class pytest_funcarg__mysetup:
- def __init__(self, request):
- basetemp = request.config.mktemp(
- "mysetup-%s" % request.function.__name__,
- numbered=True)
- self.source = basetemp.mkdir("source")
- self.dest = basetemp.mkdir("dest")
- request.getfuncargvalue("_pytest")
-
-class TestNodeManager:
- @py.test.mark.xfail
- def test_rsync_roots_no_roots(self, testdir, mysetup):
- mysetup.source.ensure("dir1", "file1").write("hello")
- config = testdir.reparseconfig([source])
- nodemanager = NodeManager(config, ["popen//chdir=%s" % mysetup.dest])
- assert nodemanager.config.topdir == source == config.topdir
- nodemanager.rsync_roots()
- p, = nodemanager.gwmanager.multi_exec("import os ; channel.send(os.getcwd())").receive_each()
- p = py.path.local(p)
- py.builtin.print_("remote curdir", p)
- assert p == mysetup.dest.join(config.topdir.basename)
- assert p.join("dir1").check()
- assert p.join("dir1", "file1").check()
-
- def test_popen_nodes_are_ready(self, testdir):
- nodemanager = NodeManager(testdir.parseconfig(
- "--tx", "3*popen"))
-
- nodemanager.setup_nodes([].append)
- nodemanager.wait_nodesready(timeout=10.0)
-
- def test_popen_rsync_subdir(self, testdir, mysetup):
- source, dest = mysetup.source, mysetup.dest
- dir1 = mysetup.source.mkdir("dir1")
- dir2 = dir1.mkdir("dir2")
- dir2.ensure("hello")
- for rsyncroot in (dir1, source):
- dest.remove()
- nodemanager = NodeManager(testdir.parseconfig(
- "--tx", "popen//chdir=%s" % dest,
- "--rsyncdir", rsyncroot,
- source,
- ))
- assert nodemanager.config.topdir == source
- nodemanager.rsync_roots()
- if rsyncroot == source:
- dest = dest.join("source")
- assert dest.join("dir1").check()
- assert dest.join("dir1", "dir2").check()
- assert dest.join("dir1", "dir2", 'hello').check()
- nodemanager.gwmanager.exit()
-
- def test_init_rsync_roots(self, testdir, mysetup):
- source, dest = mysetup.source, mysetup.dest
- dir2 = source.ensure("dir1", "dir2", dir=1)
- source.ensure("dir1", "somefile", dir=1)
- dir2.ensure("hello")
- source.ensure("bogusdir", "file")
- source.join("conftest.py").write(py.code.Source("""
- rsyncdirs = ['dir1/dir2']
- """))
- session = testdir.reparseconfig([source]).initsession()
- nodemanager = NodeManager(session.config, ["popen//chdir=%s" % dest])
- nodemanager.rsync_roots()
- assert dest.join("dir2").check()
- assert not dest.join("dir1").check()
- assert not dest.join("bogus").check()
-
- def test_rsyncignore(self, testdir, mysetup):
- source, dest = mysetup.source, mysetup.dest
- dir2 = source.ensure("dir1", "dir2", dir=1)
- dir5 = source.ensure("dir5", "dir6", "bogus")
- dirf = source.ensure("dir5", "file")
- dir2.ensure("hello")
- source.join("conftest.py").write(py.code.Source("""
- rsyncdirs = ['dir1', 'dir5']
- rsyncignore = ['dir1/dir2', 'dir5/dir6']
- """))
- session = testdir.reparseconfig([source]).initsession()
- nodemanager = NodeManager(session.config,
- ["popen//chdir=%s" % dest])
- nodemanager.rsync_roots()
- assert dest.join("dir1").check()
- assert not dest.join("dir1", "dir2").check()
- assert dest.join("dir5","file").check()
- assert not dest.join("dir6").check()
-
- def test_optimise_popen(self, testdir, mysetup):
- source, dest = mysetup.source, mysetup.dest
- specs = ["popen"] * 3
- source.join("conftest.py").write("rsyncdirs = ['a']")
- source.ensure('a', dir=1)
- config = testdir.reparseconfig([source])
- nodemanager = NodeManager(config, specs)
- nodemanager.rsync_roots()
- for gwspec in nodemanager.gwmanager.specs:
- assert gwspec._samefilesystem()
- assert not gwspec.chdir
-
- def test_setup_DEBUG(self, mysetup, testdir):
- source = mysetup.source
- specs = ["popen"] * 2
- source.join("conftest.py").write("rsyncdirs = ['a']")
- source.ensure('a', dir=1)
- config = testdir.reparseconfig([source, '--debug'])
- assert config.option.debug
- nodemanager = NodeManager(config, specs)
- reprec = testdir.getreportrecorder(config).hookrecorder
- nodemanager.setup_nodes(putevent=[].append)
- for spec in nodemanager.gwmanager.specs:
- l = reprec.getcalls("pytest_trace")
- assert l
- nodemanager.teardown_nodes()
-
- def test_ssh_setup_nodes(self, specssh, testdir):
- testdir.makepyfile(__init__="", test_x="""
- def test_one():
- pass
- """)
- reprec = testdir.inline_run("-d", "--rsyncdir=%s" % testdir.tmpdir,
- "--tx", specssh, testdir.tmpdir)
- rep, = reprec.getreports("pytest_runtest_logreport")
- assert rep.passed
-
--- a/doc/test/plugin/index.txt
+++ b/doc/test/plugin/index.txt
@@ -8,7 +8,7 @@ mark_ generic mechanism for marking pyth
pdb_ interactive debugging with the Python Debugger.
-figleaf_ (external) for testing with Titus' figleaf coverage module
+figleaf_ report test coverage using the 'figleaf' package.
coverage_ (external) for testing with Ned's coverage module
@@ -21,13 +21,15 @@ recwarn_ helpers for asserting deprecati
tmpdir_ provide temporary directories to test functions.
-testing domains
-===============
+other testing domains, misc
+===========================
oejskit_ (external) run javascript tests in real life browsers
django_ (external) for testing django applications
+xdist_ loop on failing tests, distribute test runs to CPUs and hosts.
+
genscript_ generate standalone test script to be distributed along with an application.
--- a/testing/pytest/dist/test_txnode.py
+++ /dev/null
@@ -1,148 +0,0 @@
-
-import py
-import execnet
-from py.impl.test.dist.txnode import TXNode
-queue = py.builtin._tryimport("queue", "Queue")
-Queue = queue.Queue
-
-class EventQueue:
- def __init__(self, registry, queue=None):
- if queue is None:
- queue = Queue()
- self.queue = queue
- registry.register(self)
-
- def geteventargs(self, eventname, timeout=2.0):
- events = []
- while 1:
- try:
- eventcall = self.queue.get(timeout=timeout)
- except queue.Empty:
- #print "node channel", self.node.channel
- #print "remoteerror", self.node.channel._getremoteerror()
- py.builtin.print_("seen events", events)
- raise IOError("did not see %r events" % (eventname))
- else:
- name, args, kwargs = eventcall
- assert isinstance(name, str)
- if name == eventname:
- if args:
- return args
- return kwargs
- events.append(name)
- if name == "pytest_internalerror":
- py.builtin.print_(str(kwargs["excrepr"]))
-
-class MySetup:
- def __init__(self, request):
- self.id = 0
- self.request = request
-
- def geteventargs(self, eventname, timeout=2.0):
- eq = EventQueue(self.config.pluginmanager, self.queue)
- return eq.geteventargs(eventname, timeout=timeout)
-
- def makenode(self, config=None):
- if config is None:
- testdir = self.request.getfuncargvalue("testdir")
- config = testdir.reparseconfig([])
- self.config = config
- self.queue = Queue()
- self.xspec = execnet.XSpec("popen")
- self.gateway = execnet.makegateway(self.xspec)
- self.id += 1
- self.gateway.id = str(self.id)
- self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
- assert not self.node.channel.isclosed()
- return self.node
-
- def xfinalize(self):
- if hasattr(self, 'node'):
- gw = self.node.gateway
- py.builtin.print_("exiting:", gw)
- gw.exit()
-
-def pytest_funcarg__mysetup(request):
- mysetup = MySetup(request)
- #pyfuncitem.addfinalizer(mysetup.finalize)
- return mysetup
-
-def test_node_hash_equality(mysetup):
- node = mysetup.makenode()
- node2 = mysetup.makenode()
- assert node != node2
- assert node == node
- assert not (node != node)
-
-class TestMasterSlaveConnection:
- def test_crash_invalid_item(self, mysetup):
- node = mysetup.makenode()
- node.send(123) # invalid item
- kwargs = mysetup.geteventargs("pytest_testnodedown")
- assert kwargs['node'] is node
- assert "Not properly terminated" in str(kwargs['error'])
-
- def test_crash_killed(self, testdir, mysetup):
- if not hasattr(py.std.os, 'kill'):
- py.test.skip("no os.kill")
- item = testdir.getitem("""
- def test_func():
- import os
- os.kill(os.getpid(), 9)
- """)
- node = mysetup.makenode(item.config)
- node.send(item)
- kwargs = mysetup.geteventargs("pytest_testnodedown")
- assert kwargs['node'] is node
- assert "Not properly terminated" in str(kwargs['error'])
-
- def test_node_down(self, mysetup):
- node = mysetup.makenode()
- node.shutdown()
- kwargs = mysetup.geteventargs("pytest_testnodedown")
- assert kwargs['node'] is node
- assert not kwargs['error']
- node.callback(node.ENDMARK)
- excinfo = py.test.raises(IOError,
- "mysetup.geteventargs('testnodedown', timeout=0.01)")
-
- def test_send_on_closed_channel(self, testdir, mysetup):
- item = testdir.getitem("def test_func(): pass")
- node = mysetup.makenode(item.config)
- node.channel.close()
- py.test.raises(IOError, "node.send(item)")
- #ev = self.getcalls(pytest_internalerror)
- #assert ev.excinfo.errisinstance(IOError)
-
- def test_send_one(self, testdir, mysetup):
- item = testdir.getitem("def test_func(): pass")
- node = mysetup.makenode(item.config)
- node.send(item)
- kwargs = mysetup.geteventargs("pytest_runtest_logreport")
- rep = kwargs['report']
- assert rep.passed
- py.builtin.print_(rep)
- assert rep.item == item
-
- def test_send_some(self, testdir, mysetup):
- items = testdir.getitems("""
- def test_pass():
- pass
- def test_fail():
- assert 0
- def test_skip():
- import py
- py.test.skip("x")
- """)
- node = mysetup.makenode(items[0].config)
- for item in items:
- node.send(item)
- for outcome in "passed failed skipped".split():
- kwargs = mysetup.geteventargs("pytest_runtest_logreport")
- report = kwargs['report']
- assert getattr(report, outcome)
-
- node.sendlist(items)
- for outcome in "passed failed skipped".split():
- rep = mysetup.geteventargs("pytest_runtest_logreport")['report']
- assert getattr(rep, outcome)
--- a/doc/test/plugin/monkeypatch.txt
+++ b/doc/test/plugin/monkeypatch.txt
@@ -1,8 +1,7 @@
-
-pytest_monkeypatch plugin
-=========================
safely patch object attributes, dicts and environment variables.
+================================================================
+
.. contents::
:local:
--- a/bin-for-dist/makepluginlist.py
+++ b/bin-for-dist/makepluginlist.py
@@ -6,8 +6,8 @@ plugins = [
('advanced python testing',
'skipping mark pdb figleaf coverage '
'monkeypatch capture recwarn tmpdir',),
- ('testing domains',
- 'oejskit django genscript'),
+ ('other testing domains, misc',
+ 'oejskit django xdist genscript'),
('reporting and failure logging',
'pastebin logxml xmlresult resultlog terminal',),
('other testing conventions',
@@ -22,7 +22,6 @@ plugins = [
externals = {
'oejskit': "run javascript tests in real life browsers",
- 'figleaf': "for testing with Titus' figleaf coverage module",
'django': "for testing django applications",
'coverage': "for testing with Ned's coverage module ",
'xmlresult': "for generating xml reports "
@@ -159,7 +158,7 @@ class PluginDoc(RestWriter):
config.pluginmanager.import_plugin(name)
plugin = config.pluginmanager.getplugin(name)
assert plugin is not None, plugin
-
+ print plugin
doc = plugin.__doc__.strip()
i = doc.find("\n")
if i == -1:
@@ -169,12 +168,13 @@ class PluginDoc(RestWriter):
oneliner = doc[:i].strip()
moduledoc = doc[i+1:].strip()
- self.name = plugin.__name__.split(".")[-1]
+ self.name = oneliner # plugin.__name__.split(".")[-1]
self.oneliner = oneliner
self.moduledoc = moduledoc
- self.h1("%s plugin" % self.name) # : %s" %(self.name, self.oneliner))
- self.Print(self.oneliner)
+ #self.h1("%s plugin" % self.name) # : %s" %(self.name, self.oneliner))
+ self.h1(oneliner)
+ #self.Print(self.oneliner)
self.Print()
self.Print(".. contents::")
self.Print(" :local:")
--- a/setup.py
+++ b/setup.py
@@ -55,8 +55,6 @@ def main():
'py.impl.path',
'py.impl.process',
'py.impl.test',
- 'py.impl.test.dist',
- 'py.impl.test.looponfail',
],
zip_safe=False,
)
--- a/conftest.py
+++ b/conftest.py
@@ -5,14 +5,7 @@ pytest_plugins = '_pytest doctest pytest
collect_ignore = ['build', 'doc/_build']
-
rsyncdirs = ['conftest.py', 'bin', 'py', 'doc', 'testing']
-try:
- import execnet
-except ImportError:
- pass
-else:
- rsyncdirs.append(str(py.path.local(execnet.__file__).dirpath()))
import py
def pytest_addoption(parser):
@@ -20,42 +13,16 @@ def pytest_addoption(parser):
group.addoption('--sshhost',
action="store", dest="sshhost", default=None,
help=("ssh xspec for ssh functional tests. "))
- group.addoption('--gx',
- action="append", dest="gspecs", default=None,
- help=("add a global test environment, XSpec-syntax. "))
group.addoption('--runslowtests',
action="store_true", dest="runslowtests", default=False,
help=("run slow tests"))
-def pytest_funcarg__specssh(request):
- return getspecssh(request.config)
-def getgspecs(config):
- return [execnet.XSpec(spec)
- for spec in config.getvalueorskip("gspecs")]
-
-# configuration information for tests
-def getgspecs(config):
- return [execnet.XSpec(spec)
- for spec in config.getvalueorskip("gspecs")]
-
-def getspecssh(config):
- xspecs = getgspecs(config)
- for spec in xspecs:
- if spec.ssh:
- if not py.path.local.sysfind("ssh"):
- py.test.skip("command not found: ssh")
- return spec
- py.test.skip("need '--gx ssh=...'")
-
-def getsocketspec(config):
- xspecs = getgspecs(config)
- for spec in xspecs:
- if spec.socket:
- return spec
- py.test.skip("need '--gx socket=...'")
-
-
+def pytest_funcarg__sshhost(request):
+ val = request.config.getvalue("sshhost")
+ if val:
+ return val
+ py.test.skip("need --sshhost option")
def pytest_generate_tests(metafunc):
multi = getattr(metafunc.function, 'multi', None)
if multi is not None:
--- a/testing/plugin/test_pytest_default.py
+++ b/testing/plugin/test_pytest_default.py
@@ -1,21 +1,6 @@
import py
from py.plugin.pytest_default import pytest_report_iteminfo
-def test_implied_different_sessions(testdir, tmpdir):
- def x(*args):
- config = testdir.reparseconfig([tmpdir] + list(args))
- try:
- config.pluginmanager.do_configure(config)
- except ValueError:
- return Exception
- return getattr(config._sessionclass, '__name__', None)
- assert x() == None
- py.test.importorskip("execnet")
- assert x('-d') == 'DSession'
- assert x('--dist=each') == 'DSession'
- assert x('-n3') == 'DSession'
- assert x('-f') == 'LooponfailingSession'
-
def test_plugin_specify(testdir):
testdir.chdir()
config = py.test.raises(ImportError, """
@@ -40,50 +25,6 @@ def test_exclude(testdir):
assert result.ret == 0
assert result.stdout.fnmatch_lines(["*1 passed*"])
-class TestDistOptions:
- def setup_method(self, method):
- py.test.importorskip("execnet")
- def test_getxspecs(self, testdir):
- config = testdir.parseconfigure("--tx=popen", "--tx", "ssh=xyz")
- xspecs = config.getxspecs()
- assert len(xspecs) == 2
- print(xspecs)
- assert xspecs[0].popen
- assert xspecs[1].ssh == "xyz"
-
- def test_xspecs_multiplied(self, testdir):
- xspecs = testdir.parseconfigure("--tx=3*popen",).getxspecs()
- assert len(xspecs) == 3
- assert xspecs[1].popen
-
- def test_getrsyncdirs(self, testdir):
- config = testdir.parseconfigure('--rsyncdir=' + str(testdir.tmpdir))
- roots = config.getrsyncdirs()
- assert len(roots) == 1 + 1 # pylib itself
- assert testdir.tmpdir in roots
-
- def test_getrsyncdirs_with_conftest(self, testdir):
- p = py.path.local()
- for bn in 'x y z'.split():
- p.mkdir(bn)
- testdir.makeconftest("""
- rsyncdirs= 'x',
- """)
- config = testdir.parseconfigure(testdir.tmpdir, '--rsyncdir=y', '--rsyncdir=z')
- roots = config.getrsyncdirs()
- assert len(roots) == 3 + 1 # pylib itself
- assert py.path.local('y') in roots
- assert py.path.local('z') in roots
- assert testdir.tmpdir.join('x') in roots
-
- def test_dist_options(self, testdir):
- config = testdir.parseconfigure("-n 2")
- assert config.option.dist == "load"
- assert config.option.tx == ['popen'] * 2
-
- config = testdir.parseconfigure("-d")
- assert config.option.dist == "load"
-
def test_pytest_report_iteminfo():
class FakeItem(object):
--- a/doc/test/plugin/mark.txt
+++ b/doc/test/plugin/mark.txt
@@ -1,8 +1,7 @@
-
-pytest_mark plugin
-==================
generic mechanism for marking python functions.
+===============================================
+
.. contents::
:local:
--- a/doc/test/plugin/doctest.txt
+++ b/doc/test/plugin/doctest.txt
@@ -1,8 +1,7 @@
-
-pytest_doctest plugin
-=====================
collect and execute doctests from modules and test files.
+=========================================================
+
.. contents::
:local:
--- a/testing/plugin/test_pytest_pdb.py
+++ b/testing/plugin/test_pytest_pdb.py
@@ -43,14 +43,3 @@ class TestPDB:
child.expect("1 failed")
if child.isalive():
child.wait()
-
- def test_dist_incompatibility_messages(self, testdir):
- py.test.importorskip("execnet")
- Error = py.test.config.Error
- py.test.raises(Error, "testdir.parseconfigure('--pdb', '--looponfail')")
- result = testdir.runpytest("--pdb", "-n", "3")
- assert result.ret != 0
- assert "incompatible" in result.stderr.str()
- result = testdir.runpytest("--pdb", "-d", "--tx", "popen")
- assert result.ret != 0
- assert "incompatible" in result.stderr.str()
--- a/doc/test/plugin/logxml.txt
+++ b/doc/test/plugin/logxml.txt
@@ -1,8 +1,7 @@
-
-pytest_logxml plugin
-====================
logging of test results in JUnit-XML format, for use with Hudson
+================================================================
+
.. contents::
:local:
--- a/testing/pytest/dist/test_mypickle.py
+++ /dev/null
@@ -1,254 +0,0 @@
-
-import py
-import sys
-import execnet
-
-Queue = py.builtin._tryimport('queue', 'Queue').Queue
-
-from py.impl.test.dist.mypickle import ImmutablePickler, PickleChannel
-from py.impl.test.dist.mypickle import UnpickleError, makekey
-# first let's test some basic functionality
-
-def pytest_generate_tests(metafunc):
- if 'picklemod' in metafunc.funcargnames:
- import pickle
- metafunc.addcall(funcargs={'picklemod': pickle})
- try:
- import cPickle
- except ImportError:
- pass
- else:
- metafunc.addcall(funcargs={'picklemod': cPickle})
- elif "obj" in metafunc.funcargnames and "proto" in metafunc.funcargnames:
- a1 = A()
- a2 = A()
- a2.a1 = a1
- for proto in (0,1,2, -1):
- for obj in {1:2}, [1,2,3], a1, a2:
- metafunc.addcall(funcargs=dict(obj=obj, proto=proto))
-
-def test_underlying_basic_pickling_mechanisms(picklemod):
- f1 = py.io.BytesIO()
- f2 = py.io.BytesIO()
-
- pickler1 = picklemod.Pickler(f1)
- unpickler1 = picklemod.Unpickler(f2)
-
- pickler2 = picklemod.Pickler(f2)
- unpickler2 = picklemod.Unpickler(f1)
-
- #pickler1.memo = unpickler1.memo = {}
- #pickler2.memo = unpickler2.memo = {}
-
- d = {}
-
- pickler1.dump(d)
- f1.seek(0)
- d_other = unpickler2.load()
-
- # translate unpickler2 memo to pickler2
- pickler2.memo = dict([(id(obj), (int(x), obj))
- for x, obj in unpickler2.memo.items()])
-
- pickler2.dump(d_other)
- f2.seek(0)
-
- unpickler1.memo = dict([(makekey(x), y)
- for x, y in pickler1.memo.values()])
- d_back = unpickler1.load()
- assert d is d_back
-
-
-class A:
- pass
-
-
-def test_pickle_and_back_IS_same(obj, proto):
- p1 = ImmutablePickler(uneven=False, protocol=proto)
- p2 = ImmutablePickler(uneven=True, protocol=proto)
- s1 = p1.dumps(obj)
- d2 = p2.loads(s1)
- s2 = p2.dumps(d2)
- obj_back = p1.loads(s2)
- assert obj is obj_back
-
-def test_pickling_twice_before_unpickling():
- p1 = ImmutablePickler(uneven=False)
- p2 = ImmutablePickler(uneven=True)
-
- a1 = A()
- a2 = A()
- a3 = A()
- a3.a1 = a1
- a2.a1 = a1
- s1 = p1.dumps(a1)
- a1.a3 = a3
- s2 = p1.dumps(a2)
- other_a1 = p2.loads(s1)
- other_a2 = p2.loads(s2)
- back_a1 = p1.loads(p2.dumps(other_a1))
- other_a3 = p2.loads(p1.dumps(a3))
- back_a3 = p1.loads(p2.dumps(other_a3))
- back_a2 = p1.loads(p2.dumps(other_a2))
- back_a1 = p1.loads(p2.dumps(other_a1))
- assert back_a1 is a1
- assert back_a2 is a2
-
-def test_pickling_concurrently():
- p1 = ImmutablePickler(uneven=False)
- p2 = ImmutablePickler(uneven=True)
-
- a1 = A()
- a1.hasattr = 42
- a2 = A()
-
- s1 = p1.dumps(a1)
- s2 = p2.dumps(a2)
- other_a1 = p2.loads(s1)
- other_a2 = p1.loads(s2)
- a1_back = p1.loads(p2.dumps(other_a1))
-
-def test_self_memoize():
- p1 = ImmutablePickler(uneven=False)
- a1 = A()
- p1.selfmemoize(a1)
- x = p1.loads(p1.dumps(a1))
- assert x is a1
-
-TESTTIMEOUT = 2.0
-class TestPickleChannelFunctional:
- def setup_class(cls):
- cls.gw = execnet.PopenGateway()
- cls.gw.remote_exec(
- "import py ; py.path.local(%r).pyimport()" %(__file__)
- )
- cls.gw.remote_init_threads(5)
- # we need the remote test code to import
- # the same test module here
-
- def test_popen_send_instance(self):
- channel = self.gw.remote_exec("""
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- from testing.pytest.dist.test_mypickle import A
- a1 = A()
- a1.hello = 10
- channel.send(a1)
- a2 = channel.receive()
- channel.send(a2 is a1)
- """)
- channel = PickleChannel(channel)
- a_received = channel.receive()
- assert isinstance(a_received, A)
- assert a_received.hello == 10
- channel.send(a_received)
- remote_a2_is_a1 = channel.receive()
- assert remote_a2_is_a1
-
- def test_send_concurrent(self):
- channel = self.gw.remote_exec("""
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- from testing.pytest.dist.test_mypickle import A
- l = [A() for i in range(10)]
- channel.send(l)
- other_l = channel.receive()
- channel.send((l, other_l))
- channel.send(channel.receive())
- channel.receive()
- """)
- channel = PickleChannel(channel)
- l = [A() for i in range(10)]
- channel.send(l)
- other_l = channel.receive()
- channel.send(other_l)
- ret = channel.receive()
- assert ret[0] is other_l
- assert ret[1] is l
- back = channel.receive()
- assert other_l is other_l
- channel.send(None)
-
- #s1 = p1.dumps(a1)
- #s2 = p2.dumps(a2)
- #other_a1 = p2.loads(s1)
- #other_a2 = p1.loads(s2)
- #a1_back = p1.loads(p2.dumps(other_a1))
-
- def test_popen_with_callback(self):
- channel = self.gw.remote_exec("""
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- from testing.pytest.dist.test_mypickle import A
- a1 = A()
- a1.hello = 10
- channel.send(a1)
- a2 = channel.receive()
- channel.send(a2 is a1)
- """)
- channel = PickleChannel(channel)
- queue = Queue()
- channel.setcallback(queue.put)
- a_received = queue.get(timeout=TESTTIMEOUT)
- assert isinstance(a_received, A)
- assert a_received.hello == 10
- channel.send(a_received)
- #remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
- #assert remote_a2_is_a1
-
- def test_popen_with_callback_with_endmarker(self):
- channel = self.gw.remote_exec("""
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- from testing.pytest.dist.test_mypickle import A
- a1 = A()
- a1.hello = 10
- channel.send(a1)
- a2 = channel.receive()
- channel.send(a2 is a1)
- """)
- channel = PickleChannel(channel)
- queue = Queue()
- channel.setcallback(queue.put, endmarker=-1)
-
- a_received = queue.get(timeout=TESTTIMEOUT)
- assert isinstance(a_received, A)
- assert a_received.hello == 10
- channel.send(a_received)
- remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
- assert remote_a2_is_a1
- endmarker = queue.get(timeout=TESTTIMEOUT)
- assert endmarker == -1
-
- def test_popen_with_callback_with_endmarker_and_unpickling_error(self):
- channel = self.gw.remote_exec("""
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- from testing.pytest.dist.test_mypickle import A
- a1 = A()
- channel.send(a1)
- channel.send(a1)
- """)
- channel = PickleChannel(channel)
- queue = Queue()
- a = channel.receive()
- channel._ipickle._unpicklememo.clear()
- channel.setcallback(queue.put, endmarker=-1)
- next = queue.get(timeout=TESTTIMEOUT)
- assert next == -1
- error = channel._getremoteerror()
- assert isinstance(error, UnpickleError)
-
- def test_popen_with_various_methods(self):
- channel = self.gw.remote_exec("""
- from py.impl.test.dist.mypickle import PickleChannel
- channel = PickleChannel(channel)
- channel.receive()
- """)
- channel = PickleChannel(channel)
- assert not channel.isclosed()
- assert not channel._getremoteerror()
- channel.send(2)
- channel.waitclose(timeout=2)
-
-
--- a/testing/pytest/looponfail/test_remote.py
+++ /dev/null
@@ -1,151 +0,0 @@
-import py
-py.test.importorskip("execnet")
-from py.impl.test.looponfail.remote import LooponfailingSession, LoopState, RemoteControl
-
-class TestRemoteControl:
- def test_nofailures(self, testdir):
- item = testdir.getitem("def test_func(): pass\n")
- control = RemoteControl(item.config)
- control.setup()
- failures = control.runsession()
- assert not failures
-
- def test_failures_somewhere(self, testdir):
- item = testdir.getitem("def test_func(): assert 0\n")
- control = RemoteControl(item.config)
- control.setup()
- failures = control.runsession()
- assert failures
- control.setup()
- item.fspath.write("def test_func(): assert 1\n")
- pyc = item.fspath.new(ext=".pyc")
- if pyc.check():
- pyc.remove()
- failures = control.runsession(failures)
- assert not failures
-
- def test_failure_change(self, testdir):
- modcol = testdir.getitem("""
- def test_func():
- assert 0
- """)
- control = RemoteControl(modcol.config)
- control.setup()
- failures = control.runsession()
- assert failures
- control.setup()
- modcol.fspath.write(py.code.Source("""
- def test_func():
- assert 1
- def test_new():
- assert 0
- """))
- pyc = modcol.fspath.new(ext=".pyc")
- if pyc.check():
- pyc.remove()
- failures = control.runsession(failures)
- assert not failures
- control.setup()
- failures = control.runsession()
- assert failures
- assert str(failures).find("test_new") != -1
-
-class TestLooponFailing:
- def test_looponfail_from_fail_to_ok(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_one():
- x = 0
- assert x == 1
- def test_two():
- assert 1
- """)
- session = LooponfailingSession(modcol.config)
- loopstate = LoopState()
- session.remotecontrol.setup()
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 1
-
- modcol.fspath.write(py.code.Source("""
- def test_one():
- x = 15
- assert x == 15
- def test_two():
- assert 1
- """))
- assert session.statrecorder.check()
- session.loop_once(loopstate)
- assert not loopstate.colitems
-
- def test_looponfail_from_one_to_two_tests(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_one():
- assert 0
- """)
- session = LooponfailingSession(modcol.config)
- loopstate = LoopState()
- session.remotecontrol.setup()
- loopstate.colitems = []
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 1
-
- modcol.fspath.write(py.code.Source("""
- def test_one():
- assert 1 # passes now
- def test_two():
- assert 0 # new and fails
- """))
- assert session.statrecorder.check()
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 0
-
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 1
-
- def test_looponfail_removed_test(self, testdir):
- modcol = testdir.getmodulecol("""
- def test_one():
- assert 0
- def test_two():
- assert 0
- """)
- session = LooponfailingSession(modcol.config)
- loopstate = LoopState()
- session.remotecontrol.setup()
- loopstate.colitems = []
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 2
-
- modcol.fspath.write(py.code.Source("""
- def test_xxx(): # renamed test
- assert 0
- def test_two():
- assert 1 # pass now
- """))
- assert session.statrecorder.check()
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 0
-
- session.loop_once(loopstate)
- assert len(loopstate.colitems) == 1
-
-
- def test_looponfail_functional_fail_to_ok(self, testdir):
- p = testdir.makepyfile("""
- def test_one():
- x = 0
- assert x == 1
- """)
- child = testdir.spawn_pytest("-f %s" % p)
- child.expect("def test_one")
- child.expect("x == 1")
- child.expect("1 failed")
- child.expect("### LOOPONFAILING ####")
- child.expect("waiting for changes")
- p.write(py.code.Source("""
- def test_one():
- x = 1
- assert x == 1
- """))
- child.expect(".*1 passed.*")
- child.kill(15)
-
--- a/testing/pytest/dist/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-#
--- /dev/null
+++ b/doc/test/plugin/xdist.txt
@@ -0,0 +1,181 @@
+
+loop on failing tests, distribute test runs to CPUs and hosts.
+==============================================================
+
+
+.. contents::
+ :local:
+
+The `pytest-xdist`_ plugin extends py.test with some unique
+test execution modes:
+
+* Looponfail: run your tests in a subprocess. After it finishes py.test
+ waits until a file in your project changes and then re-runs only the
+ failing tests. This is repeated until all tests pass after which again
+ a full run is performed.
+
+* Load-balancing: if you have multiple CPUs or hosts you can use
+ those for a combined test run. This allows you to speed up
+ development or to use special resources of remote machines.
+
+* Multi-Platform coverage: you can specify different Python interpreters
+ or different platforms and run tests in parallel on all of them.
+
+Before running tests remotely, ``py.test`` efficiently synchronizes your
+program source code to the remote place. All test results
+are reported back and displayed to your local test session.
+You may specify different Python versions and interpreters.
+
+
+Usage examples
+---------------------
+
+Speed up test runs by sending tests to multiple CPUs
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+To send tests to multiple CPUs, type::
+
+ py.test -n NUM
+
+Especially for longer running tests or tests requiring
+a lot of IO this can lead to considerable speed ups.
+
+
+Running tests in a Python subprocess
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+To instantiate a python2.4 sub process and send tests to it, you may type::
+
+ py.test -d --tx popen//python=python2.4
+
+This will start a subprocess which is run with the "python2.4"
+Python interpreter, found in your system binary lookup path.
+
+If you prefix the --tx option value like this::
+
+ --tx 3*popen//python=python2.4
+
+then three subprocesses would be created and tests
+will be load-balanced across these three processes.
+
+
+Sending tests to remote SSH accounts
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+Suppose you have a package ``mypkg`` which contains some
+tests that you can successfully run locally. And you
+have a ssh-reachable machine ``myhost``. Then
+you can ad-hoc distribute your tests by typing::
+
+ py.test -d --tx ssh=myhost --rsyncdir mypkg mypkg
+
+This will synchronize your ``mypkg`` package directory
+to a remote ssh account and then locally collect tests
+and send them to remote places for execution.
+
+You can specify multiple ``--rsyncdir`` directories
+to be sent to the remote side.
+
+**NOTE:** For py.test to collect and send tests correctly
+you not only need to make sure all code and tests
+directories are rsynced, but that any test (sub) directory
+also has an ``__init__.py`` file because internally
+py.test references tests as a fully qualified python
+module path. **You will otherwise get strange errors**
+during setup of the remote side.
+
+Sending tests to remote Socket Servers
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+Download the single-module `socketserver.py`_ Python program
+and run it like this::
+
+ python socketserver.py
+
+It will tell you that it starts listening on the default
+port. You can now on your home machine specify this
+new socket host with something like this::
+
+ py.test -d --tx socket=192.168.1.102:8888 --rsyncdir mypkg mypkg
+
+
+.. _`atonce`:
+
+Running tests on many platforms at once
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+The basic command to run tests on multiple platforms is::
+
+ py.test --dist=each --tx=spec1 --tx=spec2
+
+If you specify a windows host, an OSX host and a Linux
+environment this command will send each test to all
+platforms - and report back failures from all platforms
+at once. The specifications strings use the `xspec syntax`_.
+
+.. _`xspec syntax`: http://codespeak.net/execnet/trunk/basics.html#xspec
+
+.. _`socketserver.py`: http://codespeak.net/svn/py/dist/py/execnet/script/socketserver.py
+
+.. _`execnet`: http://codespeak.net/execnet
+
+Specifying test exec environments in a conftest.py
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+Instead of specifying command line options, you can
+put options values in a ``conftest.py`` file like this::
+
+ pytest_option_tx = ['ssh=myhost//python=python2.5', 'popen//python=python2.5']
+ pytest_option_dist = True
+
+Any commandline ``--tx`` specifications will add to the list of available execution
+environments.
+
+Specifying "rsync" dirs in a conftest.py
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+In your ``mypkg/conftest.py`` you may specify directories to synchronise
+or to exclude::
+
+ rsyncdirs = ['.', '../plugins']
+ rsyncignore = ['_cache']
+
+These directory specifications are relative to the directory
+where the ``conftest.py`` is found.
+
+command line options
+--------------------
+
+
+``-f, --looponfail``
+ run tests in subprocess, wait for modified files and re-run failing test set until all pass.
+``-n numprocesses``
+ shortcut for '--dist=load --tx=NUM*popen'
+``--boxed``
+ box each test run in a separate process (unix)
+``--dist=distmode``
+ set mode for distributing tests to exec environments.
+
+ each: send each test to each available environment.
+
+ load: send each test to available environment.
+
+ (default) no: run tests inprocess, don't distribute.
+``--tx=xspec``
+ add a test execution environment. some examples: --tx popen//python=python2.5 --tx socket=192.168.1.102:8888 --tx ssh=user@codespeak.net//chdir=testcache
+``-d``
+ load-balance tests. shortcut for '--dist=load'
+``--rsyncdir=dir1``
+ add directory for rsyncing to remote tx nodes.
+
+Start improving this plugin in 30 seconds
+=========================================
+
+
+1. Download `plugin.py`_ plugin source code
+2. put it somewhere as ``plugin.py`` into your import path
+3. a subsequent ``py.test`` run will use your local version
+
+Checkout customize_, other plugins_ or `get in contact`_.
+
+.. include:: links.txt
--- a/bin-for-dist/test_install.py
+++ b/bin-for-dist/test_install.py
@@ -175,7 +175,8 @@ def test_cmdline_entrypoints(monkeypatch
for script in unversioned_scripts:
assert script in points
-def test_slave_popen_needs_no_pylib(testdir, venv):
+def test_slave_popen_needs_no_pylib(testdir, venv, pytestconfig):
+ pytestconfig.pluginmanager.skipifmissing("xdist")
venv.ensure()
#xxx execnet optimizes popen
#ch = venv.makegateway().remote_exec("import execnet")
@@ -192,8 +193,10 @@ def test_slave_popen_needs_no_pylib(test
"*1 passed*"
])
-def test_slave_needs_no_execnet(testdir, specssh):
- gw = execnet.makegateway(specssh)
+def test_slave_needs_no_execnet(testdir, sshhost, pytestconfig):
+ pytestconfig.pluginmanager.skipifmissing("xdist")
+ xspec = "ssh=%s" % sshhost
+ gw = execnet.makegateway("ssh=%s" % sshhost)
ch = gw.remote_exec("""
import os, subprocess
subprocess.call(["virtualenv", "--no-site-packages", "subdir"])
@@ -207,7 +210,7 @@ def test_slave_needs_no_execnet(testdir,
e = sys.exc_info()[1]
py.test.skip("could not prepare ssh slave:%s" % str(e))
gw.exit()
- newspec = "%s//python=%s//chdir=%s" % (specssh, path, chdir)
+ newspec = "%s//python=%s//chdir=%s" % (xspec, path, chdir)
gw = execnet.makegateway(newspec)
ch = gw.remote_exec("import execnet")
py.test.raises(ch.RemoteError, ch.waitclose)
--- a/testing/pytest/test_deprecated_api.py
+++ b/testing/pytest/test_deprecated_api.py
@@ -265,42 +265,6 @@ def test_config_cmdline_options(recwarn,
recwarn.pop(DeprecationWarning)
assert config.option.gdest == 17
-def test_dist_conftest_options(testdir):
- p1 = testdir.tmpdir.ensure("dir", 'p1.py')
- p1.dirpath("__init__.py").write("")
- p1.dirpath("conftest.py").write(py.code.Source("""
- import py
- from py.builtin import print_
- print_("importing conftest", __file__)
- Option = py.test.config.Option
- option = py.test.config.addoptions("someopt",
- Option('--someopt', action="store_true",
- dest="someopt", default=False))
- dist_rsync_roots = ['../dir']
- print_("added options", option)
- print_("config file seen from conftest", py.test.config)
- """))
- p1.write(py.code.Source("""
- import py
- from %s import conftest
- from py.builtin import print_
- def test_1():
- print_("config from test_1", py.test.config)
- print_("conftest from test_1", conftest.__file__)
- print_("test_1: py.test.config.option.someopt", py.test.config.option.someopt)
- print_("test_1: conftest", conftest)
- print_("test_1: conftest.option.someopt", conftest.option.someopt)
- assert conftest.option.someopt
- """ % p1.dirpath().purebasename ))
- result = testdir.runpytest('-d', '--tx=popen', p1, '--someopt')
- assert result.ret == 0
- result.stderr.fnmatch_lines([
- "*Deprecation*pytest_addoptions*",
- ])
- result.stdout.fnmatch_lines([
- "*1 passed*",
- ])
-
def test_conftest_non_python_items(recwarn, testdir):
testdir.makepyfile(conftest="""
import py
--- a/testing/pytest/looponfail/test_util.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import py
-from py.impl.test.looponfail.util import StatRecorder
-
-def test_filechange(tmpdir):
- tmp = tmpdir
- hello = tmp.ensure("hello.py")
- sd = StatRecorder([tmp])
- changed = sd.check()
- assert not changed
-
- hello.write("world")
- changed = sd.check()
- assert changed
-
- tmp.ensure("new.py")
- changed = sd.check()
- assert changed
-
- tmp.join("new.py").remove()
- changed = sd.check()
- assert changed
-
- tmp.join("a", "b", "c.py").ensure()
- changed = sd.check()
- assert changed
-
- tmp.join("a", "c.txt").ensure()
- changed = sd.check()
- assert changed
- changed = sd.check()
- assert not changed
-
- tmp.join("a").remove()
- changed = sd.check()
- assert changed
-
-def test_pycremoval(tmpdir):
- tmp = tmpdir
- hello = tmp.ensure("hello.py")
- sd = StatRecorder([tmp])
- changed = sd.check()
- assert not changed
-
- pycfile = hello + "c"
- pycfile.ensure()
- changed = sd.check()
- assert not changed
-
- hello.write("world")
- changed = sd.check()
- assert not pycfile.check()
-
-
-def test_waitonchange(tmpdir, monkeypatch):
- tmp = tmpdir
- sd = StatRecorder([tmp])
-
- l = [True, False]
- monkeypatch.setattr(StatRecorder, 'check', lambda self: l.pop())
- sd.waitonchange(checkinterval=0.2)
- assert not l
--- a/py/impl/test/dist/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-#
--- a/doc/test/plugin/skipping.txt
+++ b/doc/test/plugin/skipping.txt
@@ -1,8 +1,7 @@
-
-pytest_skipping plugin
-======================
advanced skipping for python test functions, classes or modules.
+================================================================
+
.. contents::
:local:
--- a/py/impl/test/dist/mypickle.py
+++ /dev/null
@@ -1,187 +0,0 @@
-"""
-
- Pickling support for two processes that want to exchange
- *immutable* object instances. Immutable in the sense
- that the receiving side of an object can modify its
- copy but when it sends it back the original sending
- side will continue to see its unmodified version
- (and no actual state will go over the wire).
-
- This module also implements an experimental
- execnet pickling channel using this idea.
-
-"""
-
-import py
-import sys, os, struct
-#debug = open("log-mypickle-%d" % os.getpid(), 'w')
-
-if sys.version_info >= (3,0):
- makekey = lambda x: x
- fromkey = lambda x: x
- from pickle import _Pickler as Pickler
- from pickle import _Unpickler as Unpickler
-else:
- makekey = str
- fromkey = int
- from pickle import Pickler, Unpickler
-
-
-class MyPickler(Pickler):
- """ Pickler with a custom memoize()
- to take care of unique ID creation.
- See the usage in ImmutablePickler
- XXX we could probably extend Pickler
- and Unpickler classes to directly
- update the other'S memos.
- """
- def __init__(self, file, protocol, uneven):
- Pickler.__init__(self, file, protocol)
- self.uneven = uneven
-
- def memoize(self, obj):
- if self.fast:
- return
- assert id(obj) not in self.memo
- memo_len = len(self.memo)
- key = memo_len * 2 + self.uneven
- self.write(self.put(key))
- self.memo[id(obj)] = key, obj
-
- #if sys.version_info < (3,0):
- # def save_string(self, obj, pack=struct.pack):
- # obj = unicode(obj)
- # self.save_unicode(obj, pack=pack)
- # Pickler.dispatch[str] = save_string
-
-class ImmutablePickler:
- def __init__(self, uneven, protocol=0):
- """ ImmutablePicklers are instantiated in Pairs.
- The two sides need to create unique IDs
- while pickling their objects. This is
- done by using either even or uneven
- numbers, depending on the instantiation
- parameter.
- """
- self._picklememo = {}
- self._unpicklememo = {}
- self._protocol = protocol
- self.uneven = uneven and 1 or 0
-
- def selfmemoize(self, obj):
- # this is for feeding objects to ourselfes
- # which be the case e.g. if you want to pickle
- # from a forked process back to the original
- f = py.io.BytesIO()
- pickler = MyPickler(f, self._protocol, uneven=self.uneven)
- pickler.memo = self._picklememo
- pickler.memoize(obj)
- self._updateunpicklememo()
-
- def dumps(self, obj):
- f = py.io.BytesIO()
- pickler = MyPickler(f, self._protocol, uneven=self.uneven)
- pickler.memo = self._picklememo
- pickler.dump(obj)
- if obj is not None:
- self._updateunpicklememo()
- #print >>debug, "dumped", obj
- #print >>debug, "picklememo", self._picklememo
- return f.getvalue()
-
- def loads(self, string):
- f = py.io.BytesIO(string)
- unpickler = Unpickler(f)
- unpickler.memo = self._unpicklememo
- res = unpickler.load()
- self._updatepicklememo()
- #print >>debug, "loaded", res
- #print >>debug, "unpicklememo", self._unpicklememo
- return res
-
- def _updatepicklememo(self):
- for x, obj in self._unpicklememo.items():
- self._picklememo[id(obj)] = (fromkey(x), obj)
-
- def _updateunpicklememo(self):
- for key,obj in self._picklememo.values():
- key = makekey(key)
- if key in self._unpicklememo:
- assert self._unpicklememo[key] is obj
- self._unpicklememo[key] = obj
-
-NO_ENDMARKER_WANTED = object()
-
-class UnpickleError(Exception):
- """ Problems while unpickling. """
- def __init__(self, formatted):
- self.formatted = formatted
- Exception.__init__(self, formatted)
- def __str__(self):
- return self.formatted
-
-class PickleChannel(object):
- """ PickleChannels wrap execnet channels
- and allow to send/receive by using
- "immutable pickling".
- """
- _unpicklingerror = None
- def __init__(self, channel):
- self._channel = channel
- # we use the fact that each side of a
- # gateway connection counts with uneven
- # or even numbers depending on which
- # side it is (for the purpose of creating
- # unique ids - which is what we need it here for)
- uneven = channel.gateway._channelfactory.count % 2
- self._ipickle = ImmutablePickler(uneven=uneven)
- self.RemoteError = channel.RemoteError
-
- def send(self, obj):
- pickled_obj = self._ipickle.dumps(obj)
- self._channel.send(pickled_obj)
-
- def receive(self):
- pickled_obj = self._channel.receive()
- return self._unpickle(pickled_obj)
-
- def _unpickle(self, pickled_obj):
- if isinstance(pickled_obj, self._channel.__class__):
- return pickled_obj
- return self._ipickle.loads(pickled_obj)
-
- def _getremoteerror(self):
- return self._unpicklingerror or self._channel._getremoteerror()
-
- def close(self):
- return self._channel.close()
-
- def isclosed(self):
- return self._channel.isclosed()
-
- def waitclose(self, timeout=None):
- return self._channel.waitclose(timeout=timeout)
-
- def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
- if endmarker is NO_ENDMARKER_WANTED:
- def unpickle_callback(pickled_obj):
- obj = self._unpickle(pickled_obj)
- callback(obj)
- self._channel.setcallback(unpickle_callback)
- return
- uniqueendmarker = object()
- def unpickle_callback(pickled_obj):
- if pickled_obj is uniqueendmarker:
- return callback(endmarker)
- try:
- obj = self._unpickle(pickled_obj)
- except KeyboardInterrupt:
- raise
- except:
- excinfo = py.code.ExceptionInfo()
- formatted = str(excinfo.getrepr(showlocals=True,funcargs=True))
- self._unpicklingerror = UnpickleError(formatted)
- callback(endmarker)
- else:
- callback(obj)
- self._channel.setcallback(unpickle_callback, uniqueendmarker)
More information about the pytest-commit
mailing list