[pypy-svn] r77223 - in pypy/branch/fast-forward/pypy/module/_multiprocessing: . test

afa at codespeak.net afa at codespeak.net
Tue Sep 21 14:22:40 CEST 2010


Author: afa
Date: Tue Sep 21 14:22:38 2010
New Revision: 77223

Modified:
   pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
   pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
   pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
Log:
Implement part of _multiprocessing.Connection


Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py	Tue Sep 21 14:22:38 2010
@@ -4,7 +4,7 @@
 class Module(MixedModule):
 
     interpleveldefs = {
-        'Connection'      : 'interp_connection.W_SocketConnection',
+        'Connection'      : 'interp_connection.W_FileConnection',
         'PipeConnection'  : 'interp_connection.W_PipeConnection',
     }
 

Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py	Tue Sep 21 14:22:38 2010
@@ -2,7 +2,8 @@
 from pypy.interpreter.typedef import TypeDef, GetSetProperty
 from pypy.interpreter.gateway import interp2app, unwrap_spec
 from pypy.rpython.lltypesystem import rffi, lltype
-import sys
+from pypy.rlib.rarithmetic import r_uint
+import sys, os
 
 READABLE = 1
 WRITABLE = 2
@@ -13,10 +14,8 @@
 class W_BaseConnection(Wrappable):
     BUFFER_SIZE = 1024
 
-    def __init__(self, handle, flags):
-        self.handle = handle
+    def __init__(self, flags):
         self.flags = flags
-
         self.buffer = lltype.malloc(rffi.CCHARP.TO, self.BUFFER_SIZE,
                                     flavor='raw')
 
@@ -24,12 +23,6 @@
         lltype.free(self.buffer, flavor='raw')
         self.do_close()
 
-    def descr_repr(self, space):
-        conn_type = ["read-only", "write-only", "read-write"][self.flags]
-
-        return space.wrap("<%s %s, handle %zd>" % (
-            conn_type, space.type(self).getname(space, '?'), self.handle))
-
     def close(self):
         self.do_close()
 
@@ -178,21 +171,110 @@
     send = interp2app(W_BaseConnection.send),
     recv = interp2app(W_BaseConnection.recv),
     ## poll = interp2app(W_BaseConnection.poll),
-    ## fileno = interp2app(W_BaseConnection.fileno),
     close = interp2app(W_BaseConnection.close),
     )
 
-class W_SocketConnection(W_BaseConnection):
-    pass
+class W_FileConnection(W_BaseConnection):
+    INVALID_HANDLE_VALUE = -1
+
+    def __init__(self, fd, flags):
+        W_BaseConnection.__init__(self, flags)
+        self.fd = fd
+
+    @unwrap_spec(ObjSpace, W_Root, int, bool, bool)
+    def descr_new(space, w_subtype, fd, readable=True, writable=True):
+        flags = (readable and READABLE) | (writable and WRITABLE)
+
+        self = space.allocate_instance(W_FileConnection, w_subtype)
+        W_FileConnection.__init__(self, fd, flags)
+        return space.wrap(self)
+
+    @unwrap_spec('self', ObjSpace)
+    def fileno(self, space):
+        return space.wrap(self.fd)
+
+    def is_valid(self):
+        return self.fd != self.INVALID_HANDLE_VALUE
+
+    def do_close(self):
+        if self.is_valid():
+            os.close(self.fd)
+            self.fd = self.INVALID_HANDLE_VALUE
 
-W_SocketConnection.typedef = TypeDef(
+    def do_send_string(self, space, buffer, offset, size):
+        # Since str2charp copies the buffer anyway, always combine the
+        # "header" and the "body" of the message and send them at once.
+        message = lltype.malloc(rffi.CCHARP.TO, size + 4, flavor='raw')
+        try:
+            rffi.cast(rffi.UINTP, message)[0] = r_uint(size) # XXX htonl!
+            i = size - 1
+            while i >= 0:
+                message[4 + i] = buffer[offset + i]
+                i -= 1
+            self._sendall(space, message, size + 4)
+        finally:
+            lltype.free(message, flavor='raw')
+
+    def do_recv_string(self, space, maxlength):
+        length_ptr = lltype.malloc(rffi.CArrayPtr(rffi.UINT).TO, 1,
+                                   flavor='raw')
+        self._recvall(rffi.cast(rffi.CCHARP, length_ptr), 4)
+        length = length_ptr[0]
+        if length > maxlength:
+            return MP_BAD_MESSAGE_LENGTH
+
+        if length <= self.BUFFER_SIZE:
+            self._recvall(self.buffer, length)
+            return length, None
+        else:
+            newbuf = lltype.malloc(rffi.CCHARP.TO, length, flavor='raw')
+            self._recvall(newbuf, length)
+            return length, newbuf
+
+    def _sendall(self, space, message, size):
+        while size > 0:
+            # XXX inefficient
+            data = rffi.charpsize2str(message, size)
+            try:
+                count = os.write(self.fd, data)
+            except OSError, e:
+                raise wrap_oserror(space, e)
+            size -= count
+            message = rffi.ptradd(message, count)
+
+    def _recvall(self, buffer, length):
+        remaining = length
+        while remaining > 0:
+            try:
+                data = os.read(self.fd, remaining)
+            except OSError, e:
+                raise wrap_oserror(space, e)
+            count = len(data)
+            if count == 0:
+                if remaining == length:
+                    return MP_END_OF_FILE
+                else:
+                    return MP_EARLY_END_OF_FILE
+            # XXX inefficient
+            for i in range(count):
+                buffer[i] = data[i]
+            remaining -= count
+            buffer = rffi.ptradd(buffer, count)
+
+W_FileConnection.typedef = TypeDef(
     'Connection', base_typedef,
+    __new__ = interp2app(W_FileConnection.descr_new.im_func),
+    fileno = interp2app(W_FileConnection.fileno),
 )
 
 class W_PipeConnection(W_BaseConnection):
     if sys.platform == 'win32':
         from pypy.rlib.rwin32 import INVALID_HANDLE_VALUE
 
+    def __init__(self, handle, flags):
+        W_BaseConnection.__init__(self, flags)
+        self.handle = handle
+
     @unwrap_spec(ObjSpace, W_Root, W_Root, bool, bool)
     def descr_new(space, w_subtype, w_handle, readable=True, writable=True):
         from pypy.module._multiprocessing.interp_win32 import handle_w
@@ -203,9 +285,20 @@
         W_PipeConnection.__init__(self, handle, flags)
         return space.wrap(self)
 
+    def descr_repr(self, space):
+        conn_type = ["read-only", "write-only", "read-write"][self.flags]
+
+        return space.wrap("<%s %s, handle %zd>" % (
+            conn_type, space.type(self).getname(space, '?'), self.do_fileno()))
+
     def is_valid(self):
         return self.handle != self.INVALID_HANDLE_VALUE
 
+    @unwrap_spec('self', ObjSpace)
+    def fileno(self, space):
+        from pypy.module._multiprocessing.interp_win32 import w_handle
+        return w_handle(space, self.handle)
+
     def do_close(self):
         from pypy.rlib.rwin32 import CloseHandle
         if self.is_valid():
@@ -283,4 +376,5 @@
 W_PipeConnection.typedef = TypeDef(
     'PipeConnection', base_typedef,
     __new__ = interp2app(W_PipeConnection.descr_new.im_func),
+    fileno = interp2app(W_PipeConnection.fileno),
 )

Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py	Tue Sep 21 14:22:38 2010
@@ -8,21 +8,39 @@
 
 class AppTestConnection:
     def setup_class(cls):
-        if sys.platform != "win32":
-            py.test.skip("win32 only")
-        cls.space = gettestobjspace(usemodules=('_multiprocessing', 'thread'))
+        space = gettestobjspace(usemodules=('_multiprocessing', 'thread'))
+        cls.space = space
         if sys.platform == "win32":
             # stubs for some modules,
             # just for multiprocessing to import correctly.
-            space = cls.space
             w_modules = space.sys.get('modules')
             space.setitem(w_modules, space.wrap('msvcrt'), space.sys)
             space.setitem(w_modules, space.wrap('_subprocess'), space.sys)
 
-    def test_pipe_connection(self):
+        # import multiprocessing once
+        space.appexec([], """(): import multiprocessing""")
+
+    def test_winpipe_connection(self):
+        import sys
+        if sys.platform != "win32":
+            skip("win32 only")
+
         import multiprocessing
+        rhandle, whandle = multiprocessing.Pipe()
+
+        obj = [1, 2.0, "hello"]
+        whandle.send(obj)
+        obj2 = rhandle.recv()
+        assert obj == obj2
+
+    def test_ospipe_connection(self):
+        import _multiprocessing
+        import os
+        fd1, fd2 = os.pipe()
+        rhandle = _multiprocessing.Connection(fd1, writable=False)
+        whandle = _multiprocessing.Connection(fd2, readable=False)
+
         obj = [1, 2.0, "hello"]
-        whandle, rhandle = multiprocessing.Pipe()
         whandle.send(obj)
         obj2 = rhandle.recv()
         assert obj == obj2



More information about the Pypy-commit mailing list