[py-svn] r51567 - in py/branch/event/py/test2: . rep rep/testing rep/webdata rsession rsession/testing rsession/webdata testing
hpk at codespeak.net
hpk at codespeak.net
Sun Feb 17 21:22:22 CET 2008
Author: hpk
Date: Sun Feb 17 21:22:21 2008
New Revision: 51567
Added:
py/branch/event/py/test2/rep/ (props changed)
py/branch/event/py/test2/rep/__init__.py (contents, props changed)
py/branch/event/py/test2/rep/reporter.py
- copied unchanged from r51566, py/branch/event/py/test2/reporter.py
py/branch/event/py/test2/rep/testing/ (props changed)
py/branch/event/py/test2/rep/testing/__init__.py (contents, props changed)
py/branch/event/py/test2/rep/testing/test_reporter.py
- copied unchanged from r51566, py/branch/event/py/test2/testing/test_reporter.py
py/branch/event/py/test2/rep/testing/test_rest.py
- copied unchanged from r51566, py/branch/event/py/test2/rsession/testing/test_rest.py
py/branch/event/py/test2/rep/testing/test_web.py
- copied unchanged from r51566, py/branch/event/py/test2/rsession/testing/test_web.py
py/branch/event/py/test2/rep/testing/test_webjs.py
- copied unchanged from r51566, py/branch/event/py/test2/rsession/testing/test_webjs.py
py/branch/event/py/test2/rep/web.py
- copied unchanged from r51566, py/branch/event/py/test2/rsession/web.py
py/branch/event/py/test2/rep/webdata/
- copied from r51566, py/branch/event/py/test2/rsession/webdata/
py/branch/event/py/test2/rep/webjs.py
- copied unchanged from r51566, py/branch/event/py/test2/rsession/webjs.py
Removed:
py/branch/event/py/test2/reporter.py
py/branch/event/py/test2/rsession/testing/test_rest.py
py/branch/event/py/test2/rsession/testing/test_web.py
py/branch/event/py/test2/rsession/testing/test_webjs.py
py/branch/event/py/test2/rsession/web.py
py/branch/event/py/test2/rsession/webdata/
py/branch/event/py/test2/rsession/webjs.py
py/branch/event/py/test2/testing/test_reporter.py
Modified:
py/branch/event/py/test2/conftesthandle.py (props changed)
py/branch/event/py/test2/custompdb.py (props changed)
py/branch/event/py/test2/genitem.py (props changed)
py/branch/event/py/test2/outcome.py (props changed)
py/branch/event/py/test2/present.py (props changed)
py/branch/event/py/test2/rsession/testing/basetest.py (props changed)
py/branch/event/py/test2/rsession/testing/test_hostmanage.py (props changed)
py/branch/event/py/test2/rsession/testing/test_masterslave.py (props changed)
py/branch/event/py/test2/testing/setupdata.py (props changed)
py/branch/event/py/test2/testing/suptest.py (props changed)
py/branch/event/py/test2/testing/test_conftesthandle.py (props changed)
py/branch/event/py/test2/testing/test_present.py (props changed)
py/branch/event/py/test2/testing/test_repevent.py (props changed)
Log:
move all reporters and their (skipped) tests into
a new "rep" directory
Added: py/branch/event/py/test2/rep/__init__.py
==============================================================================
--- (empty file)
+++ py/branch/event/py/test2/rep/__init__.py Sun Feb 17 21:22:21 2008
@@ -0,0 +1 @@
+#
Added: py/branch/event/py/test2/rep/testing/__init__.py
==============================================================================
--- (empty file)
+++ py/branch/event/py/test2/rep/testing/__init__.py Sun Feb 17 21:22:21 2008
@@ -0,0 +1 @@
+#
Deleted: /py/branch/event/py/test2/reporter.py
==============================================================================
--- /py/branch/event/py/test2/reporter.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,439 +0,0 @@
-
-""" reporter - different reporter for different purposes ;-)
- Still lacks:
-
- 1. Hanging nodes are not done well
-"""
-
-import py
-
-from py.__.test2.terminal.out import getout
-from py.__.test2 import repevent
-from py.__.test2 import outcome
-from py.__.misc.terminal_helper import ansi_print, get_terminal_width
-from py.__.test2.present import Presenter, repr_pythonversion,\
- getrelpath
-
-import sys
-
-from time import time as now
-
-def choose_reporter(reporterclass, config):
- option = config.option
- if option.startserver or option.runbrowser:
- from py.__.test2.rsession.web import WebReporter
- return WebReporter
- if option.restreport:
- from py.__.test2.rsession.rest import RestReporter
- return RestReporter
- else:
- return reporterclass
-
-class TestReporter(object):
- """ Simple test reporter which tracks failures
- and also calls arbitrary provided function,
- useful for tests
- """
- def __init__(self, reportfun):
- self.reportfun = reportfun
- self.flag = False
-
- def report(self, event):
- #if event.is_failure():
- # self.flag = True
- self.reportfun(event)
-
- __call__ = report
-
- def was_failure(self):
- return self.flag
-
-class AbstractReporter(object):
- def __init__(self, config, hosts):
- self.config = config
- self.hosts = hosts
- self.failed_tests_outcome = []
- self.skipped_tests_outcome = []
- self.out = getout(py.std.sys.stdout)
- self.presenter = Presenter(self.out, config)
- self.to_rsync = {}
-
- def get_item_name(self, event, colitem):
- return "/".join(colitem.listnames())
-
- def report(self, what):
- repfun = getattr(self, "report_" + what.__class__.__name__,
- self.report_unknown)
- try:
- return repfun(what)
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- print "Internal reporting problem"
- excinfo = py.code.ExceptionInfo()
- for i in excinfo.traceback:
- print str(i)[2:-1]
- print excinfo
- # XXX reenable test before removing below line and
- # run it with raise
- raise
-
- __call__ = report
-
- def report_unknown(self, what):
- if self.config.option.verbose:
- print "Unknown report: %s" % what
-
- def report_SendItem(self, item):
- address = item.host.hostname
- assert isinstance(item.host.hostname, str)
- if self.config.option.verbose:
- print "Sending %s to %s" % (item.item,
- address)
-
- def report_HostRSyncing(self, item):
- hostrepr = self._hostrepr(item.host)
- if item.synced:
- if (item.host.hostname == "localhost" and
- item.root == item.remotepath):
- print "%15s: skipping inplace rsync of %r" %(
- hostrepr, item.remotepath)
- else:
- print "%15s: skip duplicate rsync to %r" % (
- hostrepr, item.remotepath)
- else:
- print "%15s: rsync %r to remote %r" % (hostrepr,
- item.root.basename,
- item.remotepath)
-
- def report_HostGatewayReady(self, item):
- self.to_rsync[item.host] = len(item.roots)
- hostrepr = self._hostrepr(item.host)
- self.out.write("%15s: gateway initialised (remote topdir: %s)\n"\
- % (hostrepr, item.host.gw_remotepath))
-
- def report_HostRSyncRootReady(self, item):
- self.to_rsync[item.host] -= 1
- if not self.to_rsync[item.host]:
- self._host_ready(item)
-
- def _host_ready(self, item):
- self.hosts_to_rsync -= 1
- hostrepr = self._hostrepr(item.host)
- if self.hosts_to_rsync:
- print "%15s: READY (still %d to go)" % (hostrepr,
- self.hosts_to_rsync)
- else:
- print "%15s: READY" % hostrepr
-
- def report_TestStarted(self, item):
- topdir = item.config.topdir
- hostreprs = [self._hostrepr(host) for host in item.hosts]
- txt = " Test started, hosts: %s " % ", ".join(hostreprs)
- self.hosts_to_rsync = len(item.hosts)
- self.out.sep("=", txt)
- self.timestart = item.timestart
- self.out.write("local top directory: %s\n" % topdir)
- for i, root in py.builtin.enumerate(item.roots):
- outof = "%d/%d" %(i+1, len(item.roots))
- self.out.write("local RSync root [%s]: %s\n" %
- (outof, root))
-
- def report_RsyncFinished(self, item):
- self.timersync = item.time
-
- def report_ImmediateFailure(self, event):
- self.out.line()
- self.repr_failure(event.item, event.outcome)
-
- def report_SessionFinish(self, item):
- self.out.line()
- assert hasattr(self, 'timestart')
- self.timeend = item.timeend
- self.skips()
- self.failures()
- if hasattr(self, 'nodes'): # XXX: Testing
- self.hangs()
- self.summary()
- return len(self.failed_tests_outcome) > 0
-
- report_InterruptedExecution = report_SessionFinish
- report_CrashedExecution = report_SessionFinish
-
- def hangs(self):
- h = []
- if self.config.option.exitfirst:
- # reporting hanging nodes in that case makes no sense at all
- # but we should share some code in all reporters than
- return
- for node in self.nodes:
- h += [(i, node.channel.gateway.sshaddress) for i in node.pending]
- if h:
- self.out.sep("=", " HANGING NODES ")
- for i, node in h:
- self.out.line("%s on %s" % (" ".join(i.listnames()), node))
-
- def failures(self):
- if self.failed_tests_outcome:
- self.out.sep("=", " FAILURES ")
- for event in self.failed_tests_outcome:
- if isinstance(event, repevent.ItemFinish):
- host = self.gethost(event)
- self.out.sep('_', "%s on %s" %
- (" ".join(event.item.listnames()), host))
- if event.outcome.signal:
- self.presenter.repr_item_info(event.item)
- self.repr_signal(event.item, event.outcome)
- else:
- self.repr_failure(event.item, event.outcome)
- else:
- self.out.sep('_', " ".join(event.item.listnames()))
- out = outcome.SerializableOutcome(excinfo=event.excinfo)
- self.repr_failure(event.item, outcome.ReprOutcome(out.make_repr()))
-
- def gethost(self, event):
- return event.host.hostname
-
- def repr_failure(self, item, outcome):
- excinfo = outcome.excinfo
- traceback = excinfo.traceback
- if not traceback:
- self.out.line("empty traceback from item %r" % (item,))
- return
-
- handler = getattr(self.presenter, 'repr_failure_tb%s' % self.config.option.tbstyle)
- handler(item, excinfo, traceback, lambda: self.repr_out_err(outcome))
-
- def repr_out_err(self, outcome):
- if outcome.stdout:
- self.out.sep('-', " Captured process stdout: ")
- self.out.write(outcome.stdout)
- if outcome.stderr:
- self.out.sep('-', " Captured process stderr: ")
- self.out.write(outcome.stderr)
-
- def repr_signal(self, item, outcome):
- signal = outcome.signal
- self.out.line("Received signal: %d" % outcome.signal)
- self.repr_out_err(outcome)
-
- def _hostrepr(self, host):
- return host.hostid
-
- def skips(self):
- # XXX review and test below, fix too many lines of
- # skips that happen in the same file/lineno
- # (this is correct in py-dist)
- texts = {}
- for event in self.skipped_tests_outcome:
- colitem = event.item
- if isinstance(event, repevent.ItemFinish):
- outcome = event.outcome
- text = outcome.skipped.value
- itemname = repr(outcome.skipped.traceback[-2]).split("\n")[0]
- elif isinstance(event, repevent.DeselectedTest):
- text = str(event.excinfo.value)
- itemname = "/".join(colitem.listnames())
- if text not in texts:
- texts[text] = [itemname]
- else:
- texts[text].append(itemname)
-
- if texts:
- self.out.line()
- self.out.sep('_', 'reasons for skipped tests')
- for text, items in texts.items():
- for item in items:
- self.out.line('Skipped in %s' % item)
- self.out.line("reason: %s" % text[1:-1])
- self.out.line()
-
- def summary(self):
- def gather(dic):
- # XXX hack to handle dicts & ints here, get rid of it
- if isinstance(dic, int):
- return dic
- total = 0
- for key, val in dic.iteritems():
- total += val
- return total
-
- def create_str(name, count):
- if count:
- return ", %d %s" % (count, name)
- return ""
-
- total_passed = gather(self.passed)
- total_failed = gather(self.failed)
- total_skipped = gather(self.skipped)
- total = total_passed + total_failed + total_skipped
- skipped_str = create_str("skipped", total_skipped)
- failed_str = create_str("failed", total_failed)
- self.out.line()
- self.print_summary(total, skipped_str, failed_str)
-
- def print_summary(self, total, skipped_str, failed_str):
- self.out.sep("=", " %d test run%s%s in %.2fs (rsync: %.2f)" %
- (total, skipped_str, failed_str, self.timeend - self.timestart,
- self.timersync - self.timestart))
-
- def report_DeselectedTest(self, event):
- #event.outcome.excinfo.source =
- self.skipped_tests_outcome.append(event)
-
- def report_FailedTryiter(self, event):
- pass
- # XXX: right now we do not do anything with it
-
- def report_ItemFinish(self, event):
- host = event.host
- hostrepr = self._hostrepr(host)
- if event.outcome.passed:
- self.passed[host] += 1
- sys.stdout.write("%15s: PASSED " % hostrepr)
- elif event.outcome.skipped:
- self.skipped_tests_outcome.append(event)
- self.skipped[host] += 1
- sys.stdout.write("%15s: SKIPPED " % hostrepr)
- else:
- self.failed[host] += 1
- self.failed_tests_outcome.append(event)
- sys.stdout.write("%15s: " % hostrepr)
- ansi_print("FAILED", esc=(31,1), newline=False, file=sys.stdout)
- sys.stdout.write(" ")
- # we should have printed 20 characters to this point
- itempath = ".".join(event.item.listnames()[1:-1])
- funname = event.item.listnames()[-1]
- lgt = get_terminal_width() - 20
- # mark the function name, to be sure
- to_display = len(itempath) + len(funname) + 1
- if to_display > lgt:
- sys.stdout.write("..." + itempath[to_display-lgt+4:])
- else:
- sys.stdout.write(itempath)
- sys.stdout.write(" ")
- ansi_print(funname, esc=32, file=sys.stdout)
-
- def report_Nodes(self, event):
- self.nodes = event.nodes
-
- def was_failure(self):
- return sum(self.failed.values()) > 0
-
-class RemoteReporter(AbstractReporter):
- def __init__(self, config, hosts):
- super(RemoteReporter, self).__init__(config, hosts)
- self.failed = dict([(host, 0) for host in hosts])
- self.skipped = dict([(host, 0) for host in hosts])
- self.passed = dict([(host, 0) for host in hosts])
-
- def get_item_name(self, event, colitem):
- return event.host.hostname + ":" + \
- "/".join(colitem.listnames())
-
- def report_FailedTryiter(self, event):
- self.out.line("FAILED TO LOAD MODULE: %s\n" % "/".join(event.item.listnames()))
- self.failed_tests_outcome.append(event)
- # argh! bad hack, need to fix it
- self.failed[self.hosts[0]] += 1
-
- def report_DeselectedTest(self, event):
- self.out.line("Skipped (%s) %s\n" % (str(event.excinfo.value), "/".
- join(event.item.listnames())))
-
-class LocalReporter(AbstractReporter):
- def __init__(self, config, hosts=None):
- assert not hosts
- super(LocalReporter, self).__init__(config, hosts)
- self.failed = 0
- self.skipped = 0
- self.passed = 0
-
- def report_TestStarted(self, item):
- colitems = item.config.getcolitems()
- txt = " test process starts "
- self.out.sep("=", txt)
- self.timestart = item.timestart
- self.out.line("executable: %s (%s)" %
- (py.std.sys.executable, repr_pythonversion()))
- rev = py.__package__.getrev()
- self.out.line("using py lib: %s <rev %s>" % (
- py.path.local(py.__file__).dirpath(), rev))
- config = item.config
- if config.option.traceconfig or config.option.verbose:
-
- for x in colitems:
- self.out.line("test target: %s" %(x.fspath,))
-
- conftestmodules = config._conftest.getconftestmodules(None)
- for i,x in py.builtin.enumerate(conftestmodules):
- self.out.line("initial conf %d: %s" %(i, x.__file__))
-
- def get_item_name(self, event, colitem):
- return "/".join(colitem.listnames())
-
- def print_summary(self, total, skipped_str, failed_str):
- self.out.sep("=", " %d test run%s%s in %.2fs" %
- (total, skipped_str, failed_str, self.timeend - self.timestart))
-
- def report_DeselectedTest(self, event):
- #self.show_item(event.item, False)
- if isinstance(event.item, py.test2.collect.Module):
- self.out.write("- skipped (%s)" % event.excinfo.value)
- else:
- self.out.write("s")
- self.skipped_tests_outcome.append(event)
-
- def report_FailedTryiter(self, event):
- #self.show_item(event.item, False)
- self.out.write("- FAILED TO LOAD MODULE")
- self.failed_tests_outcome.append(event)
- self.failed += 1
-
- def report_ItemFinish(self, event):
- if event.outcome.passed:
- self.passed += 1
- self.out.write(".")
- elif event.outcome.skipped:
- self.skipped_tests_outcome.append(event)
- self.skipped += 1
- self.out.write("s")
- else:
- self.failed += 1
- self.failed_tests_outcome.append(event)
- self.out.write("F")
-
- def report_ItemStart(self, event):
- # XXX
- event.item.start = now()
- self.show_item(event.item)
-
- def show_item(self, item, count_elems = True):
- if isinstance(item, py.test2.collect.Module):
- self.show_Module(item)
- if self.config.option.verbose > 0 and\
- isinstance(item, py.test2.collect.Item):
- self.show_ItemVerbose(item)
-
- def show_ItemVerbose(self, item):
- realpath, lineno = item._getpathlineno()
- location = "%s:%d" % (realpath.basename, lineno+1)
- self.out.write("%-20s %s " % (location, item._getmodpath()))
-
- def show_Module(self, mod):
- lgt = len(list(mod._tryiter()))
- if self.config.option.verbose == 0:
- base = getrelpath(py.path.local(), mod.fspath)
- self.out.write("\n%s[%d] " % (base, lgt))
- else:
- self.out.line()
- self.out.line('+ testmodule: %s[%d]' % (mod.fspath, lgt))
-
- def gethost(self, event):
- return 'localhost'
-
- def hangs(self):
- pass
-
- def was_failure(self):
- return self.failed > 0
Deleted: /py/branch/event/py/test2/rsession/testing/test_rest.py
==============================================================================
--- /py/branch/event/py/test2/rsession/testing/test_rest.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,356 +0,0 @@
-
-""" tests of rest reporter backend
-"""
-
-import py
-
-py.test.skip("refactor ReST reporter tests")
-
-from py.__.test2.testing.test_reporter import AbstractTestReporter,\
- DummyChannel
-from py.__.test2 import repevent
-from py.__.test2.rsession.rest import RestReporter, NoLinkWriter
-from py.__.rest.rst import *
-from py.__.test2.rsession.hostmanage import HostInfo
-from py.__.test2.outcome import SerializableOutcome
-
-class Container(object):
- def __init__(self, **args):
- for arg, val in args.items():
- setattr(self, arg, val)
-
-class RestTestReporter(RestReporter):
- def __init__(self, *args, **kwargs):
- if args:
- super(RestReporter, self).__init__(*args, **kwargs)
-
-class TestRestUnits(object):
- def setup_method(self, method):
- config = py.test2.config._reparse(["some_sub"])
- config.option.verbose = False
- self.config = config
- hosts = [HostInfo('localhost')]
- method.im_func.func_globals['ch'] = DummyChannel(hosts[0])
- method.im_func.func_globals['reporter'] = r = RestReporter(config,
- hosts)
- method.im_func.func_globals['stdout'] = s = py.std.StringIO.StringIO()
- r.out = s # XXX will need to become a real reporter some time perhaps?
- r.linkwriter = NoLinkWriter()
-
- def test_report_unknown(self):
- self.config.option.verbose = True
- reporter.report_unknown('foo')
- assert stdout.getvalue() == 'Unknown report\\: foo\n\n'
- self.config.option.verbose = False
-
- def test_report_SendItem(self):
- event = repevent.SendItem(item='foo/bar.py', channel=ch)
- reporter.report(event)
- assert stdout.getvalue() == ''
- stdout.seek(0)
- stdout.truncate()
- reporter.config.option.verbose = True
- reporter.report(event)
- assert stdout.getvalue() == ('sending item foo/bar.py to '
- 'localhost\n\n')
-
- def test_report_HostRSyncing(self):
- event = repevent.HostRSyncing(HostInfo('localhost:/foo/bar'), "a",
- "b", False)
- reporter.report(event)
- assert stdout.getvalue() == ('::\n\n localhost: RSYNC ==> '
- '/foo/bar\n\n')
-
- def test_report_HostRSyncRootReady(self):
- h = HostInfo('localhost')
- reporter.hosts_to_rsync = 1
- reporter.report(repevent.HostGatewayReady(h, ["a"]))
- event = repevent.HostRSyncRootReady(h, "a")
- reporter.report(event)
- assert stdout.getvalue() == '::\n\n localhost: READY\n\n'
-
- def test_report_TestStarted(self):
- event = repevent.TestStarted([HostInfo('localhost'),
- HostInfo('foo.com')],
- "aa", ["a", "b"])
- reporter.report(event)
- assert stdout.getvalue() == """\
-===========================================
-Running tests on hosts\: localhost, foo.com
-===========================================
-
-"""
-
- def test_report_ItemStart(self):
- class FakeModule(py.test2.collect.Module):
- def __init__(self, parent):
- self.parent = parent
- self.fspath = py.path.local('.')
- def _tryiter(self):
- return ['test_foo', 'test_bar']
- def listnames(self):
- return ['package', 'foo', 'bar.py']
-
- parent = Container(parent=None, fspath=py.path.local('.'))
- event = repevent.ItemStart(item=FakeModule(parent))
- reporter.report(event)
- assert stdout.getvalue() == """\
-Testing module foo/bar.py (2 items)
------------------------------------
-
-"""
-
- def test_print_summary(self):
- reporter.timestart = 10
- reporter.timeend = 20
- reporter.timersync = 15
- reporter.print_summary(10, '', '')
- assert stdout.getvalue() == """\
-10 tests run in 10.00s (rsync\: 5.00)
--------------------------------------
-
-"""
-
- def test_ItemFinish_PASSED(self):
- outcome = SerializableOutcome()
- item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'])
- event = repevent.ItemFinish(channel=ch, outcome=outcome, item=item)
- reporter.report(event)
- assert stdout.getvalue() == ('* localhost\\: **PASSED** '
- 'foo.py/bar()/baz\n\n')
-
- def test_ItemFinish_SKIPPED(self):
- outcome = SerializableOutcome(skipped="reason")
- item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'])
- event = repevent.ItemFinish(channel=ch, outcome=outcome, item=item)
- reporter.report(event)
- assert stdout.getvalue() == ('* localhost\\: **SKIPPED** '
- 'foo.py/bar()/baz\n\n')
-
- def test_ItemFinish_FAILED(self):
- outcome = SerializableOutcome(excinfo="xxx")
- item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'])
- event = repevent.ItemFinish(channel=ch, outcome=outcome, item=item)
- reporter.report(event)
- assert stdout.getvalue() == """\
-* localhost\: **FAILED** `traceback0`_ foo.py/bar()/baz
-
-"""
-
- def test_ItemFinish_FAILED_stdout(self):
- excinfo = Container(
- typename='FooError',
- value='A foo has occurred',
- traceback=[
- Container(
- path='foo/bar.py',
- lineno=1,
- relline=1,
- source='foo()',
- ),
- Container(
- path='foo/baz.py',
- lineno=4,
- relline=1,
- source='raise FooError("A foo has occurred")',
- ),
- ]
- )
- outcome = SerializableOutcome(excinfo=excinfo)
- outcome.stdout = '<printed>'
- outcome.stderr = ''
- parent = Container(parent=None, fspath=py.path.local('.'))
- item = Container(listnames=lambda: ['', 'foo.py', 'bar', '()', 'baz'],
- parent=parent, fspath=py.path.local('foo'))
- event = repevent.ItemFinish(channel=ch, outcome=outcome,
- item=item)
- reporter.report(event)
- reporter.timestart = 10
- reporter.timeend = 20
- reporter.timersync = 15
- reporter.print_summary(10, '', '')
-
- reporter.print_summary(1, 'skipped', 'failed')
- out = stdout.getvalue()
- assert out.find('<printed>') > -1
-
- def test_skips(self):
- class FakeOutcome(Container, repevent.ItemFinish):
- pass
-
- class FakeTryiter(Container, repevent.DeselectedTest):
- pass
-
- reporter.skips()
- assert stdout.getvalue() == ''
- reporter.skipped_tests_outcome = [
- FakeOutcome(outcome=Container(skipped='problem X'),
- item=Container(listnames=lambda: ['foo', 'bar.py'])),
- FakeTryiter(excinfo=Container(value='problem Y'),
- item=Container(listnames=lambda: ['foo', 'baz.py']))]
- reporter.skips()
- assert stdout.getvalue() == """\
-Reasons for skipped tests\:
-+++++++++++++++++++++++++++
-
-* foo/bar.py\: problem X
-
-* foo/baz.py\: problem Y
-
-"""
-
- def test_failures(self):
- class FakeOutcome(Container, repevent.ItemFinish):
- pass
-
- parent = Container(parent=None, fspath=py.path.local('.'))
- reporter.failed_tests_outcome = [
- FakeOutcome(
- outcome=Container(
- signal=False,
- excinfo=Container(
- typename='FooError',
- value='A foo has occurred',
- traceback=[
- Container(
- path='foo/bar.py',
- lineno=1,
- relline=1,
- source='foo()',
- ),
- Container(
- path='foo/baz.py',
- lineno=4,
- relline=1,
- source='raise FooError("A foo has occurred")',
- ),
- ]
- ),
- stdout='',
- stderr='',
- ),
- item=Container(
- listnames=lambda: ['package', 'foo', 'bar.py',
- 'baz', '()'],
- parent=parent,
- fspath=py.path.local('.'),
- ),
- channel=ch,
- ),
- ]
- reporter.config.option.tbstyle = 'no'
- reporter.failures()
- expected = """\
-Exceptions\:
-++++++++++++
-
-foo/bar.py/baz() on localhost
-+++++++++++++++++++++++++++++
-
-.. _`traceback0`:
-
-
-FooError
-++++++++
-
-::
-
- A foo has occurred
-
-"""
- assert stdout.getvalue() == expected
-
- reporter.config.option.tbstyle = 'short'
- stdout.seek(0)
- stdout.truncate()
- reporter.failures()
- expected = """\
-Exceptions\:
-++++++++++++
-
-foo/bar.py/baz() on localhost
-+++++++++++++++++++++++++++++
-
-.. _`traceback0`:
-
-
-::
-
- foo/bar.py line 1
- foo()
- foo/baz.py line 4
- raise FooError("A foo has occurred")
-
-FooError
-++++++++
-
-::
-
- A foo has occurred
-
-"""
- assert stdout.getvalue() == expected
-
- reporter.config.option.tbstyle = 'long'
- stdout.seek(0)
- stdout.truncate()
- reporter.failures()
- expected = """\
-Exceptions\:
-++++++++++++
-
-foo/bar.py/baz() on localhost
-+++++++++++++++++++++++++++++
-
-.. _`traceback0`:
-
-
-+++++++++++++++++
-foo/bar.py line 1
-+++++++++++++++++
-
-::
-
- foo()
-
-+++++++++++++++++
-foo/baz.py line 4
-+++++++++++++++++
-
-::
-
- raise FooError("A foo has occurred")
-
-FooError
-++++++++
-
-::
-
- A foo has occurred
-
-"""
- assert stdout.getvalue() == expected
-
-
-class TestRestReporter(AbstractTestReporter):
- reporter = RestReporter
-
- def get_hosts(self):
- return [HostInfo('localhost')]
-
- def test_failed_to_load(self):
- py.test.skip("Not implemented")
-
- def test_report_received_item_outcome(self):
- val = self.report_received_item_outcome()
- expected_list = [
- "**FAILED**",
- "**SKIPPED**",
- "**PASSED**",
- "* localhost\:",
- "`traceback0`_ test\_one.py/funcpass",
- "test\_one.py/funcpass"]
- for expected in expected_list:
- assert val.find(expected) != -1
-
-
Deleted: /py/branch/event/py/test2/rsession/testing/test_web.py
==============================================================================
--- /py/branch/event/py/test2/rsession/testing/test_web.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,90 +0,0 @@
-
-""" webtest
-"""
-
-import py
-
-def setup_module(mod):
- try:
- from pypy.translator.js.main import rpython2javascript
- from pypy.translator.js import commproxy
- mod.commproxy = commproxy
- mod.rpython2javascript = rpython2javascript
- except ImportError:
- py.test.skip("PyPy not found")
- mod.commproxy.USE_MOCHIKIT = False
- mod.rpython2javascript = rpython2javascript
- mod.commproxy = mod.commproxy
- from py.__.test2.rsession.web import TestHandler as _TestHandler
- from py.__.test2.rsession.web import MultiQueue
- mod._TestHandler = _TestHandler
- mod.MultiQueue = MultiQueue
-
-def test_js_generate():
- from py.__.test2.rsession import webjs
- from py.__.test2.rsession.web import FUNCTION_LIST, IMPORTED_PYPY
-
- source = rpython2javascript(webjs, FUNCTION_LIST, use_pdb=False)
- assert source
-
-def test_parse_args():
- class TestTestHandler(_TestHandler):
- def __init__(self):
- pass
- h = TestTestHandler()
- assert h.parse_args('foo=bar') == {'foo': 'bar'}
- assert h.parse_args('foo=bar%20baz') == {'foo': 'bar baz'}
- assert h.parse_args('foo%20bar=baz') == {'foo bar': 'baz'}
- assert h.parse_args('foo=bar%baz') == {'foo': 'bar\xbaz'}
- py.test2.raises(ValueError, 'h.parse_args("foo")')
-
-class TestMultiQueue(object):
- def test_get_one_sessid(self):
- mq = MultiQueue()
- mq.put(1)
- result = mq.get(1234)
- assert result == 1
-
- def test_get_two_sessid(self):
- mq = MultiQueue()
- mq.put(1)
- result = mq.get(1234)
- assert result == 1
- mq.put(2)
- result = mq.get(1234)
- assert result == 2
- result = mq.get(5678)
- assert result == 1
- result = mq.get(5678)
- assert result == 2
-
- def test_get_blocking(self):
- import thread
- result = []
- def getitem(mq, sessid):
- result.append(mq.get(sessid))
- mq = MultiQueue()
- thread.start_new_thread(getitem, (mq, 1234))
- assert not result
- mq.put(1)
- py.std.time.sleep(0.1)
- assert result == [1]
-
- def test_empty(self):
- mq = MultiQueue()
- assert mq.empty()
- mq.put(1)
- assert not mq.empty()
- result = mq.get(1234)
- result == 1
- assert mq.empty()
- mq.put(2)
- result = mq.get(4567)
- assert result == 1
- result = mq.get(1234)
- assert result == 2
- assert not mq.empty()
- result = mq.get(4567)
- assert result == 2
- assert mq.empty()
-
Deleted: /py/branch/event/py/test2/rsession/testing/test_webjs.py
==============================================================================
--- /py/branch/event/py/test2/rsession/testing/test_webjs.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,143 +0,0 @@
-import py
-
-def check(mod):
- try:
- import pypy
- from pypy.translator.js.modules import dom
- from pypy.translator.js.tester import schedule_callbacks
- dom.Window # check whether dom was properly imported or is just a
- # leftover in sys.modules
- except (ImportError, AttributeError):
- py.test.skip('PyPy not found')
- mod.dom = dom
- mod.schedule_callbacks = schedule_callbacks
-
- from py.__.test2.rsession import webjs
- from py.__.test2.rsession.web import exported_methods
- mod.webjs = webjs
- mod.exported_methods = exported_methods
- mod.here = py.magic.autopath().dirpath()
-
-def setup_module(mod):
- check(mod)
-
- # load HTML into window object
- html = here.join('../webdata/index.html').read()
- mod.html = html
- from pypy.translator.js.modules import dom
- mod.dom = dom
- dom.window = dom.Window(html)
- dom.document = dom.window.document
- from py.__.test2.rsession import webjs
- from py.__.test2.rsession.web import exported_methods
- mod.webjs = webjs
- mod.exported_methods = exported_methods
-
-def setup_function(f):
- dom.window = dom.Window(html)
- dom.document = dom.window.document
-
-def test_html_loaded():
- body = dom.window.document.getElementsByTagName('body')[0]
- assert len(body.childNodes) > 0
- assert str(body.childNodes[1].nodeName) == 'A'
-
-def test_set_msgbox():
- py.test.skip("not implemented in genjs")
- msgbox = dom.window.document.getElementById('messagebox')
- assert len(msgbox.childNodes) == 0
- webjs.set_msgbox('foo', 'bar')
- assert len(msgbox.childNodes) == 1
- assert msgbox.childNodes[0].nodeName == 'PRE'
- assert msgbox.childNodes[0].childNodes[0].nodeValue == 'foo\nbar'
-
-def test_show_info():
- info = dom.window.document.getElementById('info')
- info.style.visibility = 'hidden'
- info.innerHTML = ''
- webjs.show_info('foobar')
- content = info.innerHTML
- assert content == 'foobar'
- bgcolor = info.style.backgroundColor
- assert bgcolor == 'beige'
-
-def test_hide_info():
- info = dom.window.document.getElementById('info')
- info.style.visibility = 'visible'
- webjs.hide_info()
- assert info.style.visibility == 'hidden'
-
-def test_process():
- main_t = dom.window.document.getElementById('main_table')
- assert len(main_t.getElementsByTagName('tr')) == 0
- assert not webjs.process({})
-
- msg = {'type': 'ItemStart',
- 'itemtype': 'Module',
- 'itemname': 'foo.py',
- 'fullitemname': 'modules/foo.py',
- 'length': 10,
- }
- assert webjs.process(msg)
- trs = main_t.getElementsByTagName('tr')
- assert len(trs) == 1
- tr = trs[0]
- assert len(tr.childNodes) == 2
- assert tr.childNodes[0].nodeName == 'TD'
- assert tr.childNodes[0].innerHTML == 'foo.py[0/10]'
- assert tr.childNodes[1].nodeName == 'TD'
- assert tr.childNodes[1].childNodes[0].nodeName == 'TABLE'
- assert len(tr.childNodes[1].getElementsByTagName('tr')) == 0
-
-def test_process_two():
- main_t = dom.window.document.getElementById('main_table')
- msg = {'type': 'ItemStart',
- 'itemtype': 'Module',
- 'itemname': 'foo.py',
- 'fullitemname': 'modules/foo.py',
- 'length': 10,
- }
- webjs.process(msg)
- msg = {'type': 'ItemFinish',
- 'fullmodulename': 'modules/foo.py',
- 'passed' : 'True',
- 'fullitemname' : 'modules/foo.py/test_item',
- 'hostkey': None,
- }
- webjs.process(msg)
- trs = main_t.getElementsByTagName('tr')
- tds = trs[0].getElementsByTagName('td')
- # two cells in the row, one in the table inside one of the cells
- assert len(tds) == 3
- html = tds[0].innerHTML
- assert html == 'foo.py[1/10]'
- assert tds[2].innerHTML == '.'
-
-def test_signal():
- main_t = dom.window.document.getElementById('main_table')
- msg = {'type': 'ItemStart',
- 'itemtype': 'Module',
- 'itemname': 'foo.py',
- 'fullitemname': 'modules/foo.py',
- 'length': 10,
- }
- webjs.process(msg)
- msg = {'type': 'ItemFinish',
- 'fullmodulename': 'modules/foo.py',
- 'passed' : 'False',
- 'fullitemname' : 'modules/foo.py/test_item',
- 'hostkey': None,
- 'signal': '10',
- 'skipped': 'False',
- }
- exported_methods.fail_reasons['modules/foo.py/test_item'] = 'Received signal 10'
- exported_methods.stdout['modules/foo.py/test_item'] = ''
- exported_methods.stderr['modules/foo.py/test_item'] = ''
- webjs.process(msg)
- schedule_callbacks(exported_methods)
- # ouch
- assert dom.document.getElementById('modules/foo.py').childNodes[0].\
- childNodes[0].childNodes[0].childNodes[0].nodeValue == 'F'
-
-# XXX: Write down test for full run
-
Deleted: /py/branch/event/py/test2/rsession/web.py
==============================================================================
--- /py/branch/event/py/test2/rsession/web.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,473 +0,0 @@
-
-""" web server for py.test
-"""
-
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-
-import thread, threading
-import re
-import time
-import random
-import Queue
-import os
-import sys
-import socket
-
-import py
-from py.__.test2.rsession.rsession import RSession
-from py.__.test2 import repevent
-from py.__.test2 import collect
-from py.__.test2.rsession.webdata import json
-
-DATADIR = py.path.local(__file__).dirpath("webdata")
-FUNCTION_LIST = ["main", "show_skip", "show_traceback", "show_info", "hide_info",
- "show_host", "hide_host", "hide_messagebox", "opt_scroll"]
-
-try:
- from pypy.rpython.ootypesystem.bltregistry import MethodDesc, BasicExternal
- from pypy.translator.js.main import rpython2javascript
- from pypy.translator.js import commproxy
- from pypy.translator.js.lib.support import callback
-
- commproxy.USE_MOCHIKIT = False
- IMPORTED_PYPY = True
-except (ImportError, NameError):
- class BasicExternal(object):
- pass
-
- def callback(*args, **kwargs):
- def decorator(func):
- return func
- return decorator
-
- IMPORTED_PYPY = False
-
-def add_item(event):
- """ A little helper
- """
- item = event.item
- itemtype = item.__class__.__name__
- itemname = item.name
- fullitemname = "/".join(item.listnames())
- d = {'fullitemname': fullitemname, 'itemtype': itemtype,
- 'itemname': itemname}
- #if itemtype == 'Module':
- try:
- d['length'] = str(len(list(event.item._tryiter())))
- except:
- d['length'] = "?"
- return d
-
-class MultiQueue(object):
- """ a tailor-made queue (internally using Queue) for py.test2.rsession.web
-
- API-wise the main difference is that the get() method gets a sessid
- argument, which is used to determine what data to feed to the client
-
- when a data queue for a sessid doesn't yet exist, it is created, and
- filled with data that has already been fed to the other clients
- """
- def __init__(self):
- self._cache = []
- self._session_queues = {}
- self._lock = py.std.thread.allocate_lock()
-
- def put(self, item):
- self._lock.acquire()
- try:
- self._cache.append(item)
- for key, q in self._session_queues.items():
- q.put(item)
- finally:
- self._lock.release()
-
- def _del(self, sessid):
- self._lock.acquire()
- try:
- del self._session_queues[sessid]
- finally:
- self._lock.release()
-
- def get(self, sessid):
- self._lock.acquire()
- try:
- if not sessid in self._session_queues:
- self._create_session_queue(sessid)
- finally:
- self._lock.release()
- return self._session_queues[sessid].get(sessid)
-
- def empty(self):
- self._lock.acquire()
- try:
- if not self._session_queues:
- return not len(self._cache)
- for q in self._session_queues.values():
- if not q.empty():
- return False
- finally:
- self._lock.release()
- return True
-
- def empty_queue(self, sessid):
- return self._session_queues[sessid].empty()
-
- def _create_session_queue(self, sessid):
- self._session_queues[sessid] = q = Queue.Queue()
- for item in self._cache:
- q.put(item)
-
-class ExportedMethods(BasicExternal):
- _render_xmlhttp = True
- def __init__(self):
- self.pending_events = MultiQueue()
- self.start_event = threading.Event()
- self.end_event = threading.Event()
- self.skip_reasons = {}
- self.fail_reasons = {}
- self.stdout = {}
- self.stderr = {}
- self.all = 0
- self.to_rsync = {}
-
- def findmodule(self, item):
- # find the most outwards parent which is module
- current = item
- while current:
- if isinstance(current, collect.Module):
- break
- current = current.parent
-
- if current is not None:
- return str(current.name), str("/".join(current.listnames()))
- else:
- return str(item.parent.name), str("/".join(item.parent.listnames()))
-
- def show_hosts(self):
- self.start_event.wait()
- to_send = {}
- for host in self.hosts:
- to_send[host.hostid] = host.hostname
- return to_send
- show_hosts = callback(retval={str:str})(show_hosts)
-
- def show_skip(self, item_name="aa"):
- return {'item_name': item_name,
- 'reason': self.skip_reasons[item_name]}
- show_skip = callback(retval={str:str})(show_skip)
-
- def show_fail(self, item_name="aa"):
- return {'item_name':item_name,
- 'traceback':str(self.fail_reasons[item_name]),
- 'stdout':self.stdout[item_name],
- 'stderr':self.stderr[item_name]}
- show_fail = callback(retval={str:str})(show_fail)
-
- _sessids = None
- _sesslock = py.std.thread.allocate_lock()
- def show_sessid(self):
- if not self._sessids:
- self._sessids = []
- self._sesslock.acquire()
- try:
- while 1:
- sessid = ''.join(py.std.random.sample(
- py.std.string.lowercase, 8))
- if sessid not in self._sessids:
- self._sessids.append(sessid)
- break
- finally:
- self._sesslock.release()
- return sessid
- show_sessid = callback(retval=str)(show_sessid)
-
- def failed(self, **kwargs):
- if not 'sessid' in kwargs:
- return
- sessid = kwargs['sessid']
- to_del = -1
- for num, i in enumerate(self._sessids):
- if i == sessid:
- to_del = num
- if to_del != -1:
- del self._sessids[to_del]
- self.pending_events._del(kwargs['sessid'])
-
- def show_all_statuses(self, sessid='xx'):
- retlist = [self.show_status_change(sessid)]
- while not self.pending_events.empty_queue(sessid):
- retlist.append(self.show_status_change(sessid))
- retval = retlist
- return retval
- show_all_statuses = callback(retval=[{str:str}])(show_all_statuses)
-
- def show_status_change(self, sessid):
- event = self.pending_events.get(sessid)
- if event is None:
- self.end_event.set()
- return {}
- # some dispatcher here
- if isinstance(event, repevent.ItemFinish):
- args = {}
- outcome = event.outcome
- for key, val in outcome.__dict__.iteritems():
- args[key] = str(val)
- args.update(add_item(event))
- mod_name, mod_fullname = self.findmodule(event.item)
- args['modulename'] = str(mod_name)
- args['fullmodulename'] = str(mod_fullname)
- fullitemname = args['fullitemname']
- if outcome.skipped:
- self.skip_reasons[fullitemname] = self.repr_failure_tblong(
- event.item,
- outcome.skipped,
- outcome.skipped.traceback)
- elif outcome.excinfo:
- self.fail_reasons[fullitemname] = self.repr_failure_tblong(
- event.item, outcome.excinfo, outcome.excinfo.traceback)
- self.stdout[fullitemname] = outcome.stdout
- self.stderr[fullitemname] = outcome.stderr
- elif outcome.signal:
- self.fail_reasons[fullitemname] = "Received signal %d" % outcome.signal
- self.stdout[fullitemname] = outcome.stdout
- self.stderr[fullitemname] = outcome.stderr
- if event.channel:
- args['hostkey'] = event.channel.gateway.host.hostid
- else:
- args['hostkey'] = ''
- elif isinstance(event, repevent.ItemStart):
- args = add_item(event)
- elif isinstance(event, repevent.TestSessionFinish):
- args = {}
- args['run'] = str(self.all)
- args['fails'] = str(len(self.fail_reasons))
- args['skips'] = str(len(self.skip_reasons))
- elif isinstance(event, repevent.SendItem):
- args = add_item(event)
- args['hostkey'] = event.channel.gateway.host.hostid
- elif isinstance(event, repevent.HostRSyncRootReady):
- self.ready_hosts[event.host] = True
- args = {'hostname' : event.host.hostname, 'hostkey' : event.host.hostid}
- elif isinstance(event, repevent.FailedTryiter):
- args = add_item(event)
- elif isinstance(event, repevent.DeselectedTest):
- args = add_item(event)
- args['reason'] = str(event.excinfo.value)
- else:
- args = {}
- args['event'] = str(event)
- args['type'] = event.__class__.__name__
- return args
-
- def repr_failure_tblong(self, item, excinfo, traceback):
- lines = []
- for index, entry in py.builtin.enumerate(traceback):
- lines.append('----------')
- lines.append("%s: %s" % (entry.path, entry.lineno))
- lines += self.repr_source(entry.relline, entry.source)
- lines.append("%s: %s" % (excinfo.typename, excinfo.value))
- return "\n".join(lines)
-
- def repr_source(self, relline, source):
- lines = []
- for num, line in enumerate(str(source).split("\n")):
- if num == relline:
- lines.append(">>>>" + line)
- else:
- lines.append(" " + line)
- return lines
-
- def report_ItemFinish(self, event):
- self.all += 1
- self.pending_events.put(event)
-
- def report_FailedTryiter(self, event):
- fullitemname = "/".join(event.item.listnames())
- self.fail_reasons[fullitemname] = self.repr_failure_tblong(
- event.item, event.excinfo, event.excinfo.traceback)
- self.stdout[fullitemname] = ''
- self.stderr[fullitemname] = ''
- self.pending_events.put(event)
-
- def report_ItemStart(self, event):
- if isinstance(event.item, py.test2.collect.Module):
- self.pending_events.put(event)
-
- def report_unknown(self, event):
- # XXX: right now, we just pass it for showing
- self.pending_events.put(event)
-
- def _host_ready(self, event):
- self.pending_events.put(event)
-
- def report_HostGatewayReady(self, item):
- self.to_rsync[item.host] = len(item.roots)
-
- def report_HostRSyncRootReady(self, item):
- self.to_rsync[item.host] -= 1
- if not self.to_rsync[item.host]:
- self._host_ready(item)
-
- def report_TestStarted(self, event):
- # XXX: It overrides our self.hosts
- self.hosts = {}
- self.ready_hosts = {}
- if not event.hosts:
- self.hosts = []
- else:
- for host in event.hosts:
- self.hosts[host] = host
- self.ready_hosts[host] = False
- self.start_event.set()
- self.pending_events.put(event)
-
- def report_TestSessionFinish(self, event):
- self.pending_events.put(event)
- kill_server()
-
- report_InterruptedExecution = report_TestSessionFinish
- report_CrashedExecution = report_TestSessionFinish
-
- def report(self, what):
- repfun = getattr(self, "report_" + what.__class__.__name__,
- self.report_unknown)
- try:
- repfun(what)
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- print "Internal reporting problem"
- excinfo = py.code.ExceptionInfo()
- for i in excinfo.traceback:
- print str(i)[2:-1]
- print excinfo
-
-exported_methods = ExportedMethods()
-
-class TestHandler(BaseHTTPRequestHandler):
- exported_methods = exported_methods
-
- def do_GET(self):
- path = self.path
- if path.endswith("/"):
- path = path[:-1]
- if path.startswith("/"):
- path = path[1:]
- m = re.match('^(.*)\?(.*)$', path)
- if m:
- path = m.group(1)
- getargs = m.group(2)
- else:
- getargs = ""
- name_path = path.replace(".", "_")
- method_to_call = getattr(self, "run_" + name_path, None)
- if method_to_call is None:
- exec_meth = getattr(self.exported_methods, name_path, None)
- if exec_meth is None:
- self.send_error(404, "File %s not found" % path)
- else:
- try:
- self.serve_data('text/json',
- json.write(exec_meth(**self.parse_args(getargs))))
- except socket.error:
- # client happily disconnected
- exported_methods.failed(**self.parse_args(getargs))
- else:
- method_to_call()
-
- def parse_args(self, getargs):
- # parse get argument list
- if getargs == "":
- return {}
-
- unquote = py.std.urllib.unquote
- args = {}
- arg_pairs = getargs.split("&")
- for arg in arg_pairs:
- key, value = arg.split("=")
- args[unquote(key)] = unquote(value)
- return args
-
- def log_message(self, format, *args):
- # XXX just discard it
- pass
-
- do_POST = do_GET
-
- def run_(self):
- self.run_index()
-
- def run_index(self):
- data = py.path.local(DATADIR).join("index.html").open().read()
- self.serve_data("text/html", data)
-
- def run_jssource(self):
- js_name = py.path.local(__file__).dirpath("webdata").join("source.js")
- web_name = py.path.local(__file__).dirpath().join("webjs.py")
- if IMPORTED_PYPY and web_name.mtime() > js_name.mtime() or \
- (not js_name.check()):
- from py.__.test2.rsession import webjs
-
- javascript_source = rpython2javascript(webjs,
- FUNCTION_LIST, use_pdb=False)
- open(str(js_name), "w").write(javascript_source)
- self.serve_data("text/javascript", javascript_source)
- else:
- js_source = open(str(js_name), "r").read()
- self.serve_data("text/javascript", js_source)
-
- def serve_data(self, content_type, data):
- self.send_response(200)
- self.send_header("Content-type", content_type)
- self.send_header("Content-length", len(data))
- self.end_headers()
- self.wfile.write(data)
-
-class WebReporter(object):
- """ A simple wrapper, this file needs ton of refactoring
- anyway, so this is just to satisfy things below
- (and start to create saner interface as well)
- """
- def __init__(self, config, hosts):
- start_server_from_config(config)
-
- def was_failure(self):
- return sum(exported_methods.fail_reasons.values()) > 0
-
- # rebind
- report = exported_methods.report
- __call__ = report
-
-def start_server_from_config(config):
- if config.option.runbrowser:
- port = socket.INADDR_ANY
- else:
- port = 8000
-
- httpd = start_server(server_address = ('', port))
- port = httpd.server_port
- if config.option.runbrowser:
- import webbrowser, thread
- # webbrowser.open() may block until the browser finishes or not
- url = "http://localhost:%d" % (port,)
- thread.start_new_thread(webbrowser.open, (url,))
-
- return exported_methods.report
-
-def start_server(server_address = ('', 8000), handler=TestHandler, start_new=True):
- httpd = HTTPServer(server_address, handler)
-
- if start_new:
- thread.start_new_thread(httpd.serve_forever, ())
- print "Server started, listening on port %d" % (httpd.server_port,)
- return httpd
- else:
- print "Server started, listening on port %d" % (httpd.server_port,)
- httpd.serve_forever()
-
-def kill_server():
- exported_methods.pending_events.put(None)
- while not exported_methods.pending_events.empty():
- time.sleep(.1)
- exported_methods.end_event.wait()
-
Deleted: /py/branch/event/py/test2/rsession/webjs.py
==============================================================================
--- /py/branch/event/py/test2/rsession/webjs.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,355 +0,0 @@
-
-""" javascript source for py.test2 distributed
-"""
-
-import py
-from py.__.test2.rsession.web import exported_methods
-try:
- from pypy.translator.js.modules import dom
- from pypy.translator.js.helper import __show_traceback
-except ImportError:
- py.test2.skip("PyPy not found")
-
-def create_elem(s):
- return dom.document.createElement(s)
-
-def get_elem(el):
- return dom.document.getElementById(el)
-
-def create_text_elem(txt):
- return dom.document.createTextNode(txt)
-
-tracebacks = {}
-skips = {}
-counters = {}
-max_items = {}
-short_item_names = {}
-
-MAX_COUNTER = 30 # Maximal size of one-line table
-
-class Globals(object):
- def __init__(self):
- self.pending = []
- self.host = ""
- self.data_empty = True
-
-glob = Globals()
-
-class Options(object):
- """ Store global options
- """
- def __init__(self):
- self.scroll = True
-
-opts = Options()
-
-def comeback(msglist):
- if len(msglist) == 0:
- return
- for item in glob.pending[:]:
- if not process(item):
- return
- glob.pending = []
- for msg in msglist:
- if not process(msg):
- return
- exported_methods.show_all_statuses(glob.sessid, comeback)
-
-def show_info(data="aa"):
- info = dom.document.getElementById("info")
- info.style.visibility = "visible"
- while len(info.childNodes):
- info.removeChild(info.childNodes[0])
- txt = create_text_elem(data)
- info.appendChild(txt)
- info.style.backgroundColor = "beige"
- # XXX: Need guido
-
-def hide_info():
- info = dom.document.getElementById("info")
- info.style.visibility = "hidden"
-
-def show_interrupt():
- glob.finished = True
- dom.document.title = "Py.test [interrupted]"
- dom.document.getElementById("Tests").childNodes[0].nodeValue = "Tests [interrupted]"
-
-def show_crash():
- glob.finished = True
- dom.document.title = "Py.test [crashed]"
- dom.document.getElementById("Tests").childNodes[0].nodeValue = "Tests [crashed]"
-
-SCROLL_LINES = 50
-
-def opt_scroll():
- if opts.scroll:
- opts.scroll = False
- else:
- opts.scroll = True
-
-def scroll_down_if_needed(mbox):
- if not opts.scroll:
- return
- #if dom.window.scrollMaxY - dom.window.scrollY < SCROLL_LINES:
- mbox.parentNode.scrollIntoView()
-
-def hide_messagebox():
- mbox = dom.document.getElementById("messagebox")
- while mbox.childNodes:
- mbox.removeChild(mbox.childNodes[0])
-
-def make_module_box(msg):
- tr = create_elem("tr")
- td = create_elem("td")
- tr.appendChild(td)
- td.appendChild(create_text_elem("%s[0/%s]" % (msg['itemname'],
- msg['length'])))
- max_items[msg['fullitemname']] = int(msg['length'])
- short_item_names[msg['fullitemname']] = msg['itemname']
- td.id = '_txt_' + msg['fullitemname']
- #tr.setAttribute("id", msg['fullitemname'])
- td.setAttribute("onmouseover",
- "show_info('%s')" % (msg['fullitemname'],))
- td.setAttribute("onmouseout", "hide_info()")
- td2 = create_elem('td')
- tr.appendChild(td2)
- table = create_elem("table")
- td2.appendChild(table)
- tbody = create_elem('tbody')
- tbody.id = msg['fullitemname']
- table.appendChild(tbody)
- counters[msg['fullitemname']] = 0
- return tr
-
-def add_received_item_outcome(msg, module_part):
- if msg['hostkey']:
- host_elem = dom.document.getElementById(msg['hostkey'])
- glob.host_pending[msg['hostkey']].pop()
- count = len(glob.host_pending[msg['hostkey']])
- host_elem.childNodes[0].nodeValue = '%s[%s]' % (
- glob.host_dict[msg['hostkey']], count)
-
- td = create_elem("td")
- td.setAttribute("onmouseover", "show_info('%s')" % (
- msg['fullitemname'],))
- td.setAttribute("onmouseout", "hide_info()")
- item_name = msg['fullitemname']
- # TODO: dispatch output
- if msg["passed"] == 'True':
- txt = create_text_elem(".")
- td.appendChild(txt)
- elif msg["skipped"] != 'None' and msg["skipped"] != "False":
- exported_methods.show_skip(item_name, skip_come_back)
- link = create_elem("a")
- link.setAttribute("href", "javascript:show_skip('%s')" % (
- msg['fullitemname'],))
- txt = create_text_elem('s')
- link.appendChild(txt)
- td.appendChild(link)
- else:
- link = create_elem("a")
- link.setAttribute("href", "javascript:show_traceback('%s')" % (
- msg['fullitemname'],))
- txt = create_text_elem('F')
- link.setAttribute('class', 'error')
- link.appendChild(txt)
- td.appendChild(link)
- exported_methods.show_fail(item_name, fail_come_back)
-
- if counters[msg['fullmodulename']] == 0:
- tr = create_elem("tr")
- module_part.appendChild(tr)
-
- name = msg['fullmodulename']
- counters[name] += 1
- counter_part = get_elem('_txt_' + name)
- newcontent = "%s[%d/%d]" % (short_item_names[name], counters[name],
- max_items[name])
- counter_part.childNodes[0].nodeValue = newcontent
- module_part.childNodes[-1].appendChild(td)
-
-def process(msg):
- if len(msg) == 0:
- return False
- elem = dom.document.getElementById("testmain")
- #elem.innerHTML += '%s<br/>' % msg['event']
- main_t = dom.document.getElementById("main_table")
- if msg['type'] == 'ItemStart':
- # we start a new directory or what
- #if msg['itemtype'] == 'Module':
- tr = make_module_box(msg)
- main_t.appendChild(tr)
- elif msg['type'] == 'SendItem':
- host_elem = dom.document.getElementById(msg['hostkey'])
- glob.host_pending[msg['hostkey']].insert(0, msg['fullitemname'])
- count = len(glob.host_pending[msg['hostkey']])
- host_elem.childNodes[0].nodeValue = '%s[%s]' % (
- glob.host_dict[msg['hostkey']], count)
-
- elif msg['type'] == 'HostRSyncRootReady':
- host_elem = dom.document.getElementById(msg['hostkey'])
- host_elem.style.background = \
- "#00ff00"
- host_elem.childNodes[0].nodeValue = '%s[0]' % (
- glob.host_dict[msg['hostkey']],)
- elif msg['type'] == 'ItemFinish':
- module_part = get_elem(msg['fullmodulename'])
- if not module_part:
- glob.pending.append(msg)
- return True
-
- add_received_item_outcome(msg, module_part)
- elif msg['type'] == 'TestSessionFinish':
- text = "FINISHED %s run, %s failures, %s skipped" % (msg['run'], msg['fails'], msg['skips'])
- glob.finished = True
- dom.document.title = "Py.test %s" % text
- dom.document.getElementById("Tests").childNodes[0].nodeValue = \
- "Tests [%s]" % text
- elif msg['type'] == 'FailedTryiter':
- module_part = get_elem(msg['fullitemname'])
- if not module_part:
- glob.pending.append(msg)
- return True
- tr = create_elem("tr")
- td = create_elem("td")
- a = create_elem("a")
- a.setAttribute("href", "javascript:show_traceback('%s')" % (
- msg['fullitemname'],))
- txt = create_text_elem("- FAILED TO LOAD MODULE")
- a.appendChild(txt)
- td.appendChild(a)
- tr.appendChild(td)
- module_part.appendChild(tr)
- item_name = msg['fullitemname']
- exported_methods.show_fail(item_name, fail_come_back)
- elif msg['type'] == 'DeselectedTest':
- module_part = get_elem(msg['fullitemname'])
- if not module_part:
- glob.pending.append(msg)
- return True
- tr = create_elem("tr")
- td = create_elem("td")
- txt = create_text_elem("- skipped (%s)" % (msg['reason'],))
- td.appendChild(txt)
- tr.appendChild(td)
- module_part.appendChild(tr)
- elif msg['type'] == 'RsyncFinished':
- glob.rsync_done = True
- elif msg['type'] == 'InterruptedExecution':
- show_interrupt()
- elif msg['type'] == 'CrashedExecution':
- show_crash()
- if glob.data_empty:
- mbox = dom.document.getElementById('messagebox')
- scroll_down_if_needed(mbox)
- return True
-
-def show_skip(item_name="aa"):
- set_msgbox(item_name, skips[item_name])
-
-def set_msgbox(item_name, data):
- msgbox = get_elem("messagebox")
- while len(msgbox.childNodes):
- msgbox.removeChild(msgbox.childNodes[0])
- pre = create_elem("pre")
- txt = create_text_elem(item_name + "\n" + data)
- pre.appendChild(txt)
- msgbox.appendChild(pre)
- dom.window.location.assign("#message")
- glob.data_empty = False
-
-def show_traceback(item_name="aa"):
- data = ("====== Traceback: =========\n%s\n======== Stdout: ========\n%s\n"
- "========== Stderr: ==========\n%s\n" % tracebacks[item_name])
- set_msgbox(item_name, data)
-
-def fail_come_back(msg):
- tracebacks[msg['item_name']] = (msg['traceback'], msg['stdout'],
- msg['stderr'])
-
-def skip_come_back(msg):
- skips[msg['item_name']] = msg['reason']
-
-def reshow_host():
- if glob.host == "":
- return
- show_host(glob.host)
-
-def show_host(host_name="aa"):
- elem = dom.document.getElementById("jobs")
- if elem.childNodes:
- elem.removeChild(elem.childNodes[0])
- tbody = create_elem("tbody")
- for item in glob.host_pending[host_name]:
- tr = create_elem("tr")
- td = create_elem("td")
- td.appendChild(create_text_elem(item))
- tr.appendChild(td)
- tbody.appendChild(tr)
- elem.appendChild(tbody)
- elem.style.visibility = "visible"
- glob.host = host_name
- dom.setTimeout(reshow_host, 100)
-
-def hide_host():
- elem = dom.document.getElementById("jobs")
- while len(elem.childNodes):
- elem.removeChild(elem.childNodes[0])
- elem.style.visibility = "hidden"
- glob.host = ""
-
-def update_rsync():
- if glob.finished:
- return
- elem = dom.document.getElementById("Tests")
- if glob.rsync_done is True:
- elem.childNodes[0].nodeValue = "Tests"
- return
- text = "Rsyncing" + '.' * glob.rsync_dots
- glob.rsync_dots += 1
- if glob.rsync_dots > 5:
- glob.rsync_dots = 0
- elem.childNodes[0].nodeValue = "Tests [%s]" % text
- dom.setTimeout(update_rsync, 1000)
-
-def host_init(host_dict):
- tbody = dom.document.getElementById("hostsbody")
- for host in host_dict.keys():
- tr = create_elem('tr')
- tbody.appendChild(tr)
- td = create_elem("td")
- td.style.background = "#ff0000"
- txt = create_text_elem(host_dict[host])
- td.appendChild(txt)
- td.id = host
- tr.appendChild(td)
- td.setAttribute("onmouseover", "show_host('%s')" % host)
- td.setAttribute("onmouseout", "hide_host()")
- glob.rsync_dots = 0
- glob.rsync_done = False
- dom.setTimeout(update_rsync, 1000)
- glob.host_dict = host_dict
- glob.host_pending = {}
- for key in host_dict.keys():
- glob.host_pending[key] = []
-
-def key_pressed(key):
- if key.charCode == ord('s'):
- scroll_box = dom.document.getElementById("opt_scroll")
- if opts.scroll:
- scroll_box.removeAttribute("checked")
- opts.scroll = False
- else:
- scroll_box.setAttribute("checked", "true")
- opts.scroll = True
-
-def sessid_comeback(id):
- glob.sessid = id
- exported_methods.show_all_statuses(id, comeback)
-
-def main():
- glob.finished = False
- exported_methods.show_hosts(host_init)
- exported_methods.show_sessid(sessid_comeback)
- dom.document.onkeypress = key_pressed
- dom.document.getElementById("opt_scroll").setAttribute("checked", "True")
Deleted: /py/branch/event/py/test2/testing/test_reporter.py
==============================================================================
--- /py/branch/event/py/test2/testing/test_reporter.py Sun Feb 17 21:22:21 2008
+++ (empty file)
@@ -1,282 +0,0 @@
-""" reporter tests.
-
-XXX there are a few disabled reporting tests because
-they test for exact formatting as far as i can see.
-I think it's rather better to directly invoke a
-reporter and pass it some hand-prepared events to see
-that running the reporter doesn't break shallowly.
-
-Otherwise, i suppose that some "visual" testing can usually be driven
-manually by user-input. And when passing particular events
-to a reporter it's also easier to check for one line
-instead of having to know the order in which things are printed
-etc.
-
-
-"""
-
-
-import py, os
-
-py.test.skip("rewrite reporter tests completely, take existing as hints")
-
-from py.__.test2.session import AbstractSession, itemgen
-from py.__.test2.reporter import RemoteReporter, LocalReporter, choose_reporter
-from py.__.test2 import repevent
-from py.__.test2.outcome import ReprOutcome, SerializableOutcome
-from py.__.test2.rsession.hostmanage import HostInfo
-from py.__.test2.box import Box
-from py.__.test2.rsession.testing.basetest import BasicRsessionTest
-import sys
-from StringIO import StringIO
-
-class MockSession(object):
- def __init__(self, reporter):
- self.reporter = reporter
-
- def start(self, item):
- self.reporter(repevent.ItemStart(item))
-
- def finish(self, item):
- pass
-
-class DummyGateway(object):
- def __init__(self, host):
- self.host = host
-
-class DummyChannel(object):
- def __init__(self, host):
- self.gateway = DummyGateway(host)
-
-class AbstractTestReporter(BasicRsessionTest):
- def prepare_outcomes(self):
- # possible outcomes
- try:
- 1/0
- except:
- exc = py.code.ExceptionInfo()
-
- try:
- py.test.skip("xxx")
- except:
- skipexc = py.code.ExceptionInfo()
-
- outcomes = [SerializableOutcome(()),
- SerializableOutcome(skipped=skipexc),
- SerializableOutcome(excinfo=exc),
- SerializableOutcome()]
-
- outcomes = [ReprOutcome(outcome.make_repr()) for outcome in outcomes]
- outcomes[3].signal = 11
- outcomes[0].passed = False
-
- return outcomes
-
- def report_received_item_outcome(self):
- item = self.getexample("pass")
- outcomes = self.prepare_outcomes()
-
- def boxfun(config, item, outcomes):
- hosts = self.get_hosts()
- r = self.reporter(config, hosts)
- if hosts:
- ch = DummyChannel(hosts[0])
- else:
- ch = None
- for outcome in outcomes:
- r.report(repevent.ItemFinish(ch, item, outcome))
-
- cap = py.io.StdCaptureFD()
- boxfun(self.config, item, outcomes)
- out, err = cap.reset()
- assert not err
- return out
-
- def _test_module(self):
- funcitem = self.getexample("pass")
- moditem = self.getmod()
- outcomes = self.prepare_outcomes()
-
- def boxfun(config, item, funcitem, outcomes):
- hosts = self.get_hosts()
- r = self.reporter(config, hosts)
- r.report(repevent.ItemStart(item))
- if hosts:
- ch = DummyChannel(hosts[0])
- else:
- ch = None
- for outcome in outcomes:
- r.report(repevent.ItemFinish(ch, funcitem, outcome))
-
- cap = py.io.StdCaptureFD()
- boxfun(self.config, moditem, funcitem, outcomes)
- out, err = cap.reset()
- assert not err
- return out
-
- def _test_full_module(self):
- tmpdir = py.test2.ensuretemp("repmod")
- tmpdir.ensure("__init__.py")
- tmpdir.ensure("test_one.py").write(py.code.Source("""
- def test_x():
- pass
- """))
- tmpdir.ensure("test_two.py").write(py.code.Source("""
- import py
- py.test.skip("reason")
- """))
- tmpdir.ensure("test_three.py").write(py.code.Source("""
- sadsadsa
- """))
-
- def boxfun():
- config = py.test2.config._reparse([str(tmpdir)])
- rootcol = py.test2.collect.Directory(tmpdir)
- hosts = self.get_hosts()
- r = self.reporter(config, hosts)
- list(itemgen(MockSession(r), [rootcol], r.report))
-
- cap = py.io.StdCaptureFD()
- boxfun()
- out, err = cap.reset()
- assert not err
- return out
-
- def test_failed_to_load(self):
- tmpdir = py.test2.ensuretemp("failedtoload")
- tmpdir.ensure("__init__.py")
- tmpdir.ensure("test_three.py").write(py.code.Source("""
- sadsadsa
- """))
- def boxfun():
- config = py.test2.config._reparse([str(tmpdir)])
- rootcol = py.test2.collect.Directory(tmpdir)
- hosts = self.get_hosts()
- r = self.reporter(config, hosts)
- r.report(repevent.TestStarted(hosts, config, ["a"]))
- r.report(repevent.RsyncFinished())
- list(itemgen(MockSession(r), [rootcol], r.report))
- r.report(repevent.TestSessionFinish())
- return r
-
- cap = py.io.StdCaptureFD()
- r = boxfun()
- out, err = cap.reset()
- assert not err
- assert out.find("1 failed in") != -1
- assert out.find("NameError: name 'sadsadsa' is not defined") != -1
-
- def _test_verbose(self):
- tmpdir = py.test2.ensuretemp("reporterverbose")
- tmpdir.ensure("__init__.py")
- tmpdir.ensure("test_one.py").write("def test_x(): pass")
- cap = py.io.StdCaptureFD()
- config = py.test2.config._reparse([str(tmpdir), '-v'])
- hosts = self.get_hosts()
- r = self.reporter(config, hosts)
- r.report(repevent.TestStarted(hosts, config, []))
- r.report(repevent.RsyncFinished())
- rootcol = py.test2.collect.Directory(tmpdir)
- list(itemgen(MockSession(r), [rootcol], r.report))
- r.report(repevent.TestSessionFinish())
- out, err = cap.reset()
- assert not err
- for i in ['+ testmodule:', 'test_one.py[1]']: # XXX finish
- assert i in out
-
- def _test_still_to_go(self):
- tmpdir = py.test2.ensuretemp("stilltogo")
- tmpdir.ensure("__init__.py")
- cap = py.io.StdCaptureFD()
- config = py.test2.config._reparse([str(tmpdir)])
- hosts = [HostInfo(i) for i in ["host1", "host2", "host3"]]
- for host in hosts:
- host.gw_remotepath = ''
- r = self.reporter(config, hosts)
- r.report(repevent.TestStarted(hosts, config, ["a", "b", "c"]))
- for host in hosts:
- r.report(repevent.HostGatewayReady(host, ["a", "b", "c"]))
- for host in hosts:
- for root in ["a", "b", "c"]:
- r.report(repevent.HostRSyncRootReady(host, root))
- out, err = cap.reset()
- assert not err
- expected1 = "Test started, hosts: host1[0], host2[0], host3[0]"
- assert out.find(expected1) != -1
- for expected in py.code.Source("""
- host1[0]: READY (still 2 to go)
- host2[0]: READY (still 1 to go)
- host3[0]: READY
- """).lines:
- expected = expected.strip()
- assert out.find(expected) != -1
-
-class TestLocalReporter(AbstractTestReporter):
- reporter = LocalReporter
-
- def get_hosts(self):
- return None
-
- def test_report_received_item_outcome(self):
- assert self.report_received_item_outcome() == 'FsF.'
-
- def test_verbose(self):
- self._test_verbose()
-
- def test_module(self):
- output = self._test_module()
- assert output.find("test_one") != -1
- assert output.endswith("FsF."), output
-
- def test_full_module(self):
- py.test.skip("fix exact output matching test")
- received = self._test_full_module()
- expected_lst = ["repmod/test_one.py", "FAILED TO LOAD MODULE",
- "skipped", "reason"]
- for i in expected_lst:
- assert received.find(i) != -1
-
-class TestRemoteReporter(AbstractTestReporter):
- reporter = RemoteReporter
-
- def get_hosts(self):
- return [HostInfo("host")]
-
- def test_still_to_go(self):
- self._test_still_to_go()
-
- def test_report_received_item_outcome(self):
- val = self.report_received_item_outcome()
- expected_lst = ["host", "FAILED",
- "funcpass", "test_one",
- "SKIPPED",
- "PASSED"]
- for expected in expected_lst:
- assert val.find(expected) != -1
-
- def test_module(self):
- val = self._test_module()
- expected_lst = ["host", "FAILED",
- "funcpass", "test_one",
- "SKIPPED",
- "PASSED"]
- for expected in expected_lst:
- assert val.find(expected) != -1
-
- def test_full_module(self):
- py.test.skip("fix exact output matching test")
- val = self._test_full_module()
- assert val.find("FAILED TO LOAD MODULE: repmod/test_three.py\n"\
- "\nSkipped ('reason') repmod/test_two.py") != -1
-
-def test_reporter_choice():
- from py.__.test2.rsession.web import WebReporter
- from py.__.test2.rsession.rest import RestReporter
- choices = [
- (['-d', '--rest'], RestReporter),
- (['-w'], WebReporter),
- (['-r'], WebReporter)]
- for opts, reporter in choices:
- config = py.test2.config._reparse(['xxx'] + opts)
- assert choose_reporter(None, config) is reporter
-
More information about the pytest-commit
mailing list