[py-svn] r31587 - in py/branch/distributed/py/test/rsession: . testing

hpk at codespeak.net hpk at codespeak.net
Thu Aug 24 12:07:23 CEST 2006


Author: hpk
Date: Thu Aug 24 12:07:21 2006
New Revision: 31587

Modified:
   py/branch/distributed/py/test/rsession/rsession.py
   py/branch/distributed/py/test/rsession/testing/test_rsession.py
Log:
(fijal, hpk) cleanup initialisation and transformation from filesystem paths to collectors. 


Modified: py/branch/distributed/py/test/rsession/rsession.py
==============================================================================
--- py/branch/distributed/py/test/rsession/rsession.py	(original)
+++ py/branch/distributed/py/test/rsession/rsession.py	Thu Aug 24 12:07:21 2006
@@ -7,8 +7,6 @@
 import sys
 
 from py.__.test.rsession import report
-
-from py.__.test.terminal.remote import getrootdir
 from py.__.test.rsession.master import dispatch_loop
 from py.__.test.rsession.hostmanage import init_hosts, teardown_hosts
 
@@ -38,16 +36,26 @@
         dispatch_loop(nodes, itemgenerator, lambda : False)
         teardown_hosts(self.report, [node.channel for node in nodes])
         
-    def _map2colitems(items): 
-        # first convert all path objects into collectors 
-        from py.__.test.collect import getfscollector 
-        colitems = []
-        for item in items: 
-            if isinstance(item, (list, tuple)): 
-                colitems.extend(Session._map2colitems(item))
-            elif not isinstance(item, py.test.collect.Collector): 
-                colitems.append(getfscollector(item))
-            else:
-                colitems.append(item)
-        return colitems 
-    _map2colitems = staticmethod(_map2colitems) 
+    def make_colitems(paths, baseon): 
+        # we presume that from the base we can simply get to 
+        # the target paths by joining the basenames 
+        res = []
+        for x in paths: 
+            current = py.test.collect.Directory(baseon)  
+            relparts = x.relto(baseon).split(x.sep) 
+            assert relparts 
+            for part in relparts: 
+                next = current.join(part) 
+                assert next is not None, (current, part) 
+                current = next 
+            res.append(current) 
+        return res 
+    make_colitems = staticmethod(make_colitems) 
+
+    def getpkgdir(path):
+        path = py.path.local(path)
+        pkgpath = path.pypkgpath() 
+        assert pkgpath is not None, (
+               "no pkgdir for global scoped file %s" %(path,))
+        return pkgpath 
+    getpkgdir = staticmethod(getpkgdir) 

Modified: py/branch/distributed/py/test/rsession/testing/test_rsession.py
==============================================================================
--- py/branch/distributed/py/test/rsession/testing/test_rsession.py	(original)
+++ py/branch/distributed/py/test/rsession/testing/test_rsession.py	Thu Aug 24 12:07:21 2006
@@ -4,16 +4,13 @@
 
 import py
 from py.__.test.rsession import report
+from py.__.test.rsession.rsession import RSession
 from py.__.test.rsession.hostmanage import init_hosts, teardown_hosts
 from py.__.test.rsession.testing.test_slave import (funcfail_spec, 
     funcpass_spec, funcskip_spec)
 
 def setup_module(mod):
     mod.pkgdir = py.path.local(py.__file__).dirpath()
-    from py.__.test.rsession.conftest import option 
-    if not option.disthosts:
-        py.test.skip("no test distribution ssh hosts specified")
-    mod.hosts =  option.disthosts.split(",")
 
 def test_setup_non_existing_hosts(): 
     setup_events = []
@@ -21,57 +18,97 @@
     cmd = "init_hosts(setup_events.append, hosts, 'pytestest', pkgdir)"
     py.test.raises(py.process.cmdexec.Error, cmd) 
     #assert setup_events
-    
-def test_setup_teardown_ssh():
-    setup_events = []
-    teardown_events = []
-    
-    nodes = init_hosts(setup_events.append, hosts, 'pytestest', pkgdir)
-    teardown_hosts(teardown_events.append, 
-                   [node.channel for node in nodes])
-    
-    count_rsyn_calls = [i for i in setup_events 
-            if isinstance(i, report.CallStart)]
-    assert len(count_rsyn_calls) == len(hosts)
-    count_rsyn_ends = [i for i in setup_events 
-            if isinstance(i, report.CallFinish)]
-    assert len(count_rsyn_ends) == len(hosts)
-    
-    # same for teardown events
-    teardown_wait_starts = [i for i in teardown_events 
-                                if isinstance(i, report.CallStart)]
-    teardown_wait_ends = [i for i in teardown_events 
-                                if isinstance(i, report.CallFinish)]
-    assert len(teardown_wait_starts) == len(hosts)
-    assert len(teardown_wait_ends) == len(hosts)
-
-def test_setup_teardown_run_ssh():
-    allevents = []
-    
-    nodes = init_hosts(allevents.append, hosts, 'pytestest', pkgdir)
-    
-    from py.__.test.rsession.testing.test_executor \
-        import ItemTestPassing, ItemTestFailing, ItemTestSkipping
-    
-    rootcol = py.test.collect.Directory(pkgdir.dirpath())
-    itempass = rootcol.getitembynames(funcpass_spec)
-    itemfail = rootcol.getitembynames(funcfail_spec)
-    itemskip = rootcol.getitembynames(funcskip_spec)
-
-    # actually run some tests
-    for node in nodes:
-        node.send(itempass)
-        node.send(itemfail)
-        node.send(itemskip)
-
-    teardown_hosts(allevents.append, [node.channel for node in nodes])
-
-    events = [i for i in allevents 
-                    if isinstance(i, report.ReceivedItemOutcome)]
-    passed = [i for i in events 
-                    if i.outcome.passed]
-    skipped = [i for i in events 
-                    if i.outcome.skipped]
-    assert len(passed) == len(nodes)
-    assert len(skipped) == len(nodes)
-    assert len(events) == 3 * len(nodes)
+
+def test_getpkdir():
+    one = pkgdir.join("initpkg.py")
+    two = pkgdir.join("path", "__init__.py")
+    p1 = RSession.getpkgdir(one)
+    p2 = RSession.getpkgdir(two) 
+    assert p1 == p2
+    assert p1 == pkgdir 
+
+def test_getpkdir_no_inits():
+    tmp = py.test.ensuretemp("getpkdir1")
+    fn = tmp.ensure("hello.py")
+    py.test.raises(AssertionError, "RSession.getpkgdir(fn)")
+
+def test_make_colitems():
+    one = pkgdir.join("initpkg.py")
+    two = pkgdir.join("path", "__init__.py")
+
+    cols = RSession.make_colitems([one, two], baseon=pkgdir) 
+    assert len(cols) == 2
+    col_one, col_two = cols
+    assert col_one.listnames() == ["py", "initpkg.py"]
+    assert col_two.listnames() == ["py", "path", "__init__.py"]
+
+    cols = RSession.make_colitems([one, two], baseon=pkgdir.dirpath()) 
+    assert len(cols) == 2
+    col_one, col_two = cols
+    assert col_one.listnames() == [pkgdir.dirpath().basename, 
+                                   "py", "initpkg.py"]
+    assert col_two.listnames() == [pkgdir.dirpath().basename, 
+                                   "py", "path", "__init__.py"]
+
+class TestWithRealSshHosts: 
+    def setup_class(cls):
+        from py.__.test.rsession.conftest import option 
+        if not option.disthosts:
+            py.test.skip("no test distribution ssh hosts specified")
+        cls.hosts =  option.disthosts.split(",")
+
+    def test_setup_teardown_ssh(self):
+        hosts = self.hosts
+        setup_events = []
+        teardown_events = []
+        
+        nodes = init_hosts(setup_events.append, hosts, 'pytestest', pkgdir)
+        teardown_hosts(teardown_events.append, 
+                       [node.channel for node in nodes])
+        
+        count_rsyn_calls = [i for i in setup_events 
+                if isinstance(i, report.CallStart)]
+        assert len(count_rsyn_calls) == len(hosts)
+        count_rsyn_ends = [i for i in setup_events 
+                if isinstance(i, report.CallFinish)]
+        assert len(count_rsyn_ends) == len(hosts)
+        
+        # same for teardown events
+        teardown_wait_starts = [i for i in teardown_events 
+                                    if isinstance(i, report.CallStart)]
+        teardown_wait_ends = [i for i in teardown_events 
+                                    if isinstance(i, report.CallFinish)]
+        assert len(teardown_wait_starts) == len(hosts)
+        assert len(teardown_wait_ends) == len(hosts)
+
+    def test_setup_teardown_run_ssh(self):
+        hosts = self.hosts
+        allevents = []
+        
+        nodes = init_hosts(allevents.append, hosts, 'pytestest', pkgdir)
+        
+        from py.__.test.rsession.testing.test_executor \
+            import ItemTestPassing, ItemTestFailing, ItemTestSkipping
+        
+        rootcol = py.test.collect.Directory(pkgdir.dirpath())
+        itempass = rootcol.getitembynames(funcpass_spec)
+        itemfail = rootcol.getitembynames(funcfail_spec)
+        itemskip = rootcol.getitembynames(funcskip_spec)
+
+        # actually run some tests
+        for node in nodes:
+            node.send(itempass)
+            node.send(itemfail)
+            node.send(itemskip)
+
+        teardown_hosts(allevents.append, [node.channel for node in nodes])
+
+        events = [i for i in allevents 
+                        if isinstance(i, report.ReceivedItemOutcome)]
+        passed = [i for i in events 
+                        if i.outcome.passed]
+        skipped = [i for i in events 
+                        if i.outcome.skipped]
+        assert len(passed) == len(nodes)
+        assert len(skipped) == len(nodes)
+        assert len(events) == 3 * len(nodes)



More information about the pytest-commit mailing list