[py-svn] r51415 - py/branch/event/py/test2/rsession/testing

hpk at codespeak.net hpk at codespeak.net
Tue Feb 12 17:49:44 CET 2008


Author: hpk
Date: Tue Feb 12 17:49:44 2008
New Revision: 51415

Added:
   py/branch/event/py/test2/rsession/testing/test_masterslave.py
Log:
adding some fast master/slave rsession related tests


Added: py/branch/event/py/test2/rsession/testing/test_masterslave.py
==============================================================================
--- (empty file)
+++ py/branch/event/py/test2/rsession/testing/test_masterslave.py	Tue Feb 12 17:49:44 2008
@@ -0,0 +1,54 @@
+
+import py
+from py.__.test2.rsession.master import MasterNode 
+from py.__.test2.rsession.hostmanage import HostInfo 
+from basetest import BasicRsessionTest
+from py.__.test2 import repevent 
+
+
+class TestMasterSlaveConnection(BasicRsessionTest):  # tests driving a MasterNode built on a HostInfo("localhost") gateway
+    def makereportqueue(self, filterevent=repevent.ItemTestReport):  # returns a Queue that receives only events of type filterevent
+        queue = py.std.Queue.Queue()
+        def append(event):  # callback handed to the hub below
+            if isinstance(event, filterevent):  # events of any other type are dropped
+                queue.put(event) 
+        self.config.hub.append(append)  # presumably the hub calls each appended callable per event -- TODO confirm
+        return queue 
+
+    def test_node_down(self):  # expects a HostDown event carrying this host after node.send(None)
+        host = HostInfo("localhost") 
+        host.initgateway()
+        node = MasterNode(host, self.config)
+        assert not node.channel.isclosed()  # channel must be open right after construction
+
+        queue = self.makereportqueue(repevent.HostDown)  # collect HostDown events only
+        node.send(None)  # NOTE(review): None presumably signals shutdown to the slave -- confirm in MasterNode
+        event = queue.get(timeout=1.0)  # waits at most 1s; Queue.Empty is raised if no event arrives
+        assert event.host == host 
+
+    def test_send_one(self):  # expects one report with .passed set and an empty node.pending afterwards
+        host = HostInfo("localhost") 
+        host.initgateway()
+        node = MasterNode(host, self.config)
+        assert not node.channel.isclosed()  # channel must be open right after construction
+
+        queue = self.makereportqueue()  # default filter: repevent.ItemTestReport
+        node.send(self.getexample("passed"))  # getexample is not defined in this class; presumably from BasicRsessionTest
+        event = queue.get(timeout=2.0)  # waits at most 2s; Queue.Empty is raised if no event arrives
+        assert event.passed 
+        assert not node.pending  # node.pending must be falsy once the report has been received
+
+    def test_send_multiple(self):  # sends passed/failed/skipped examples one by one, checking each report
+        host = HostInfo("localhost") 
+        host.initgateway()
+        node = MasterNode(host, self.config)
+        assert not node.channel.isclosed()  # channel must be open right after construction
+
+        queue = self.makereportqueue()  # default filter: repevent.ItemTestReport
+        for outcome in "passed failed skipped".split():
+            node.send(self.getexample(outcome))
+            event = queue.get(timeout=2.0)  # each report is awaited before the next item is sent
+            assert getattr(event, outcome)  # the report attribute named after the outcome must be truthy
+        assert not node.pending  # node.pending must be falsy after all three reports arrived
+        
+        



More information about the pytest-commit mailing list