[pypy-svn] r77223 - in pypy/branch/fast-forward/pypy/module/_multiprocessing: . test
afa at codespeak.net
afa at codespeak.net
Tue Sep 21 14:22:40 CEST 2010
Author: afa
Date: Tue Sep 21 14:22:38 2010
New Revision: 77223
Modified:
pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
Log:
Implement part of _multiprocessing.Connection
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py Tue Sep 21 14:22:38 2010
@@ -4,7 +4,7 @@
class Module(MixedModule):
interpleveldefs = {
- 'Connection' : 'interp_connection.W_SocketConnection',
+ 'Connection' : 'interp_connection.W_FileConnection',
'PipeConnection' : 'interp_connection.W_PipeConnection',
}
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py Tue Sep 21 14:22:38 2010
@@ -2,7 +2,8 @@
from pypy.interpreter.typedef import TypeDef, GetSetProperty
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.rpython.lltypesystem import rffi, lltype
-import sys
+from pypy.rlib.rarithmetic import r_uint
+import sys, os
READABLE = 1
WRITABLE = 2
@@ -13,10 +14,8 @@
class W_BaseConnection(Wrappable):
BUFFER_SIZE = 1024
- def __init__(self, handle, flags):
- self.handle = handle
+ def __init__(self, flags):
self.flags = flags
-
self.buffer = lltype.malloc(rffi.CCHARP.TO, self.BUFFER_SIZE,
flavor='raw')
@@ -24,12 +23,6 @@
lltype.free(self.buffer, flavor='raw')
self.do_close()
- def descr_repr(self, space):
- conn_type = ["read-only", "write-only", "read-write"][self.flags]
-
- return space.wrap("<%s %s, handle %zd>" % (
- conn_type, space.type(self).getname(space, '?'), self.handle))
-
def close(self):
self.do_close()
@@ -178,21 +171,110 @@
send = interp2app(W_BaseConnection.send),
recv = interp2app(W_BaseConnection.recv),
## poll = interp2app(W_BaseConnection.poll),
- ## fileno = interp2app(W_BaseConnection.fileno),
close = interp2app(W_BaseConnection.close),
)
-class W_SocketConnection(W_BaseConnection):
- pass
+class W_FileConnection(W_BaseConnection):
+ INVALID_HANDLE_VALUE = -1
+
+ def __init__(self, fd, flags):
+ W_BaseConnection.__init__(self, flags)
+ self.fd = fd
+
+ @unwrap_spec(ObjSpace, W_Root, int, bool, bool)
+ def descr_new(space, w_subtype, fd, readable=True, writable=True):
+ flags = (readable and READABLE) | (writable and WRITABLE)
+
+ self = space.allocate_instance(W_FileConnection, w_subtype)
+ W_FileConnection.__init__(self, fd, flags)
+ return space.wrap(self)
+
+ @unwrap_spec('self', ObjSpace)
+ def fileno(self, space):
+ return space.wrap(self.fd)
+
+ def is_valid(self):
+ return self.fd != self.INVALID_HANDLE_VALUE
+
+ def do_close(self):
+ if self.is_valid():
+ os.close(self.fd)
+ self.fd = self.INVALID_HANDLE_VALUE
-W_SocketConnection.typedef = TypeDef(
+ def do_send_string(self, space, buffer, offset, size):
+ # Since str2charp copies the buffer anyway, always combine the
+ # "header" and the "body" of the message and send them at once.
+ message = lltype.malloc(rffi.CCHARP.TO, size + 4, flavor='raw')
+ try:
+ rffi.cast(rffi.UINTP, message)[0] = r_uint(size) # XXX htonl!
+ i = size - 1
+ while i >= 0:
+ message[4 + i] = buffer[offset + i]
+ i -= 1
+ self._sendall(space, message, size + 4)
+ finally:
+ lltype.free(message, flavor='raw')
+
+ def do_recv_string(self, space, maxlength):
+ length_ptr = lltype.malloc(rffi.CArrayPtr(rffi.UINT).TO, 1,
+ flavor='raw')
+ self._recvall(rffi.cast(rffi.CCHARP, length_ptr), 4)
+ length = length_ptr[0]
+ if length > maxlength:
+ return MP_BAD_MESSAGE_LENGTH
+
+ if length <= self.BUFFER_SIZE:
+ self._recvall(self.buffer, length)
+ return length, None
+ else:
+ newbuf = lltype.malloc(rffi.CCHARP.TO, length, flavor='raw')
+ self._recvall(newbuf, length)
+ return length, newbuf
+
+ def _sendall(self, space, message, size):
+ while size > 0:
+ # XXX inefficient
+ data = rffi.charpsize2str(message, size)
+ try:
+ count = os.write(self.fd, data)
+ except OSError, e:
+ raise wrap_oserror(space, e)
+ size -= count
+ message = rffi.ptradd(message, count)
+
+ def _recvall(self, buffer, length):
+ remaining = length
+ while remaining > 0:
+ try:
+ data = os.read(self.fd, remaining)
+ except OSError, e:
+ raise wrap_oserror(space, e)
+ count = len(data)
+ if count == 0:
+ if remaining == length:
+ return MP_END_OF_FILE
+ else:
+ return MP_EARLY_END_OF_FILE
+ # XXX inefficient
+ for i in range(count):
+ buffer[i] = data[i]
+ remaining -= count
+ buffer = rffi.ptradd(buffer, count)
+
+W_FileConnection.typedef = TypeDef(
'Connection', base_typedef,
+ __new__ = interp2app(W_FileConnection.descr_new.im_func),
+ fileno = interp2app(W_FileConnection.fileno),
)
class W_PipeConnection(W_BaseConnection):
if sys.platform == 'win32':
from pypy.rlib.rwin32 import INVALID_HANDLE_VALUE
+ def __init__(self, handle, flags):
+ W_BaseConnection.__init__(self, flags)
+ self.handle = handle
+
@unwrap_spec(ObjSpace, W_Root, W_Root, bool, bool)
def descr_new(space, w_subtype, w_handle, readable=True, writable=True):
from pypy.module._multiprocessing.interp_win32 import handle_w
@@ -203,9 +285,20 @@
W_PipeConnection.__init__(self, handle, flags)
return space.wrap(self)
+ def descr_repr(self, space):
+ conn_type = ["read-only", "write-only", "read-write"][self.flags]
+
+ return space.wrap("<%s %s, handle %zd>" % (
+ conn_type, space.type(self).getname(space, '?'), self.do_fileno()))
+
def is_valid(self):
return self.handle != self.INVALID_HANDLE_VALUE
+ @unwrap_spec('self', ObjSpace)
+ def fileno(self, space):
+ from pypy.module._multiprocessing.interp_win32 import w_handle
+ return w_handle(space, self.handle)
+
def do_close(self):
from pypy.rlib.rwin32 import CloseHandle
if self.is_valid():
@@ -283,4 +376,5 @@
W_PipeConnection.typedef = TypeDef(
'PipeConnection', base_typedef,
__new__ = interp2app(W_PipeConnection.descr_new.im_func),
+ fileno = interp2app(W_PipeConnection.fileno),
)
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py Tue Sep 21 14:22:38 2010
@@ -8,21 +8,39 @@
class AppTestConnection:
def setup_class(cls):
- if sys.platform != "win32":
- py.test.skip("win32 only")
- cls.space = gettestobjspace(usemodules=('_multiprocessing', 'thread'))
+ space = gettestobjspace(usemodules=('_multiprocessing', 'thread'))
+ cls.space = space
if sys.platform == "win32":
# stubs for some modules,
# just for multiprocessing to import correctly.
- space = cls.space
w_modules = space.sys.get('modules')
space.setitem(w_modules, space.wrap('msvcrt'), space.sys)
space.setitem(w_modules, space.wrap('_subprocess'), space.sys)
- def test_pipe_connection(self):
+ # import multiprocessing once
+ space.appexec([], """(): import multiprocessing""")
+
+ def test_winpipe_connection(self):
+ import sys
+ if sys.platform != "win32":
+ skip("win32 only")
+
import multiprocessing
+ rhandle, whandle = multiprocessing.Pipe()
+
+ obj = [1, 2.0, "hello"]
+ whandle.send(obj)
+ obj2 = rhandle.recv()
+ assert obj == obj2
+
+ def test_ospipe_connection(self):
+ import _multiprocessing
+ import os
+ fd1, fd2 = os.pipe()
+ rhandle = _multiprocessing.Connection(fd1, writable=False)
+ whandle = _multiprocessing.Connection(fd2, readable=False)
+
obj = [1, 2.0, "hello"]
- whandle, rhandle = multiprocessing.Pipe()
whandle.send(obj)
obj2 = rhandle.recv()
assert obj == obj2
More information about the Pypy-commit mailing list