[py-svn] r31587 - in py/branch/distributed/py/test/rsession: . testing
hpk at codespeak.net
hpk at codespeak.net
Thu Aug 24 12:07:23 CEST 2006
Author: hpk
Date: Thu Aug 24 12:07:21 2006
New Revision: 31587
Modified:
py/branch/distributed/py/test/rsession/rsession.py
py/branch/distributed/py/test/rsession/testing/test_rsession.py
Log:
(fijal, hpk) clean up initialisation and the transformation from filesystem paths to collectors.
Modified: py/branch/distributed/py/test/rsession/rsession.py
==============================================================================
--- py/branch/distributed/py/test/rsession/rsession.py (original)
+++ py/branch/distributed/py/test/rsession/rsession.py Thu Aug 24 12:07:21 2006
@@ -7,8 +7,6 @@
import sys
from py.__.test.rsession import report
-
-from py.__.test.terminal.remote import getrootdir
from py.__.test.rsession.master import dispatch_loop
from py.__.test.rsession.hostmanage import init_hosts, teardown_hosts
@@ -38,16 +36,26 @@
dispatch_loop(nodes, itemgenerator, lambda : False)
teardown_hosts(self.report, [node.channel for node in nodes])
- def _map2colitems(items):
- # first convert all path objects into collectors
- from py.__.test.collect import getfscollector
- colitems = []
- for item in items:
- if isinstance(item, (list, tuple)):
- colitems.extend(Session._map2colitems(item))
- elif not isinstance(item, py.test.collect.Collector):
- colitems.append(getfscollector(item))
- else:
- colitems.append(item)
- return colitems
- _map2colitems = staticmethod(_map2colitems)
+ def make_colitems(paths, baseon):
+ # we presume that from the base we can simply get to
+ # the target paths by joining the basenames
+ res = []
+ for x in paths:
+ current = py.test.collect.Directory(baseon)
+ relparts = x.relto(baseon).split(x.sep)
+ assert relparts
+ for part in relparts:
+ next = current.join(part)
+ assert next is not None, (current, part)
+ current = next
+ res.append(current)
+ return res
+ make_colitems = staticmethod(make_colitems)
+
+ def getpkgdir(path):
+ path = py.path.local(path)
+ pkgpath = path.pypkgpath()
+ assert pkgpath is not None, (
+ "no pkgdir for global scoped file %s" %(path,))
+ return pkgpath
+ getpkgdir = staticmethod(getpkgdir)
Modified: py/branch/distributed/py/test/rsession/testing/test_rsession.py
==============================================================================
--- py/branch/distributed/py/test/rsession/testing/test_rsession.py (original)
+++ py/branch/distributed/py/test/rsession/testing/test_rsession.py Thu Aug 24 12:07:21 2006
@@ -4,16 +4,13 @@
import py
from py.__.test.rsession import report
+from py.__.test.rsession.rsession import RSession
from py.__.test.rsession.hostmanage import init_hosts, teardown_hosts
from py.__.test.rsession.testing.test_slave import (funcfail_spec,
funcpass_spec, funcskip_spec)
def setup_module(mod):
mod.pkgdir = py.path.local(py.__file__).dirpath()
- from py.__.test.rsession.conftest import option
- if not option.disthosts:
- py.test.skip("no test distribution ssh hosts specified")
- mod.hosts = option.disthosts.split(",")
def test_setup_non_existing_hosts():
setup_events = []
@@ -21,57 +18,97 @@
cmd = "init_hosts(setup_events.append, hosts, 'pytestest', pkgdir)"
py.test.raises(py.process.cmdexec.Error, cmd)
#assert setup_events
-
-def test_setup_teardown_ssh():
- setup_events = []
- teardown_events = []
-
- nodes = init_hosts(setup_events.append, hosts, 'pytestest', pkgdir)
- teardown_hosts(teardown_events.append,
- [node.channel for node in nodes])
-
- count_rsyn_calls = [i for i in setup_events
- if isinstance(i, report.CallStart)]
- assert len(count_rsyn_calls) == len(hosts)
- count_rsyn_ends = [i for i in setup_events
- if isinstance(i, report.CallFinish)]
- assert len(count_rsyn_ends) == len(hosts)
-
- # same for teardown events
- teardown_wait_starts = [i for i in teardown_events
- if isinstance(i, report.CallStart)]
- teardown_wait_ends = [i for i in teardown_events
- if isinstance(i, report.CallFinish)]
- assert len(teardown_wait_starts) == len(hosts)
- assert len(teardown_wait_ends) == len(hosts)
-
-def test_setup_teardown_run_ssh():
- allevents = []
-
- nodes = init_hosts(allevents.append, hosts, 'pytestest', pkgdir)
-
- from py.__.test.rsession.testing.test_executor \
- import ItemTestPassing, ItemTestFailing, ItemTestSkipping
-
- rootcol = py.test.collect.Directory(pkgdir.dirpath())
- itempass = rootcol.getitembynames(funcpass_spec)
- itemfail = rootcol.getitembynames(funcfail_spec)
- itemskip = rootcol.getitembynames(funcskip_spec)
-
- # actually run some tests
- for node in nodes:
- node.send(itempass)
- node.send(itemfail)
- node.send(itemskip)
-
- teardown_hosts(allevents.append, [node.channel for node in nodes])
-
- events = [i for i in allevents
- if isinstance(i, report.ReceivedItemOutcome)]
- passed = [i for i in events
- if i.outcome.passed]
- skipped = [i for i in events
- if i.outcome.skipped]
- assert len(passed) == len(nodes)
- assert len(skipped) == len(nodes)
- assert len(events) == 3 * len(nodes)
+
+def test_getpkdir():
+ one = pkgdir.join("initpkg.py")
+ two = pkgdir.join("path", "__init__.py")
+ p1 = RSession.getpkgdir(one)
+ p2 = RSession.getpkgdir(two)
+ assert p1 == p2
+ assert p1 == pkgdir
+
+def test_getpkdir_no_inits():
+ tmp = py.test.ensuretemp("getpkdir1")
+ fn = tmp.ensure("hello.py")
+ py.test.raises(AssertionError, "RSession.getpkgdir(fn)")
+
+def test_make_colitems():
+ one = pkgdir.join("initpkg.py")
+ two = pkgdir.join("path", "__init__.py")
+
+ cols = RSession.make_colitems([one, two], baseon=pkgdir)
+ assert len(cols) == 2
+ col_one, col_two = cols
+ assert col_one.listnames() == ["py", "initpkg.py"]
+ assert col_two.listnames() == ["py", "path", "__init__.py"]
+
+ cols = RSession.make_colitems([one, two], baseon=pkgdir.dirpath())
+ assert len(cols) == 2
+ col_one, col_two = cols
+ assert col_one.listnames() == [pkgdir.dirpath().basename,
+ "py", "initpkg.py"]
+ assert col_two.listnames() == [pkgdir.dirpath().basename,
+ "py", "path", "__init__.py"]
+
+class TestWithRealSshHosts:
+ def setup_class(cls):
+ from py.__.test.rsession.conftest import option
+ if not option.disthosts:
+ py.test.skip("no test distribution ssh hosts specified")
+ cls.hosts = option.disthosts.split(",")
+
+ def test_setup_teardown_ssh(self):
+ hosts = self.hosts
+ setup_events = []
+ teardown_events = []
+
+ nodes = init_hosts(setup_events.append, hosts, 'pytestest', pkgdir)
+ teardown_hosts(teardown_events.append,
+ [node.channel for node in nodes])
+
+ count_rsyn_calls = [i for i in setup_events
+ if isinstance(i, report.CallStart)]
+ assert len(count_rsyn_calls) == len(hosts)
+ count_rsyn_ends = [i for i in setup_events
+ if isinstance(i, report.CallFinish)]
+ assert len(count_rsyn_ends) == len(hosts)
+
+ # same for teardown events
+ teardown_wait_starts = [i for i in teardown_events
+ if isinstance(i, report.CallStart)]
+ teardown_wait_ends = [i for i in teardown_events
+ if isinstance(i, report.CallFinish)]
+ assert len(teardown_wait_starts) == len(hosts)
+ assert len(teardown_wait_ends) == len(hosts)
+
+ def test_setup_teardown_run_ssh(self):
+ hosts = self.hosts
+ allevents = []
+
+ nodes = init_hosts(allevents.append, hosts, 'pytestest', pkgdir)
+
+ from py.__.test.rsession.testing.test_executor \
+ import ItemTestPassing, ItemTestFailing, ItemTestSkipping
+
+ rootcol = py.test.collect.Directory(pkgdir.dirpath())
+ itempass = rootcol.getitembynames(funcpass_spec)
+ itemfail = rootcol.getitembynames(funcfail_spec)
+ itemskip = rootcol.getitembynames(funcskip_spec)
+
+ # actually run some tests
+ for node in nodes:
+ node.send(itempass)
+ node.send(itemfail)
+ node.send(itemskip)
+
+ teardown_hosts(allevents.append, [node.channel for node in nodes])
+
+ events = [i for i in allevents
+ if isinstance(i, report.ReceivedItemOutcome)]
+ passed = [i for i in events
+ if i.outcome.passed]
+ skipped = [i for i in events
+ if i.outcome.skipped]
+ assert len(passed) == len(nodes)
+ assert len(skipped) == len(nodes)
+ assert len(events) == 3 * len(nodes)
More information about the pytest-commit mailing list