[py-svn] py-trunk commit 42a36fa714e6: remove/reduce internal global state: py._com.registry is now fully contained and always instantiated from the py.test PluginManager class.
commits-noreply at bitbucket.org
commits-noreply at bitbucket.org
Tue Dec 29 12:37:46 CET 2009
# HG changeset patch -- Bitbucket.org
# Project py-trunk
# URL http://bitbucket.org/hpk42/py-trunk/overview/
# User holger krekel <holger at merlinux.eu>
# Date 1262086577 -3600
# Node ID 42a36fa714e665d79b69aa9aa8165871b6f7af2d
# Parent 756383d5451058b00a35aa42d815c9f75fccfdae
remove/reduce internal global state: py._com.registry is now fully contained and always instantiated from the py.test PluginManager class.
--- a/testing/pytest/test_pluginmanager.py
+++ b/testing/pytest/test_pluginmanager.py
@@ -1,5 +1,7 @@
import py, os
from py.impl.test.pluginmanager import PluginManager, canonical_importname
+from py.impl.test.pluginmanager import Registry, MultiCall, HookRelay, varnames
+
class TestBootstrapping:
def test_consider_env_fails_to_import(self, monkeypatch):
@@ -278,3 +280,184 @@ def test_namespace_has_default_and_env_p
""")
result = testdir.runpython(p)
assert result.ret == 0
+
+def test_varnames():
+ def f(x):
+ pass
+ class A:
+ def f(self, y):
+ pass
+ assert varnames(f) == ("x",)
+ assert varnames(A().f) == ('y',)
+
+class TestMultiCall:
+ def test_uses_copy_of_methods(self):
+ l = [lambda: 42]
+ mc = MultiCall(l, {})
+ repr(mc)
+ l[:] = []
+ res = mc.execute()
+ return res == 42
+
+ def test_call_passing(self):
+ class P1:
+ def m(self, __multicall__, x):
+ assert len(__multicall__.results) == 1
+ assert not __multicall__.methods
+ return 17
+
+ class P2:
+ def m(self, __multicall__, x):
+ assert __multicall__.results == []
+ assert __multicall__.methods
+ return 23
+
+ p1 = P1()
+ p2 = P2()
+ multicall = MultiCall([p1.m, p2.m], {'x': 23})
+ assert "23" in repr(multicall)
+ reslist = multicall.execute()
+ assert len(reslist) == 2
+ # ensure reversed order
+ assert reslist == [23, 17]
+
+ def test_keyword_args(self):
+ def f(x):
+ return x + 1
+ class A:
+ def f(self, x, y):
+ return x + y
+ multicall = MultiCall([f, A().f], dict(x=23, y=24))
+ assert "'x': 23" in repr(multicall)
+ assert "'y': 24" in repr(multicall)
+ reslist = multicall.execute()
+ assert reslist == [24+23, 24]
+ assert "2 results" in repr(multicall)
+
+ def test_keywords_call_error(self):
+ multicall = MultiCall([lambda x: x], {})
+ py.test.raises(TypeError, "multicall.execute()")
+
+ def test_call_subexecute(self):
+ def m(__multicall__):
+ subresult = __multicall__.execute()
+ return subresult + 1
+
+ def n():
+ return 1
+
+ call = MultiCall([n, m], {}, firstresult=True)
+ res = call.execute()
+ assert res == 2
+
+ def test_call_none_is_no_result(self):
+ def m1():
+ return 1
+ def m2():
+ return None
+ res = MultiCall([m1, m2], {}, firstresult=True).execute()
+ assert res == 1
+ res = MultiCall([m1, m2], {}).execute()
+ assert res == [1]
+
+class TestRegistry:
+
+ def test_register(self):
+ registry = Registry()
+ class MyPlugin:
+ pass
+ my = MyPlugin()
+ registry.register(my)
+ assert list(registry) == [my]
+ my2 = MyPlugin()
+ registry.register(my2)
+ assert list(registry) == [my, my2]
+
+ assert registry.isregistered(my)
+ assert registry.isregistered(my2)
+ registry.unregister(my)
+ assert not registry.isregistered(my)
+ assert list(registry) == [my2]
+
+ def test_listattr(self):
+ plugins = Registry()
+ class api1:
+ x = 41
+ class api2:
+ x = 42
+ class api3:
+ x = 43
+ plugins.register(api1())
+ plugins.register(api2())
+ plugins.register(api3())
+ l = list(plugins.listattr('x'))
+ assert l == [41, 42, 43]
+ l = list(plugins.listattr('x', reverse=True))
+ assert l == [43, 42, 41]
+
+ class api4:
+ x = 44
+ l = list(plugins.listattr('x', extra=(api4,)))
+ assert l == [41,42,43,44]
+ assert len(list(plugins)) == 3 # otherwise extra added
+
+class TestHookRelay:
+ def test_happypath(self):
+ registry = Registry()
+ class Api:
+ def hello(self, arg):
+ pass
+
+ mcm = HookRelay(hookspecs=Api, registry=registry)
+ assert hasattr(mcm, 'hello')
+ assert repr(mcm.hello).find("hello") != -1
+ class Plugin:
+ def hello(self, arg):
+ return arg + 1
+ registry.register(Plugin())
+ l = mcm.hello(arg=3)
+ assert l == [4]
+ assert not hasattr(mcm, 'world')
+
+ def test_only_kwargs(self):
+ registry = Registry()
+ class Api:
+ def hello(self, arg):
+ pass
+ mcm = HookRelay(hookspecs=Api, registry=registry)
+ py.test.raises(TypeError, "mcm.hello(3)")
+
+ def test_firstresult_definition(self):
+ registry = Registry()
+ class Api:
+ def hello(self, arg): pass
+ hello.firstresult = True
+
+ mcm = HookRelay(hookspecs=Api, registry=registry)
+ class Plugin:
+ def hello(self, arg):
+ return arg + 1
+ registry.register(Plugin())
+ res = mcm.hello(arg=3)
+ assert res == 4
+
+ def test_hooks_extra_plugins(self):
+ registry = Registry()
+ class Api:
+ def hello(self, arg):
+ pass
+ hookrelay = HookRelay(hookspecs=Api, registry=registry)
+ hook_hello = hookrelay.hello
+ class Plugin:
+ def hello(self, arg):
+ return arg + 1
+ registry.register(Plugin())
+ class Plugin2:
+ def hello(self, arg):
+ return arg + 2
+ newhook = hookrelay._makecall("hello", extralookup=Plugin2())
+ l = newhook(arg=3)
+ assert l == [5, 4]
+ l2 = hook_hello(arg=3)
+ assert l2 == [4]
+
--- a/testing/root/test_com.py
+++ b/testing/root/test_com.py
@@ -1,193 +1,3 @@
import py
import os
-from py.impl._com import Registry, MultiCall, HookRelay, varnames
-
-def test_varnames():
- def f(x):
- pass
- class A:
- def f(self, y):
- pass
- assert varnames(f) == ("x",)
- assert varnames(A().f) == ('y',)
-
-class TestMultiCall:
- def test_uses_copy_of_methods(self):
- l = [lambda: 42]
- mc = MultiCall(l, {})
- repr(mc)
- l[:] = []
- res = mc.execute()
- return res == 42
-
- def test_call_passing(self):
- class P1:
- def m(self, __multicall__, x):
- assert len(__multicall__.results) == 1
- assert not __multicall__.methods
- return 17
-
- class P2:
- def m(self, __multicall__, x):
- assert __multicall__.results == []
- assert __multicall__.methods
- return 23
-
- p1 = P1()
- p2 = P2()
- multicall = MultiCall([p1.m, p2.m], {'x': 23})
- assert "23" in repr(multicall)
- reslist = multicall.execute()
- assert len(reslist) == 2
- # ensure reversed order
- assert reslist == [23, 17]
-
- def test_keyword_args(self):
- def f(x):
- return x + 1
- class A:
- def f(self, x, y):
- return x + y
- multicall = MultiCall([f, A().f], dict(x=23, y=24))
- assert "'x': 23" in repr(multicall)
- assert "'y': 24" in repr(multicall)
- reslist = multicall.execute()
- assert reslist == [24+23, 24]
- assert "2 results" in repr(multicall)
-
- def test_keywords_call_error(self):
- multicall = MultiCall([lambda x: x], {})
- py.test.raises(TypeError, "multicall.execute()")
-
- def test_call_subexecute(self):
- def m(__multicall__):
- subresult = __multicall__.execute()
- return subresult + 1
-
- def n():
- return 1
-
- call = MultiCall([n, m], {}, firstresult=True)
- res = call.execute()
- assert res == 2
-
- def test_call_none_is_no_result(self):
- def m1():
- return 1
- def m2():
- return None
- res = MultiCall([m1, m2], {}, firstresult=True).execute()
- assert res == 1
- res = MultiCall([m1, m2], {}).execute()
- assert res == [1]
-
-class TestRegistry:
-
- def test_register(self):
- registry = Registry()
- class MyPlugin:
- pass
- my = MyPlugin()
- registry.register(my)
- assert list(registry) == [my]
- my2 = MyPlugin()
- registry.register(my2)
- assert list(registry) == [my, my2]
-
- assert registry.isregistered(my)
- assert registry.isregistered(my2)
- registry.unregister(my)
- assert not registry.isregistered(my)
- assert list(registry) == [my2]
-
- def test_listattr(self):
- plugins = Registry()
- class api1:
- x = 41
- class api2:
- x = 42
- class api3:
- x = 43
- plugins.register(api1())
- plugins.register(api2())
- plugins.register(api3())
- l = list(plugins.listattr('x'))
- assert l == [41, 42, 43]
- l = list(plugins.listattr('x', reverse=True))
- assert l == [43, 42, 41]
-
- class api4:
- x = 44
- l = list(plugins.listattr('x', extra=(api4,)))
- assert l == [41,42,43,44]
- assert len(list(plugins)) == 3 # otherwise extra added
-
-def test_api_and_defaults():
- assert isinstance(py._com.comregistry, Registry)
-
-class TestHookRelay:
- def test_happypath(self):
- registry = Registry()
- class Api:
- def hello(self, arg):
- pass
-
- mcm = HookRelay(hookspecs=Api, registry=registry)
- assert hasattr(mcm, 'hello')
- assert repr(mcm.hello).find("hello") != -1
- class Plugin:
- def hello(self, arg):
- return arg + 1
- registry.register(Plugin())
- l = mcm.hello(arg=3)
- assert l == [4]
- assert not hasattr(mcm, 'world')
-
- def test_only_kwargs(self):
- registry = Registry()
- class Api:
- def hello(self, arg):
- pass
- mcm = HookRelay(hookspecs=Api, registry=registry)
- py.test.raises(TypeError, "mcm.hello(3)")
-
- def test_firstresult_definition(self):
- registry = Registry()
- class Api:
- def hello(self, arg): pass
- hello.firstresult = True
-
- mcm = HookRelay(hookspecs=Api, registry=registry)
- class Plugin:
- def hello(self, arg):
- return arg + 1
- registry.register(Plugin())
- res = mcm.hello(arg=3)
- assert res == 4
-
- def test_default_plugins(self):
- class Api: pass
- mcm = HookRelay(hookspecs=Api, registry=py._com.comregistry)
- assert mcm._registry == py._com.comregistry
-
- def test_hooks_extra_plugins(self):
- registry = Registry()
- class Api:
- def hello(self, arg):
- pass
- hookrelay = HookRelay(hookspecs=Api, registry=registry)
- hook_hello = hookrelay.hello
- class Plugin:
- def hello(self, arg):
- return arg + 1
- registry.register(Plugin())
- class Plugin2:
- def hello(self, arg):
- return arg + 2
- newhook = hookrelay._makecall("hello", extralookup=Plugin2())
- l = newhook(arg=3)
- assert l == [5, 4]
- l2 = hook_hello(arg=3)
- assert l2 == [4]
-
--- a/testing/pytest/test_config.py
+++ b/testing/pytest/test_config.py
@@ -264,9 +264,6 @@ def test_options_on_small_file_do_not_bl
['--traceconfig'], ['-v'], ['-v', '-v']):
runfiletest(opts + [path])
-def test_default_registry():
- assert py.test.config.pluginmanager.comregistry is py._com.comregistry
-
def test_ensuretemp():
# XXX test for deprecation
d1 = py.test.ensuretemp('hello')
--- a/py/plugin/pytest__pytest.py
+++ b/py/plugin/pytest__pytest.py
@@ -1,21 +1,17 @@
import py
+from py.impl.test.pluginmanager import HookRelay
+
def pytest_funcarg___pytest(request):
return PytestArg(request)
class PytestArg:
def __init__(self, request):
self.request = request
- self.monkeypatch = self.request.getfuncargvalue("monkeypatch")
- self.comregistry = py._com.Registry()
- self.monkeypatch.setattr(py._com, 'comregistry', self.comregistry)
- def gethookrecorder(self, hookspecs, registry=None):
- if registry is not None:
- self.monkeypatch.setattr(py._com, 'comregistry', registry)
- self.comregistry = registry
- hookrecorder = HookRecorder(self.comregistry)
- hookrecorder.start_recording(hookspecs)
+ def gethookrecorder(self, hook):
+ hookrecorder = HookRecorder(hook._registry)
+ hookrecorder.start_recording(hook._hookspecs)
self.request.addfinalizer(hookrecorder.finish_recording)
return hookrecorder
@@ -32,8 +28,8 @@ class ParsedCall:
return "<ParsedCall %r(**%r)>" %(self._name, d)
class HookRecorder:
- def __init__(self, comregistry):
- self._comregistry = comregistry
+ def __init__(self, registry):
+ self._registry = registry
self.calls = []
self._recorders = {}
@@ -46,12 +42,12 @@ class HookRecorder:
setattr(RecordCalls, name, self._makecallparser(method))
recorder = RecordCalls()
self._recorders[hookspecs] = recorder
- self._comregistry.register(recorder)
- self.hook = py._com.HookRelay(hookspecs, registry=self._comregistry)
+ self._registry.register(recorder)
+ self.hook = HookRelay(hookspecs, registry=self._registry)
def finish_recording(self):
for recorder in self._recorders.values():
- self._comregistry.unregister(recorder)
+ self._registry.unregister(recorder)
self._recorders.clear()
def _makecallparser(self, method):
--- a/py/__init__.py
+++ b/py/__init__.py
@@ -25,12 +25,6 @@ py.apipkg.initpkg(__name__, dict(
_pydirs = '.impl._metainfo:pydirs',
version = 'py:__version__', # backward compatibility
- _com = {
- 'Registry': '.impl._com:Registry',
- 'MultiCall': '.impl._com:MultiCall',
- 'comregistry': '.impl._com:comregistry',
- 'HookRelay': '.impl._com:HookRelay',
- },
cmdline = {
'pytest': '.impl.cmdline.pytest:main',
'pylookup': '.impl.cmdline.pylookup:main',
--- a/testing/pytest/dist/test_gwmanage.py
+++ b/testing/pytest/dist/test_gwmanage.py
@@ -8,17 +8,17 @@
import py
import os
from py.impl.test.dist.gwmanage import GatewayManager, HostRSync
+from py.impl.test.pluginmanager import HookRelay, Registry
from py.plugin import hookspec
import execnet
def pytest_funcarg__hookrecorder(request):
_pytest = request.getfuncargvalue('_pytest')
hook = request.getfuncargvalue('hook')
- return _pytest.gethookrecorder(hook._hookspecs, hook._registry)
+ return _pytest.gethookrecorder(hook)
def pytest_funcarg__hook(request):
- registry = py._com.Registry()
- return py._com.HookRelay(hookspec, registry)
+ return HookRelay(hookspec, Registry())
class TestGatewayManagerPopen:
def test_popen_no_default_chdir(self, hook):
@@ -90,7 +90,6 @@ class pytest_funcarg__mysetup:
tmp = request.getfuncargvalue('tmpdir')
self.source = tmp.mkdir("source")
self.dest = tmp.mkdir("dest")
- request.getfuncargvalue("_pytest") # to have patching of py._com.comregistry
class TestHRSync:
def test_hrsync_filter(self, mysetup):
--- a/py/impl/_com.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""
-py lib plugins and plugin call management
-"""
-
-import py
-import inspect
-
-__all__ = ['Registry', 'MultiCall', 'comregistry', 'HookRelay']
-
-class MultiCall:
- """ execute a call into multiple python functions/methods. """
-
- def __init__(self, methods, kwargs, firstresult=False):
- self.methods = methods[:]
- self.kwargs = kwargs.copy()
- self.kwargs['__multicall__'] = self
- self.results = []
- self.firstresult = firstresult
-
- def __repr__(self):
- status = "%d results, %d meths" % (len(self.results), len(self.methods))
- return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
-
- def execute(self):
- while self.methods:
- method = self.methods.pop()
- kwargs = self.getkwargs(method)
- res = method(**kwargs)
- if res is not None:
- self.results.append(res)
- if self.firstresult:
- return res
- if not self.firstresult:
- return self.results
-
- def getkwargs(self, method):
- kwargs = {}
- for argname in varnames(method):
- try:
- kwargs[argname] = self.kwargs[argname]
- except KeyError:
- pass # might be optional param
- return kwargs
-
-def varnames(func):
- ismethod = inspect.ismethod(func)
- rawcode = py.code.getrawcode(func)
- try:
- return rawcode.co_varnames[ismethod:]
- except AttributeError:
- return ()
-
-class Registry:
- """
- Manage Plugins: register/unregister call calls to plugins.
- """
- def __init__(self, plugins=None):
- if plugins is None:
- plugins = []
- self._plugins = plugins
-
- def register(self, plugin):
- assert not isinstance(plugin, str)
- assert not plugin in self._plugins
- self._plugins.append(plugin)
-
- def unregister(self, plugin):
- self._plugins.remove(plugin)
-
- def isregistered(self, plugin):
- return plugin in self._plugins
-
- def __iter__(self):
- return iter(self._plugins)
-
- def listattr(self, attrname, plugins=None, extra=(), reverse=False):
- l = []
- if plugins is None:
- plugins = self._plugins
- candidates = list(plugins) + list(extra)
- for plugin in candidates:
- try:
- l.append(getattr(plugin, attrname))
- except AttributeError:
- continue
- if reverse:
- l.reverse()
- return l
-
-class HookRelay:
- def __init__(self, hookspecs, registry):
- self._hookspecs = hookspecs
- self._registry = registry
- for name, method in vars(hookspecs).items():
- if name[:1] != "_":
- setattr(self, name, self._makecall(name))
-
- def _makecall(self, name, extralookup=None):
- hookspecmethod = getattr(self._hookspecs, name)
- firstresult = getattr(hookspecmethod, 'firstresult', False)
- return HookCaller(self, name, firstresult=firstresult,
- extralookup=extralookup)
-
- def _getmethods(self, name, extralookup=()):
- return self._registry.listattr(name, extra=extralookup)
-
- def _performcall(self, name, multicall):
- return multicall.execute()
-
-class HookCaller:
- def __init__(self, hookrelay, name, firstresult, extralookup=None):
- self.hookrelay = hookrelay
- self.name = name
- self.firstresult = firstresult
- self.extralookup = extralookup and [extralookup] or ()
-
- def __repr__(self):
- return "<HookCaller %r>" %(self.name,)
-
- def __call__(self, **kwargs):
- methods = self.hookrelay._getmethods(self.name, self.extralookup)
- mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
- return self.hookrelay._performcall(self.name, mc)
-
-comregistry = Registry([])
--- a/py/plugin/pytest_pytester.py
+++ b/py/plugin/pytest_pytester.py
@@ -22,11 +22,6 @@ def pytest_funcarg__testdir(request):
tmptestdir = TmpTestdir(request)
return tmptestdir
-def pytest_funcarg__reportrecorder(request):
- reprec = ReportRecorder(py._com.comregistry)
- request.addfinalizer(lambda: reprec.comregistry.unregister(reprec))
- return reprec
-
rex_outcome = re.compile("(\d+) (\w+)")
class RunResult:
def __init__(self, ret, outlines, errlines):
@@ -71,10 +66,10 @@ class TmpTestdir:
def __repr__(self):
return "<TmpTestdir %r>" % (self.tmpdir,)
- def Config(self, comregistry=None, topdir=None):
+ def Config(self, registry=None, topdir=None):
if topdir is None:
topdir = self.tmpdir.dirpath()
- return pytestConfig(comregistry, topdir=topdir)
+ return pytestConfig(registry, topdir=topdir)
def finalize(self):
for p in self._syspathremove:
@@ -89,19 +84,13 @@ class TmpTestdir:
del sys.modules[name]
def getreportrecorder(self, obj):
- if isinstance(obj, py._com.Registry):
- registry = obj
- elif hasattr(obj, 'comregistry'):
- registry = obj.comregistry
- elif hasattr(obj, 'pluginmanager'):
- registry = obj.pluginmanager.comregistry
- elif hasattr(obj, 'config'):
- registry = obj.config.pluginmanager.comregistry
- else:
- raise ValueError("obj %r provides no comregistry" %(obj,))
- assert isinstance(registry, py._com.Registry)
- reprec = ReportRecorder(registry)
- reprec.hookrecorder = self._pytest.gethookrecorder(hookspec, registry)
+ if hasattr(obj, 'config'):
+ obj = obj.config
+ if hasattr(obj, 'hook'):
+ obj = obj.hook
+ assert hasattr(obj, '_hookspecs'), obj
+ reprec = ReportRecorder(obj)
+ reprec.hookrecorder = self._pytest.gethookrecorder(obj)
reprec.hook = reprec.hookrecorder.hook
return reprec
@@ -334,9 +323,10 @@ class PseudoPlugin:
self.__dict__.update(vars)
class ReportRecorder(object):
- def __init__(self, comregistry):
- self.comregistry = comregistry
- comregistry.register(self)
+ def __init__(self, hook):
+ self.hook = hook
+ self.registry = hook._registry
+ self.registry.register(self)
def getcall(self, name):
return self.hookrecorder.getcall(name)
@@ -401,7 +391,7 @@ class ReportRecorder(object):
self.hookrecorder.calls[:] = []
def unregister(self):
- self.comregistry.unregister(self)
+ self.registry.unregister(self)
self.hookrecorder.finish_recording()
class LineComp:
--- a/testing/plugin/test_pytest__pytest.py
+++ b/testing/plugin/test_pytest__pytest.py
@@ -1,9 +1,10 @@
import py
+import sys
from py.plugin.pytest__pytest import HookRecorder
+from py.impl.test.pluginmanager import Registry
def test_hookrecorder_basic():
- comregistry = py._com.Registry()
- rec = HookRecorder(comregistry)
+ rec = HookRecorder(Registry())
class ApiClass:
def xyz(self, arg):
pass
@@ -15,9 +16,7 @@ def test_hookrecorder_basic():
py.test.raises(ValueError, "rec.popcall('abc')")
def test_hookrecorder_basic_no_args_hook():
- import sys
- comregistry = py._com.Registry()
- rec = HookRecorder(comregistry)
+ rec = HookRecorder(Registry())
apimod = type(sys)('api')
def xyz():
pass
@@ -27,23 +26,20 @@ def test_hookrecorder_basic_no_args_hook
call = rec.popcall("xyz")
assert call._name == "xyz"
-reg = py._com.comregistry
-def test_functional_default(testdir, _pytest):
- assert _pytest.comregistry == py._com.comregistry
- assert _pytest.comregistry != reg
-
def test_functional(testdir, linecomp):
reprec = testdir.inline_runsource("""
import py
+ from py.impl.test.pluginmanager import HookRelay, Registry
pytest_plugins="_pytest"
def test_func(_pytest):
class ApiClass:
def xyz(self, arg): pass
- rec = _pytest.gethookrecorder(ApiClass)
+ hook = HookRelay(ApiClass, Registry())
+ rec = _pytest.gethookrecorder(hook)
class Plugin:
def xyz(self, arg):
return arg + 1
- rec._comregistry.register(Plugin())
+ rec._registry.register(Plugin())
res = rec.hook.xyz(arg=41)
assert res == [42]
""")
--- a/testing/plugin/test_pytest_pytester.py
+++ b/testing/plugin/test_pytest_pytester.py
@@ -2,10 +2,9 @@ import py
from py.plugin.pytest_pytester import LineMatcher, LineComp
def test_reportrecorder(testdir):
- registry = py._com.Registry()
- recorder = testdir.getreportrecorder(registry)
+ item = testdir.getitem("def test_func(): pass")
+ recorder = testdir.getreportrecorder(item.config)
assert not recorder.getfailures()
- item = testdir.getitem("def test_func(): pass")
class rep:
excinfo = None
passed = False
--- a/contrib/runtesthelper.py
+++ b/contrib/runtesthelper.py
@@ -14,6 +14,5 @@ def pytest(argv=None):
except SystemExit:
pass
# we need to reset the global py.test.config object
- py._com.comregistry = py._com.comregistry.__class__([])
py.test.config = py.test.config.__class__(
- pluginmanager=py.test._PluginManager(py._com.comregistry))
+ pluginmanager=py.test._PluginManager())
--- a/testing/pytest/test_pickling.py
+++ b/testing/pytest/test_pickling.py
@@ -3,14 +3,11 @@ import pickle
def setglobals(request):
oldconfig = py.test.config
- oldcom = py._com.comregistry
print("setting py.test.config to None")
py.test.config = None
- py._com.comregistry = py._com.Registry()
def resetglobals():
py.builtin.print_("setting py.test.config to", oldconfig)
py.test.config = oldconfig
- py._com.comregistry = oldcom
request.addfinalizer(resetglobals)
def pytest_funcarg__testdir(request):
@@ -190,7 +187,7 @@ def test_config__setstate__wired_correct
from py.impl.test.dist.mypickle import PickleChannel
channel = PickleChannel(channel)
config = channel.receive()
- assert py.test.config.pluginmanager.comregistry == py._com.comregistry, "comregistry wrong"
+ assert py.test.config == config
""")
channel = PickleChannel(channel)
config = testdir.parseconfig()
--- a/py/impl/test/config.py
+++ b/py/impl/test/config.py
@@ -107,9 +107,8 @@ class Config(object):
# warning global side effects:
# * registering to py lib plugins
# * setting py.test.config
- py._com.comregistry = py._com.Registry()
self.__init__(
- pluginmanager=py.test._PluginManager(py._com.comregistry),
+ pluginmanager=py.test._PluginManager(),
topdir=py.path.local(),
)
# we have to set py.test.config because preparse()
@@ -310,6 +309,6 @@ def gettopdir(args):
# this is the one per-process instance of py.test configuration
config_per_process = Config(
- pluginmanager=py.test._PluginManager(py._com.comregistry)
+ pluginmanager=py.test._PluginManager()
)
--- a/py/impl/test/pluginmanager.py
+++ b/py/impl/test/pluginmanager.py
@@ -2,6 +2,7 @@
managing loading and interacting with pytest plugins.
"""
import py
+import inspect
from py.plugin import hookspec
from py.impl.test.outcome import Skipped
@@ -16,15 +17,10 @@ def check_old_use(mod, modname):
class PluginManager(object):
class Error(Exception):
"""signals a plugin specific error."""
- def __init__(self, comregistry=None):
- if comregistry is None:
- comregistry = py._com.Registry()
- self.comregistry = comregistry
+ def __init__(self):
+ self.registry = Registry()
self._name2plugin = {}
-
- self.hook = py._com.HookRelay(
- hookspecs=hookspec,
- registry=self.comregistry)
+ self.hook = HookRelay(hookspecs=hookspec, registry=self.registry)
self.register(self)
for spec in default_plugins:
self.import_plugin(spec)
@@ -39,18 +35,18 @@ class PluginManager(object):
def register(self, plugin, name=None):
assert not self.isregistered(plugin), plugin
- assert not self.comregistry.isregistered(plugin), plugin
+ assert not self.registry.isregistered(plugin), plugin
name = self._getpluginname(plugin, name)
if name in self._name2plugin:
return False
self._name2plugin[name] = plugin
self.hook.pytest_plugin_registered(manager=self, plugin=plugin)
- self.comregistry.register(plugin)
+ self.registry.register(plugin)
return True
def unregister(self, plugin):
self.hook.pytest_plugin_unregistered(plugin=plugin)
- self.comregistry.unregister(plugin)
+ self.registry.unregister(plugin)
for name, value in list(self._name2plugin.items()):
if value == plugin:
del self._name2plugin[name]
@@ -63,7 +59,7 @@ class PluginManager(object):
return True
def getplugins(self):
- return list(self.comregistry)
+ return list(self.registry)
def getplugin(self, name):
try:
@@ -143,7 +139,7 @@ class PluginManager(object):
#
#
def listattr(self, attrname, plugins=None, extra=()):
- return self.comregistry.listattr(attrname, plugins=plugins, extra=extra)
+ return self.registry.listattr(attrname, plugins=plugins, extra=extra)
def notify_exception(self, excinfo=None):
if excinfo is None:
@@ -153,8 +149,8 @@ class PluginManager(object):
def do_addoption(self, parser):
mname = "pytest_addoption"
- methods = self.comregistry.listattr(mname, reverse=True)
- mc = py._com.MultiCall(methods, {'parser': parser})
+ methods = self.registry.listattr(mname, reverse=True)
+ mc = MultiCall(methods, {'parser': parser})
mc.execute()
def pytest_plugin_registered(self, plugin):
@@ -168,7 +164,7 @@ class PluginManager(object):
{'config': self._config})
def call_plugin(self, plugin, methname, kwargs):
- return py._com.MultiCall(
+ return MultiCall(
methods=self.listattr(methname, plugins=[plugin]),
kwargs=kwargs, firstresult=True).execute()
@@ -210,3 +206,118 @@ def importplugin(importspec):
+class MultiCall:
+ """ execute a call into multiple python functions/methods. """
+
+ def __init__(self, methods, kwargs, firstresult=False):
+ self.methods = methods[:]
+ self.kwargs = kwargs.copy()
+ self.kwargs['__multicall__'] = self
+ self.results = []
+ self.firstresult = firstresult
+
+ def __repr__(self):
+ status = "%d results, %d meths" % (len(self.results), len(self.methods))
+ return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
+
+ def execute(self):
+ while self.methods:
+ method = self.methods.pop()
+ kwargs = self.getkwargs(method)
+ res = method(**kwargs)
+ if res is not None:
+ self.results.append(res)
+ if self.firstresult:
+ return res
+ if not self.firstresult:
+ return self.results
+
+ def getkwargs(self, method):
+ kwargs = {}
+ for argname in varnames(method):
+ try:
+ kwargs[argname] = self.kwargs[argname]
+ except KeyError:
+ pass # might be optional param
+ return kwargs
+
+def varnames(func):
+ ismethod = inspect.ismethod(func)
+ rawcode = py.code.getrawcode(func)
+ try:
+ return rawcode.co_varnames[ismethod:]
+ except AttributeError:
+ return ()
+
+class Registry:
+ """
+ Manage Plugins: register/unregister call calls to plugins.
+ """
+ def __init__(self, plugins=None):
+ if plugins is None:
+ plugins = []
+ self._plugins = plugins
+
+ def register(self, plugin):
+ assert not isinstance(plugin, str)
+ assert not plugin in self._plugins
+ self._plugins.append(plugin)
+
+ def unregister(self, plugin):
+ self._plugins.remove(plugin)
+
+ def isregistered(self, plugin):
+ return plugin in self._plugins
+
+ def __iter__(self):
+ return iter(self._plugins)
+
+ def listattr(self, attrname, plugins=None, extra=(), reverse=False):
+ l = []
+ if plugins is None:
+ plugins = self._plugins
+ candidates = list(plugins) + list(extra)
+ for plugin in candidates:
+ try:
+ l.append(getattr(plugin, attrname))
+ except AttributeError:
+ continue
+ if reverse:
+ l.reverse()
+ return l
+
+class HookRelay:
+ def __init__(self, hookspecs, registry):
+ self._hookspecs = hookspecs
+ self._registry = registry
+ for name, method in vars(hookspecs).items():
+ if name[:1] != "_":
+ setattr(self, name, self._makecall(name))
+
+ def _makecall(self, name, extralookup=None):
+ hookspecmethod = getattr(self._hookspecs, name)
+ firstresult = getattr(hookspecmethod, 'firstresult', False)
+ return HookCaller(self, name, firstresult=firstresult,
+ extralookup=extralookup)
+
+ def _getmethods(self, name, extralookup=()):
+ return self._registry.listattr(name, extra=extralookup)
+
+ def _performcall(self, name, multicall):
+ return multicall.execute()
+
+class HookCaller:
+ def __init__(self, hookrelay, name, firstresult, extralookup=None):
+ self.hookrelay = hookrelay
+ self.name = name
+ self.firstresult = firstresult
+ self.extralookup = extralookup and [extralookup] or ()
+
+ def __repr__(self):
+ return "<HookCaller %r>" %(self.name,)
+
+ def __call__(self, **kwargs):
+ methods = self.hookrelay._getmethods(self.name, self.extralookup)
+ mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
+ return self.hookrelay._performcall(self.name, mc)
+
More information about the pytest-commit mailing list