[py-svn] r57546 - in py/trunk/py/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Thu Aug 21 14:04:46 CEST 2008
Author: hpk
Date: Thu Aug 21 14:04:43 2008
New Revision: 57546
Modified:
py/trunk/py/execnet/channel.py
py/trunk/py/execnet/testing/test_gateway.py
Log:
* channels now also provide makefile(mode) with mode='r', which returns
a file-like object offering read/readline/close methods.
* added and refined crash and finalization tests
Modified: py/trunk/py/execnet/channel.py
==============================================================================
--- py/trunk/py/execnet/channel.py (original)
+++ py/trunk/py/execnet/channel.py Thu Aug 21 14:04:43 2008
@@ -103,13 +103,16 @@
return self._closed
def makefile(self, mode='w', proxyclose=False):
- """ return a file-like object. Only supported mode right
- now is 'w' for binary writes. If you want to have
- a subsequent file.close() mean to close the channel
- as well, then pass proxyclose=True.
+ """ return a file-like object.
+ mode: 'w' for binary writes, 'r' for binary reads
+ proxyclose: set to true if you want to have a
+ subsequent file.close() automatically close the channel.
"""
- assert mode == 'w', "mode %r not availabe" %(mode,)
- return ChannelFile(channel=self, proxyclose=proxyclose)
+ if mode == "w":
+ return ChannelFileWrite(channel=self, proxyclose=proxyclose)
+ elif mode == "r":
+ return ChannelFileRead(channel=self, proxyclose=proxyclose)
+ raise ValueError("mode %r not availabe" %(mode,))
def close(self, error=None):
""" close down this channel on both sides. """
@@ -299,18 +302,11 @@
for id in self._callbacks.keys():
self._close_callback(id)
-
-class ChannelFile:
+class ChannelFile(object):
def __init__(self, channel, proxyclose=True):
self.channel = channel
self._proxyclose = proxyclose
- def write(self, out):
- self.channel.send(out)
-
- def flush(self):
- pass
-
def close(self):
if self._proxyclose:
self.channel.close()
@@ -319,3 +315,38 @@
state = self.channel.isclosed() and 'closed' or 'open'
return '<ChannelFile %d %s>' %(self.channel.id, state)
+class ChannelFileWrite(ChannelFile):
+ def write(self, out):
+ self.channel.send(out)
+
+ def flush(self):
+ pass
+
+class ChannelFileRead(ChannelFile):
+ def __init__(self, channel, proxyclose=True):
+ super(ChannelFileRead, self).__init__(channel, proxyclose)
+ self._buffer = ""
+
+ def read(self, n):
+ while len(self._buffer) < n:
+ try:
+ self._buffer += self.channel.receive()
+ except EOFError:
+ self.close()
+ break
+ ret = self._buffer[:n]
+ self._buffer = self._buffer[n:]
+ return ret
+
+ def readline(self):
+ i = self._buffer.find("\n")
+ if i != -1:
+ return self.read(i+1)
+ line = self.read(len(self._buffer)+1)
+ while line and line[-1] != "\n":
+ c = self.read(1)
+ if not c:
+ break
+ line += c
+ return line
+
Modified: py/trunk/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py (original)
+++ py/trunk/py/execnet/testing/test_gateway.py Thu Aug 21 14:04:43 2008
@@ -74,6 +74,11 @@
channel = self.fac.new()
py.test.raises(IOError, channel.waitclose, timeout=0.01)
+ def test_channel_makefile_incompatmode(self):
+ channel = self.fac.new()
+ py.test.raises(ValueError, 'channel.makefile("rw")')
+
+
class PopenGatewayTestSetup:
def setup_class(cls):
cls.gw = py.execnet.PopenGateway()
@@ -291,6 +296,19 @@
assert isinstance(l[2], channel.__class__)
assert l[3] == 999
+ def test_channel_endmarker_callback_error(self):
+ from Queue import Queue
+ q = Queue()
+ channel = self.gw.remote_exec(source='''
+ raise ValueError()
+ ''')
+ channel.setcallback(q.put, endmarker=999)
+ val = q.get(TESTTIMEOUT)
+ assert val == 999
+ err = channel._getremoteerror()
+ assert err
+ assert str(err).find("ValueError") != -1
+
def test_remote_redirect_stdout(self):
out = py.std.StringIO.StringIO()
handle = self.gw._remote_redirect(stdout=out)
@@ -315,7 +333,7 @@
s = subl[0]
assert s.strip() == str(i)
- def test_channel_file(self):
+ def test_channel_file_write(self):
channel = self.gw.remote_exec("""
f = channel.makefile()
print >>f, "hello world"
@@ -344,6 +362,43 @@
assert first.strip() == 'hello world'
py.test.raises(EOFError, channel.receive)
+ def test_channel_file_read(self):
+ channel = self.gw.remote_exec("""
+ f = channel.makefile(mode='r')
+ s = f.read(2)
+ channel.send(s)
+ s = f.read(5)
+ channel.send(s)
+ """)
+ channel.send("xyabcde")
+ s1 = channel.receive()
+ s2 = channel.receive()
+ assert s1 == "xy"
+ assert s2 == "abcde"
+
+ def test_channel_file_read_empty(self):
+ channel = self.gw.remote_exec("pass")
+ f = channel.makefile(mode="r")
+ s = f.read(3)
+ assert s == ""
+ s = f.read(5)
+ assert s == ""
+
+ def test_channel_file_readline_remote(self):
+ channel = self.gw.remote_exec("""
+ channel.send('123\\n45')
+ """)
+ channel.waitclose(TESTTIMEOUT)
+ f = channel.makefile(mode="r")
+ s = f.readline()
+ assert s == "123\n"
+ s = f.readline()
+ assert s == "45"
+
+ def test_channel_makefile_incompatmode(self):
+ channel = self.gw.newchannel()
+ py.test.raises(ValueError, 'channel.makefile("rw")')
+
def test_confusion_from_os_write_stdout(self):
channel = self.gw.remote_exec("""
import os
@@ -383,7 +438,26 @@
""")
text = c1.receive()
assert text.find("execution disallowed") != -1
-
+
+
+def test_channel_endmarker_remote_killterm():
+ gw = py.execnet.PopenGateway()
+ try:
+ from Queue import Queue
+ q = Queue()
+ channel = gw.remote_exec(source='''
+ import os
+ os.kill(os.getpid(), 15)
+ ''')
+ channel.setcallback(q.put, endmarker=999)
+ val = q.get(TESTTIMEOUT)
+ assert val == 999
+ err = channel._getremoteerror()
+ finally:
+ gw.exit()
+ py.test.skip("provide information on causes/signals "
+ "of dying remote gateways")
+
#class TestBlockingIssues:
# def test_join_blocked_execution_gateway(self):
# gateway = py.execnet.PopenGateway()
@@ -437,29 +511,44 @@
ret = channel.receive()
assert ret == 42
- def disabled_test_remote_is_killed_while_sending(self):
+ def test_waitclose_on_remote_killed(self):
+ py.test.skip("fix needed: dying remote process does not cause waitclose() to fail")
+ if not hasattr(py.std.os, 'kill'):
+ py.test.skip("no os.kill")
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import os
import time
- channel.send(os.getppid())
channel.send(os.getpid())
while 1:
- channel.send('#'*1000)
- time.sleep(10)
+ channel.send("#" * 100)
""")
- parent = channel.receive()
- remote = channel.receive()
- assert parent == os.getpid()
- time.sleep(0.5)
- os.kill(remote, signal.SIGKILL)
- time.sleep(1)
- channel.waitclose(TESTTIMEOUT)
+ remotepid = channel.receive()
+ os.kill(remotepid, 9)
+ py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
+ py.test.raises(EOFError, channel.send, None)
py.test.raises(EOFError, channel.receive)
- #channel.waitclose(TESTTIMEOUT)
- #channel.send('#')
-
-
+
+def test_endmarker_delivery_on_remote_killterm():
+ if not hasattr(py.std.os, 'kill'):
+ py.test.skip("no os.kill()")
+ gw = py.execnet.PopenGateway()
+ try:
+ from Queue import Queue
+ q = Queue()
+ channel = gw.remote_exec(source='''
+ import os
+ os.kill(os.getpid(), 15)
+ ''')
+ channel.setcallback(q.put, endmarker=999)
+ val = q.get(TESTTIMEOUT)
+ assert val == 999
+ err = channel._getremoteerror()
+ finally:
+ gw.exit()
+ py.test.skip("provide information on causes/signals "
+ "of dying remote gateways")
+
class SocketGatewaySetup:
def setup_class(cls):
@@ -516,4 +605,7 @@
py.test.raises(IOError, gw.remote_init_threads, 3)
gw.exit()
-
+
+def test_nodebug():
+ from py.__.execnet import gateway
+ assert not gateway.debug
More information about the pytest-commit mailing list