[py-svn] py-trunk commit 42a36fa714e6: remove/reduce internal global state: the global py._com.comregistry is gone; the plugin registry is now fully contained in, and always instantiated by, the py.test PluginManager class.

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Tue Dec 29 12:37:46 CET 2009


# HG changeset patch -- Bitbucket.org
# Project py-trunk
# URL http://bitbucket.org/hpk42/py-trunk/overview/
# User holger krekel <holger at merlinux.eu>
# Date 1262086577 -3600
# Node ID 42a36fa714e665d79b69aa9aa8165871b6f7af2d
# Parent 756383d5451058b00a35aa42d815c9f75fccfdae
remove/reduce internal global state: the global py._com.comregistry is gone; the plugin registry is now fully contained in, and always instantiated by, the py.test PluginManager class.

--- a/testing/pytest/test_pluginmanager.py
+++ b/testing/pytest/test_pluginmanager.py
@@ -1,5 +1,7 @@
 import py, os
 from py.impl.test.pluginmanager import PluginManager, canonical_importname
+from py.impl.test.pluginmanager import Registry, MultiCall, HookRelay, varnames
+
 
 class TestBootstrapping:
     def test_consider_env_fails_to_import(self, monkeypatch):
@@ -278,3 +280,184 @@ def test_namespace_has_default_and_env_p
     """)
     result = testdir.runpython(p)
     assert result.ret == 0
+
+def test_varnames():
+    def f(x):
+        pass
+    class A:
+        def f(self, y):
+            pass
+    assert varnames(f) == ("x",)
+    assert varnames(A().f) == ('y',)
+    
+class TestMultiCall:
+    def test_uses_copy_of_methods(self):
+        l = [lambda: 42]
+        mc = MultiCall(l, {})
+        repr(mc)
+        l[:] = []
+        res = mc.execute()
+        return res == 42
+
+    def test_call_passing(self):
+        class P1:
+            def m(self, __multicall__, x):
+                assert len(__multicall__.results) == 1
+                assert not __multicall__.methods
+                return 17
+
+        class P2:
+            def m(self, __multicall__, x):
+                assert __multicall__.results == []
+                assert __multicall__.methods
+                return 23 
+               
+        p1 = P1() 
+        p2 = P2() 
+        multicall = MultiCall([p1.m, p2.m], {'x': 23})
+        assert "23" in repr(multicall)
+        reslist = multicall.execute()
+        assert len(reslist) == 2
+        # ensure reversed order 
+        assert reslist == [23, 17]
+
+    def test_keyword_args(self):
+        def f(x): 
+            return x + 1
+        class A:
+            def f(self, x, y):
+                return x + y
+        multicall = MultiCall([f, A().f], dict(x=23, y=24))
+        assert "'x': 23" in repr(multicall)
+        assert "'y': 24" in repr(multicall)
+        reslist = multicall.execute()
+        assert reslist == [24+23, 24]
+        assert "2 results" in repr(multicall)
+
+    def test_keywords_call_error(self):
+        multicall = MultiCall([lambda x: x], {})
+        py.test.raises(TypeError, "multicall.execute()")
+
+    def test_call_subexecute(self):
+        def m(__multicall__):
+            subresult = __multicall__.execute()
+            return subresult + 1
+
+        def n():
+            return 1
+
+        call = MultiCall([n, m], {}, firstresult=True)
+        res = call.execute()
+        assert res == 2
+
+    def test_call_none_is_no_result(self):
+        def m1():
+            return 1
+        def m2():
+            return None
+        res = MultiCall([m1, m2], {}, firstresult=True).execute()
+        assert res == 1
+        res = MultiCall([m1, m2], {}).execute()
+        assert res == [1]
+
+class TestRegistry:
+
+    def test_register(self):
+        registry = Registry()
+        class MyPlugin:
+            pass
+        my = MyPlugin()
+        registry.register(my)
+        assert list(registry) == [my]
+        my2 = MyPlugin()
+        registry.register(my2)
+        assert list(registry) == [my, my2]
+
+        assert registry.isregistered(my)
+        assert registry.isregistered(my2)
+        registry.unregister(my)
+        assert not registry.isregistered(my)
+        assert list(registry) == [my2]
+
+    def test_listattr(self):
+        plugins = Registry()
+        class api1:
+            x = 41
+        class api2:
+            x = 42
+        class api3:
+            x = 43
+        plugins.register(api1())
+        plugins.register(api2())
+        plugins.register(api3())
+        l = list(plugins.listattr('x'))
+        assert l == [41, 42, 43]
+        l = list(plugins.listattr('x', reverse=True))
+        assert l == [43, 42, 41]
+
+        class api4: 
+            x = 44
+        l = list(plugins.listattr('x', extra=(api4,)))
+        assert l == [41,42,43,44]
+        assert len(list(plugins)) == 3  # otherwise extra added
+
+class TestHookRelay:
+    def test_happypath(self):
+        registry = Registry()
+        class Api:
+            def hello(self, arg):
+                pass
+
+        mcm = HookRelay(hookspecs=Api, registry=registry)
+        assert hasattr(mcm, 'hello')
+        assert repr(mcm.hello).find("hello") != -1
+        class Plugin:
+            def hello(self, arg):
+                return arg + 1
+        registry.register(Plugin())
+        l = mcm.hello(arg=3)
+        assert l == [4]
+        assert not hasattr(mcm, 'world')
+
+    def test_only_kwargs(self):
+        registry = Registry()
+        class Api:
+            def hello(self, arg):
+                pass
+        mcm = HookRelay(hookspecs=Api, registry=registry)
+        py.test.raises(TypeError, "mcm.hello(3)")
+
+    def test_firstresult_definition(self):
+        registry = Registry()
+        class Api:
+            def hello(self, arg): pass
+            hello.firstresult = True
+
+        mcm = HookRelay(hookspecs=Api, registry=registry)
+        class Plugin:
+            def hello(self, arg):
+                return arg + 1
+        registry.register(Plugin())
+        res = mcm.hello(arg=3)
+        assert res == 4
+
+    def test_hooks_extra_plugins(self):
+        registry = Registry()
+        class Api:
+            def hello(self, arg):
+                pass
+        hookrelay = HookRelay(hookspecs=Api, registry=registry)
+        hook_hello = hookrelay.hello
+        class Plugin:
+            def hello(self, arg):
+                return arg + 1
+        registry.register(Plugin())
+        class Plugin2:
+            def hello(self, arg):
+                return arg + 2
+        newhook = hookrelay._makecall("hello", extralookup=Plugin2())
+        l = newhook(arg=3)
+        assert l == [5, 4]
+        l2 = hook_hello(arg=3)
+        assert l2 == [4]
+        

--- a/testing/root/test_com.py
+++ b/testing/root/test_com.py
@@ -1,193 +1,3 @@
 
 import py
 import os
-from py.impl._com import Registry, MultiCall, HookRelay, varnames
-
-def test_varnames():
-    def f(x):
-        pass
-    class A:
-        def f(self, y):
-            pass
-    assert varnames(f) == ("x",)
-    assert varnames(A().f) == ('y',)
-    
-class TestMultiCall:
-    def test_uses_copy_of_methods(self):
-        l = [lambda: 42]
-        mc = MultiCall(l, {})
-        repr(mc)
-        l[:] = []
-        res = mc.execute()
-        return res == 42
-
-    def test_call_passing(self):
-        class P1:
-            def m(self, __multicall__, x):
-                assert len(__multicall__.results) == 1
-                assert not __multicall__.methods
-                return 17
-
-        class P2:
-            def m(self, __multicall__, x):
-                assert __multicall__.results == []
-                assert __multicall__.methods
-                return 23 
-               
-        p1 = P1() 
-        p2 = P2() 
-        multicall = MultiCall([p1.m, p2.m], {'x': 23})
-        assert "23" in repr(multicall)
-        reslist = multicall.execute()
-        assert len(reslist) == 2
-        # ensure reversed order 
-        assert reslist == [23, 17]
-
-    def test_keyword_args(self):
-        def f(x): 
-            return x + 1
-        class A:
-            def f(self, x, y):
-                return x + y
-        multicall = MultiCall([f, A().f], dict(x=23, y=24))
-        assert "'x': 23" in repr(multicall)
-        assert "'y': 24" in repr(multicall)
-        reslist = multicall.execute()
-        assert reslist == [24+23, 24]
-        assert "2 results" in repr(multicall)
-
-    def test_keywords_call_error(self):
-        multicall = MultiCall([lambda x: x], {})
-        py.test.raises(TypeError, "multicall.execute()")
-
-    def test_call_subexecute(self):
-        def m(__multicall__):
-            subresult = __multicall__.execute()
-            return subresult + 1
-
-        def n():
-            return 1
-
-        call = MultiCall([n, m], {}, firstresult=True)
-        res = call.execute()
-        assert res == 2
-
-    def test_call_none_is_no_result(self):
-        def m1():
-            return 1
-        def m2():
-            return None
-        res = MultiCall([m1, m2], {}, firstresult=True).execute()
-        assert res == 1
-        res = MultiCall([m1, m2], {}).execute()
-        assert res == [1]
-
-class TestRegistry:
-
-    def test_register(self):
-        registry = Registry()
-        class MyPlugin:
-            pass
-        my = MyPlugin()
-        registry.register(my)
-        assert list(registry) == [my]
-        my2 = MyPlugin()
-        registry.register(my2)
-        assert list(registry) == [my, my2]
-
-        assert registry.isregistered(my)
-        assert registry.isregistered(my2)
-        registry.unregister(my)
-        assert not registry.isregistered(my)
-        assert list(registry) == [my2]
-
-    def test_listattr(self):
-        plugins = Registry()
-        class api1:
-            x = 41
-        class api2:
-            x = 42
-        class api3:
-            x = 43
-        plugins.register(api1())
-        plugins.register(api2())
-        plugins.register(api3())
-        l = list(plugins.listattr('x'))
-        assert l == [41, 42, 43]
-        l = list(plugins.listattr('x', reverse=True))
-        assert l == [43, 42, 41]
-
-        class api4: 
-            x = 44
-        l = list(plugins.listattr('x', extra=(api4,)))
-        assert l == [41,42,43,44]
-        assert len(list(plugins)) == 3  # otherwise extra added
-
-def test_api_and_defaults():
-    assert isinstance(py._com.comregistry, Registry)
-
-class TestHookRelay:
-    def test_happypath(self):
-        registry = Registry()
-        class Api:
-            def hello(self, arg):
-                pass
-
-        mcm = HookRelay(hookspecs=Api, registry=registry)
-        assert hasattr(mcm, 'hello')
-        assert repr(mcm.hello).find("hello") != -1
-        class Plugin:
-            def hello(self, arg):
-                return arg + 1
-        registry.register(Plugin())
-        l = mcm.hello(arg=3)
-        assert l == [4]
-        assert not hasattr(mcm, 'world')
-
-    def test_only_kwargs(self):
-        registry = Registry()
-        class Api:
-            def hello(self, arg):
-                pass
-        mcm = HookRelay(hookspecs=Api, registry=registry)
-        py.test.raises(TypeError, "mcm.hello(3)")
-
-    def test_firstresult_definition(self):
-        registry = Registry()
-        class Api:
-            def hello(self, arg): pass
-            hello.firstresult = True
-
-        mcm = HookRelay(hookspecs=Api, registry=registry)
-        class Plugin:
-            def hello(self, arg):
-                return arg + 1
-        registry.register(Plugin())
-        res = mcm.hello(arg=3)
-        assert res == 4
-
-    def test_default_plugins(self):
-        class Api: pass 
-        mcm = HookRelay(hookspecs=Api, registry=py._com.comregistry)
-        assert mcm._registry == py._com.comregistry
-
-    def test_hooks_extra_plugins(self):
-        registry = Registry()
-        class Api:
-            def hello(self, arg):
-                pass
-        hookrelay = HookRelay(hookspecs=Api, registry=registry)
-        hook_hello = hookrelay.hello
-        class Plugin:
-            def hello(self, arg):
-                return arg + 1
-        registry.register(Plugin())
-        class Plugin2:
-            def hello(self, arg):
-                return arg + 2
-        newhook = hookrelay._makecall("hello", extralookup=Plugin2())
-        l = newhook(arg=3)
-        assert l == [5, 4]
-        l2 = hook_hello(arg=3)
-        assert l2 == [4]
-        

--- a/testing/pytest/test_config.py
+++ b/testing/pytest/test_config.py
@@ -264,9 +264,6 @@ def test_options_on_small_file_do_not_bl
                  ['--traceconfig'], ['-v'], ['-v', '-v']):
         runfiletest(opts + [path])
 
-def test_default_registry():
-    assert py.test.config.pluginmanager.comregistry is py._com.comregistry
-   
 def test_ensuretemp():
     # XXX test for deprecation
     d1 = py.test.ensuretemp('hello') 

--- a/py/plugin/pytest__pytest.py
+++ b/py/plugin/pytest__pytest.py
@@ -1,21 +1,17 @@
 import py
 
+from py.impl.test.pluginmanager import HookRelay
+
 def pytest_funcarg___pytest(request):
     return PytestArg(request)
 
 class PytestArg:
     def __init__(self, request):
         self.request = request 
-        self.monkeypatch = self.request.getfuncargvalue("monkeypatch")
-        self.comregistry = py._com.Registry()
-        self.monkeypatch.setattr(py._com, 'comregistry', self.comregistry)
 
-    def gethookrecorder(self, hookspecs, registry=None):
-        if registry is not None:
-            self.monkeypatch.setattr(py._com, 'comregistry', registry) 
-            self.comregistry = registry 
-        hookrecorder = HookRecorder(self.comregistry) 
-        hookrecorder.start_recording(hookspecs)
+    def gethookrecorder(self, hook):
+        hookrecorder = HookRecorder(hook._registry)
+        hookrecorder.start_recording(hook._hookspecs)
         self.request.addfinalizer(hookrecorder.finish_recording)
         return hookrecorder 
 
@@ -32,8 +28,8 @@ class ParsedCall:
         return "<ParsedCall %r(**%r)>" %(self._name, d)
 
 class HookRecorder:
-    def __init__(self, comregistry):
-        self._comregistry = comregistry
+    def __init__(self, registry):
+        self._registry = registry
         self.calls = []
         self._recorders = {}
         
@@ -46,12 +42,12 @@ class HookRecorder:
                 setattr(RecordCalls, name, self._makecallparser(method))
         recorder = RecordCalls()
         self._recorders[hookspecs] = recorder
-        self._comregistry.register(recorder)
-        self.hook = py._com.HookRelay(hookspecs, registry=self._comregistry)
+        self._registry.register(recorder)
+        self.hook = HookRelay(hookspecs, registry=self._registry)
 
     def finish_recording(self):
         for recorder in self._recorders.values():
-            self._comregistry.unregister(recorder)
+            self._registry.unregister(recorder)
         self._recorders.clear()
 
     def _makecallparser(self, method):

--- a/py/__init__.py
+++ b/py/__init__.py
@@ -25,12 +25,6 @@ py.apipkg.initpkg(__name__, dict(
     _pydirs = '.impl._metainfo:pydirs',
     version = 'py:__version__', # backward compatibility
 
-    _com = {
-        'Registry': '.impl._com:Registry',
-        'MultiCall':  '.impl._com:MultiCall',
-        'comregistry': '.impl._com:comregistry',
-        'HookRelay': '.impl._com:HookRelay',
-    },
     cmdline = {
         'pytest':     '.impl.cmdline.pytest:main',
         'pylookup':   '.impl.cmdline.pylookup:main',

--- a/testing/pytest/dist/test_gwmanage.py
+++ b/testing/pytest/dist/test_gwmanage.py
@@ -8,17 +8,17 @@
 import py
 import os
 from py.impl.test.dist.gwmanage import GatewayManager, HostRSync
+from py.impl.test.pluginmanager import HookRelay, Registry
 from py.plugin import hookspec
 import execnet
 
 def pytest_funcarg__hookrecorder(request):
     _pytest = request.getfuncargvalue('_pytest')
     hook = request.getfuncargvalue('hook')
-    return _pytest.gethookrecorder(hook._hookspecs, hook._registry)
+    return _pytest.gethookrecorder(hook)
 
 def pytest_funcarg__hook(request):
-    registry = py._com.Registry()
-    return py._com.HookRelay(hookspec, registry)
+    return HookRelay(hookspec, Registry())
 
 class TestGatewayManagerPopen:
     def test_popen_no_default_chdir(self, hook):
@@ -90,7 +90,6 @@ class pytest_funcarg__mysetup:
         tmp = request.getfuncargvalue('tmpdir')
         self.source = tmp.mkdir("source")
         self.dest = tmp.mkdir("dest")
-        request.getfuncargvalue("_pytest") # to have patching of py._com.comregistry
 
 class TestHRSync:
     def test_hrsync_filter(self, mysetup):

--- a/py/impl/_com.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""
-py lib plugins and plugin call management
-"""
-
-import py
-import inspect
-
-__all__ = ['Registry', 'MultiCall', 'comregistry', 'HookRelay']
-
-class MultiCall:
-    """ execute a call into multiple python functions/methods.  """
-
-    def __init__(self, methods, kwargs, firstresult=False):
-        self.methods = methods[:]
-        self.kwargs = kwargs.copy()
-        self.kwargs['__multicall__'] = self
-        self.results = []
-        self.firstresult = firstresult
-
-    def __repr__(self):
-        status = "%d results, %d meths" % (len(self.results), len(self.methods))
-        return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
-
-    def execute(self):
-        while self.methods:
-            method = self.methods.pop()
-            kwargs = self.getkwargs(method)
-            res = method(**kwargs)
-            if res is not None:
-                self.results.append(res) 
-                if self.firstresult:
-                    return res
-        if not self.firstresult:
-            return self.results 
-
-    def getkwargs(self, method):
-        kwargs = {}
-        for argname in varnames(method):
-            try:
-                kwargs[argname] = self.kwargs[argname]
-            except KeyError:
-                pass # might be optional param
-        return kwargs 
-
-def varnames(func):
-    ismethod = inspect.ismethod(func)
-    rawcode = py.code.getrawcode(func)
-    try:
-        return rawcode.co_varnames[ismethod:]
-    except AttributeError:
-        return ()
-
-class Registry:
-    """
-        Manage Plugins: register/unregister call calls to plugins. 
-    """
-    def __init__(self, plugins=None):
-        if plugins is None:
-            plugins = []
-        self._plugins = plugins
-
-    def register(self, plugin):
-        assert not isinstance(plugin, str)
-        assert not plugin in self._plugins
-        self._plugins.append(plugin)
-
-    def unregister(self, plugin):
-        self._plugins.remove(plugin)
-
-    def isregistered(self, plugin):
-        return plugin in self._plugins 
-
-    def __iter__(self):
-        return iter(self._plugins)
-
-    def listattr(self, attrname, plugins=None, extra=(), reverse=False):
-        l = []
-        if plugins is None:
-            plugins = self._plugins
-        candidates = list(plugins) + list(extra)
-        for plugin in candidates:
-            try:
-                l.append(getattr(plugin, attrname))
-            except AttributeError:
-                continue 
-        if reverse:
-            l.reverse()
-        return l
-
-class HookRelay: 
-    def __init__(self, hookspecs, registry):
-        self._hookspecs = hookspecs
-        self._registry = registry
-        for name, method in vars(hookspecs).items():
-            if name[:1] != "_":
-                setattr(self, name, self._makecall(name))
-
-    def _makecall(self, name, extralookup=None):
-        hookspecmethod = getattr(self._hookspecs, name)
-        firstresult = getattr(hookspecmethod, 'firstresult', False)
-        return HookCaller(self, name, firstresult=firstresult,
-            extralookup=extralookup)
-
-    def _getmethods(self, name, extralookup=()):
-        return self._registry.listattr(name, extra=extralookup)
-
-    def _performcall(self, name, multicall):
-        return multicall.execute()
-        
-class HookCaller:
-    def __init__(self, hookrelay, name, firstresult, extralookup=None):
-        self.hookrelay = hookrelay 
-        self.name = name 
-        self.firstresult = firstresult 
-        self.extralookup = extralookup and [extralookup] or ()
-
-    def __repr__(self):
-        return "<HookCaller %r>" %(self.name,)
-
-    def __call__(self, **kwargs):
-        methods = self.hookrelay._getmethods(self.name, self.extralookup)
-        mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
-        return self.hookrelay._performcall(self.name, mc)
-   
-comregistry = Registry([])

--- a/py/plugin/pytest_pytester.py
+++ b/py/plugin/pytest_pytester.py
@@ -22,11 +22,6 @@ def pytest_funcarg__testdir(request):
     tmptestdir = TmpTestdir(request)
     return tmptestdir
 
-def pytest_funcarg__reportrecorder(request):
-    reprec = ReportRecorder(py._com.comregistry)
-    request.addfinalizer(lambda: reprec.comregistry.unregister(reprec))
-    return reprec
-
 rex_outcome = re.compile("(\d+) (\w+)")
 class RunResult:
     def __init__(self, ret, outlines, errlines):
@@ -71,10 +66,10 @@ class TmpTestdir:
     def __repr__(self):
         return "<TmpTestdir %r>" % (self.tmpdir,)
 
-    def Config(self, comregistry=None, topdir=None):
+    def Config(self, registry=None, topdir=None):
         if topdir is None:
             topdir = self.tmpdir.dirpath()
-        return pytestConfig(comregistry, topdir=topdir)
+        return pytestConfig(registry, topdir=topdir)
 
     def finalize(self):
         for p in self._syspathremove:
@@ -89,19 +84,13 @@ class TmpTestdir:
                     del sys.modules[name]
 
     def getreportrecorder(self, obj):
-        if isinstance(obj, py._com.Registry):
-            registry = obj
-        elif hasattr(obj, 'comregistry'):
-            registry = obj.comregistry
-        elif hasattr(obj, 'pluginmanager'):
-            registry = obj.pluginmanager.comregistry
-        elif hasattr(obj, 'config'):
-            registry = obj.config.pluginmanager.comregistry
-        else:
-            raise ValueError("obj %r provides no comregistry" %(obj,))
-        assert isinstance(registry, py._com.Registry)
-        reprec = ReportRecorder(registry)
-        reprec.hookrecorder = self._pytest.gethookrecorder(hookspec, registry)
+        if hasattr(obj, 'config'):
+            obj = obj.config
+        if hasattr(obj, 'hook'):
+            obj = obj.hook
+        assert hasattr(obj, '_hookspecs'), obj
+        reprec = ReportRecorder(obj)
+        reprec.hookrecorder = self._pytest.gethookrecorder(obj)
         reprec.hook = reprec.hookrecorder.hook
         return reprec
 
@@ -334,9 +323,10 @@ class PseudoPlugin:
         self.__dict__.update(vars) 
 
 class ReportRecorder(object):
-    def __init__(self, comregistry):
-        self.comregistry = comregistry
-        comregistry.register(self)
+    def __init__(self, hook):
+        self.hook = hook
+        self.registry = hook._registry
+        self.registry.register(self)
 
     def getcall(self, name):
         return self.hookrecorder.getcall(name)
@@ -401,7 +391,7 @@ class ReportRecorder(object):
         self.hookrecorder.calls[:] = []
 
     def unregister(self):
-        self.comregistry.unregister(self)
+        self.registry.unregister(self)
         self.hookrecorder.finish_recording()
 
 class LineComp:

--- a/testing/plugin/test_pytest__pytest.py
+++ b/testing/plugin/test_pytest__pytest.py
@@ -1,9 +1,10 @@
 import py
+import sys
 from py.plugin.pytest__pytest import HookRecorder
+from py.impl.test.pluginmanager import Registry
 
 def test_hookrecorder_basic():
-    comregistry = py._com.Registry() 
-    rec = HookRecorder(comregistry)
+    rec = HookRecorder(Registry())
     class ApiClass:
         def xyz(self, arg):
             pass
@@ -15,9 +16,7 @@ def test_hookrecorder_basic():
     py.test.raises(ValueError, "rec.popcall('abc')")
 
 def test_hookrecorder_basic_no_args_hook():
-    import sys
-    comregistry = py._com.Registry() 
-    rec = HookRecorder(comregistry)
+    rec = HookRecorder(Registry())
     apimod = type(sys)('api')
     def xyz():
         pass
@@ -27,23 +26,20 @@ def test_hookrecorder_basic_no_args_hook
     call = rec.popcall("xyz")
     assert call._name == "xyz"
 
-reg = py._com.comregistry
-def test_functional_default(testdir, _pytest):
-    assert _pytest.comregistry == py._com.comregistry 
-    assert _pytest.comregistry != reg
-
 def test_functional(testdir, linecomp):
     reprec = testdir.inline_runsource("""
         import py
+        from py.impl.test.pluginmanager import HookRelay, Registry
         pytest_plugins="_pytest"
         def test_func(_pytest):
             class ApiClass:
                 def xyz(self, arg):  pass 
-            rec = _pytest.gethookrecorder(ApiClass)
+            hook = HookRelay(ApiClass, Registry())
+            rec = _pytest.gethookrecorder(hook)
             class Plugin:
                 def xyz(self, arg):
                     return arg + 1
-            rec._comregistry.register(Plugin())
+            rec._registry.register(Plugin())
             res = rec.hook.xyz(arg=41)
             assert res == [42]
     """)

--- a/testing/plugin/test_pytest_pytester.py
+++ b/testing/plugin/test_pytest_pytester.py
@@ -2,10 +2,9 @@ import py
 from py.plugin.pytest_pytester import LineMatcher, LineComp
 
 def test_reportrecorder(testdir):
-    registry = py._com.Registry()
-    recorder = testdir.getreportrecorder(registry)
+    item = testdir.getitem("def test_func(): pass")
+    recorder = testdir.getreportrecorder(item.config)
     assert not recorder.getfailures()
-    item = testdir.getitem("def test_func(): pass")
     class rep:
         excinfo = None
         passed = False

--- a/contrib/runtesthelper.py
+++ b/contrib/runtesthelper.py
@@ -14,6 +14,5 @@ def pytest(argv=None):
     except SystemExit:
         pass
     # we need to reset the global py.test.config object
-    py._com.comregistry = py._com.comregistry.__class__([])
     py.test.config = py.test.config.__class__(
-        pluginmanager=py.test._PluginManager(py._com.comregistry))
+        pluginmanager=py.test._PluginManager())

--- a/testing/pytest/test_pickling.py
+++ b/testing/pytest/test_pickling.py
@@ -3,14 +3,11 @@ import pickle
 
 def setglobals(request):
     oldconfig = py.test.config 
-    oldcom = py._com.comregistry 
     print("setting py.test.config to None")
     py.test.config = None
-    py._com.comregistry = py._com.Registry()
     def resetglobals():
         py.builtin.print_("setting py.test.config to", oldconfig)
         py.test.config = oldconfig
-        py._com.comregistry = oldcom
     request.addfinalizer(resetglobals)
 
 def pytest_funcarg__testdir(request):
@@ -190,7 +187,7 @@ def test_config__setstate__wired_correct
         from py.impl.test.dist.mypickle import PickleChannel
         channel = PickleChannel(channel)
         config = channel.receive()
-        assert py.test.config.pluginmanager.comregistry == py._com.comregistry, "comregistry wrong"
+        assert py.test.config == config 
     """)
     channel = PickleChannel(channel)
     config = testdir.parseconfig()

--- a/py/impl/test/config.py
+++ b/py/impl/test/config.py
@@ -107,9 +107,8 @@ class Config(object):
         # warning global side effects:
         # * registering to py lib plugins 
         # * setting py.test.config 
-        py._com.comregistry = py._com.Registry()
         self.__init__(
-            pluginmanager=py.test._PluginManager(py._com.comregistry),
+            pluginmanager=py.test._PluginManager(),
             topdir=py.path.local(),
         )
         # we have to set py.test.config because preparse()
@@ -310,6 +309,6 @@ def gettopdir(args):
 
 # this is the one per-process instance of py.test configuration 
 config_per_process = Config(
-    pluginmanager=py.test._PluginManager(py._com.comregistry)
+    pluginmanager=py.test._PluginManager()
 )
 

--- a/py/impl/test/pluginmanager.py
+++ b/py/impl/test/pluginmanager.py
@@ -2,6 +2,7 @@
 managing loading and interacting with pytest plugins. 
 """
 import py
+import inspect
 from py.plugin import hookspec
 from py.impl.test.outcome import Skipped
 
@@ -16,15 +17,10 @@ def check_old_use(mod, modname):
 class PluginManager(object):
     class Error(Exception):
         """signals a plugin specific error."""
-    def __init__(self, comregistry=None):
-        if comregistry is None: 
-            comregistry = py._com.Registry()
-        self.comregistry = comregistry 
+    def __init__(self):
+        self.registry = Registry()
         self._name2plugin = {}
-
-        self.hook = py._com.HookRelay(
-            hookspecs=hookspec, 
-            registry=self.comregistry) 
+        self.hook = HookRelay(hookspecs=hookspec, registry=self.registry) 
         self.register(self)
         for spec in default_plugins:
             self.import_plugin(spec) 
@@ -39,18 +35,18 @@ class PluginManager(object):
 
     def register(self, plugin, name=None):
         assert not self.isregistered(plugin), plugin
-        assert not self.comregistry.isregistered(plugin), plugin
+        assert not self.registry.isregistered(plugin), plugin
         name = self._getpluginname(plugin, name)
         if name in self._name2plugin:
             return False
         self._name2plugin[name] = plugin
         self.hook.pytest_plugin_registered(manager=self, plugin=plugin)
-        self.comregistry.register(plugin)
+        self.registry.register(plugin)
         return True
 
     def unregister(self, plugin):
         self.hook.pytest_plugin_unregistered(plugin=plugin)
-        self.comregistry.unregister(plugin)
+        self.registry.unregister(plugin)
         for name, value in list(self._name2plugin.items()):
             if value == plugin:
                 del self._name2plugin[name]
@@ -63,7 +59,7 @@ class PluginManager(object):
                 return True
 
     def getplugins(self):
-        return list(self.comregistry)
+        return list(self.registry)
 
     def getplugin(self, name):
         try:
@@ -143,7 +139,7 @@ class PluginManager(object):
     #
     # 
     def listattr(self, attrname, plugins=None, extra=()):
-        return self.comregistry.listattr(attrname, plugins=plugins, extra=extra)
+        return self.registry.listattr(attrname, plugins=plugins, extra=extra)
 
     def notify_exception(self, excinfo=None):
         if excinfo is None:
@@ -153,8 +149,8 @@ class PluginManager(object):
 
     def do_addoption(self, parser):
         mname = "pytest_addoption"
-        methods = self.comregistry.listattr(mname, reverse=True)
-        mc = py._com.MultiCall(methods, {'parser': parser})
+        methods = self.registry.listattr(mname, reverse=True)
+        mc = MultiCall(methods, {'parser': parser})
         mc.execute()
 
     def pytest_plugin_registered(self, plugin):
@@ -168,7 +164,7 @@ class PluginManager(object):
                 {'config': self._config})
 
     def call_plugin(self, plugin, methname, kwargs):
-        return py._com.MultiCall(
+        return MultiCall(
                 methods=self.listattr(methname, plugins=[plugin]), 
                 kwargs=kwargs, firstresult=True).execute()
 
@@ -210,3 +206,118 @@ def importplugin(importspec):
 
 
 
+class MultiCall:
+    """ execute a call into multiple python functions/methods.  """
+
+    def __init__(self, methods, kwargs, firstresult=False):
+        self.methods = methods[:]
+        self.kwargs = kwargs.copy()
+        self.kwargs['__multicall__'] = self
+        self.results = []
+        self.firstresult = firstresult
+
+    def __repr__(self):
+        status = "%d results, %d meths" % (len(self.results), len(self.methods))
+        return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
+
+    def execute(self):
+        while self.methods:
+            method = self.methods.pop()
+            kwargs = self.getkwargs(method)
+            res = method(**kwargs)
+            if res is not None:
+                self.results.append(res) 
+                if self.firstresult:
+                    return res
+        if not self.firstresult:
+            return self.results 
+
+    def getkwargs(self, method):
+        kwargs = {}
+        for argname in varnames(method):
+            try:
+                kwargs[argname] = self.kwargs[argname]
+            except KeyError:
+                pass # might be optional param
+        return kwargs 
+
+def varnames(func):
+    ismethod = inspect.ismethod(func)
+    rawcode = py.code.getrawcode(func)
+    try:
+        return rawcode.co_varnames[ismethod:]
+    except AttributeError:
+        return ()
+
+class Registry:
+    """
+        Manage Plugins: register/unregister call calls to plugins. 
+    """
+    def __init__(self, plugins=None):
+        if plugins is None:
+            plugins = []
+        self._plugins = plugins
+
+    def register(self, plugin):
+        assert not isinstance(plugin, str)
+        assert not plugin in self._plugins
+        self._plugins.append(plugin)
+
+    def unregister(self, plugin):
+        self._plugins.remove(plugin)
+
+    def isregistered(self, plugin):
+        return plugin in self._plugins 
+
+    def __iter__(self):
+        return iter(self._plugins)
+
+    def listattr(self, attrname, plugins=None, extra=(), reverse=False):
+        l = []
+        if plugins is None:
+            plugins = self._plugins
+        candidates = list(plugins) + list(extra)
+        for plugin in candidates:
+            try:
+                l.append(getattr(plugin, attrname))
+            except AttributeError:
+                continue 
+        if reverse:
+            l.reverse()
+        return l
+
+class HookRelay: 
+    def __init__(self, hookspecs, registry):
+        self._hookspecs = hookspecs
+        self._registry = registry
+        for name, method in vars(hookspecs).items():
+            if name[:1] != "_":
+                setattr(self, name, self._makecall(name))
+
+    def _makecall(self, name, extralookup=None):
+        hookspecmethod = getattr(self._hookspecs, name)
+        firstresult = getattr(hookspecmethod, 'firstresult', False)
+        return HookCaller(self, name, firstresult=firstresult,
+            extralookup=extralookup)
+
+    def _getmethods(self, name, extralookup=()):
+        return self._registry.listattr(name, extra=extralookup)
+
+    def _performcall(self, name, multicall):
+        return multicall.execute()
+        
+class HookCaller:
+    def __init__(self, hookrelay, name, firstresult, extralookup=None):
+        self.hookrelay = hookrelay 
+        self.name = name 
+        self.firstresult = firstresult 
+        self.extralookup = extralookup and [extralookup] or ()
+
+    def __repr__(self):
+        return "<HookCaller %r>" %(self.name,)
+
+    def __call__(self, **kwargs):
+        methods = self.hookrelay._getmethods(self.name, self.extralookup)
+        mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
+        return self.hookrelay._performcall(self.name, mc)
+



More information about the pytest-commit mailing list