[pypy-svn] r77217 - in pypy/branch/fast-forward/pypy/module/_multiprocessing: . test
afa at codespeak.net
afa at codespeak.net
Tue Sep 21 00:10:21 CEST 2010
Author: afa
Date: Tue Sep 21 00:10:19 2010
New Revision: 77217
Modified:
pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py
pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_win32.py
Log:
_multiprocessing.PipeConnection: Enough progress for the simple case to work
Does not yet translate!
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py Tue Sep 21 00:10:19 2010
@@ -1,9 +1,9 @@
from pypy.interpreter.baseobjspace import ObjSpace, Wrappable, W_Root
from pypy.interpreter.typedef import TypeDef, GetSetProperty
from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.rpython.lltypesystem import rffi, lltype
import sys
-INVALID_HANDLE_VALUE = -1
READABLE = 1
WRITABLE = 2
@@ -11,10 +11,19 @@
PY_SSIZE_T_MIN = -sys.maxint - 1
class W_BaseConnection(Wrappable):
+ BUFFER_SIZE = 1024
+
def __init__(self, handle, flags):
self.handle = handle
self.flags = flags
+ self.buffer = lltype.malloc(rffi.CCHARP.TO, self.BUFFER_SIZE,
+ flavor='raw')
+
+ def __del__(self):
+ lltype.free(self.buffer, flavor='raw')
+ self.do_close()
+
def descr_repr(self, space):
conn_type = ["read-only", "write-only", "read-write"][self.flags]
@@ -22,24 +31,28 @@
conn_type, space.type(self).getname(space, '?'), self.handle))
def close(self):
- if self.handle != INVALID_HANDLE_VALUE:
- self.do_close()
- self.handle = INVALID_HANDLE_VALUE
-
- def __del__(self):
- self.close()
+ self.do_close()
def closed_get(space, self):
- return space.w_bool(self.handle == INVALID_HANDLE_VALUE)
+ return space.w_bool(not self.is_valid())
def readable_get(space, self):
return space.w_bool(self.flags & READABLE)
def writable_get(space, self):
return space.w_bool(self.flags & WRITABLE)
+ def _check_readable(self, space):
+ if not self.flags & READABLE:
+ raise OperationError(space.w_IOError,
+ space.wrap("connection is write-only"))
+ def _check_writable(self, space):
+ if not self.flags & WRITABLE:
+ raise OperationError(space.w_IOError,
+ space.wrap("connection is read-only"))
+
@unwrap_spec('self', ObjSpace, 'bufferstr', 'index', 'index')
def send_bytes(self, space, buffer, offset=0, size=PY_SSIZE_T_MIN):
length = len(buffer)
- self._check_writable()
+ self._check_writable(space)
if offset < 0:
raise OperationError(space.w_ValueError,
space.wrap("offset is negative"))
@@ -56,20 +69,17 @@
raise OperationError(space.w_ValueError,
space.wrap("buffer length > offset + size"))
- res = self.do_send_string(buffer, offset, size)
- if res < 0:
- raise mp_error(res)
+ self.do_send_string(space, buffer, offset, size)
@unwrap_spec('self', ObjSpace, 'index')
- def recv_bytes(self, space, maxlength=sys.maxint):
- self._check_readable()
+ def recv_bytes(self, space, maxlength=PY_SSIZE_T_MAX):
+ self._check_readable(space)
if maxlength < 0:
raise OperationError(space.w_ValueError,
space.wrap("maxlength < 0"))
+ res, newbuf = self.do_recv_string(space, maxlength)
try:
- res, newbuf = self.do_recv_string(maxlength)
-
if res < 0:
if res == MP_BAD_MESSAGE_LENGTH:
self.flags &= ~READABLE
@@ -77,13 +87,12 @@
self.close()
raise mp_error(res)
- if newbuf is not None:
- return space.wrap(rffi.charp2str(newbuf, res))
+ if newbuf:
+ return space.wrap(rffi.charpsize2str(newbuf, res))
else:
- return space.wrap(rffi.charp2str(self.buffer, res))
- return result
+ return space.wrap(rffi.charpsize2str(self.buffer, res))
finally:
- if newbuf is not None:
+ if newbuf:
rffi.free_charp(newbuf)
@unwrap_spec('self', ObjSpace, W_Root, 'index')
@@ -91,9 +100,8 @@
rwbuffer = space.rwbuffer_w(w_buffer)
length = rwbuffer.getlength()
+ res, newbuf = self.do_recv_string(space, length - offset)
try:
- res, newbuf = self.do_recv_string(length - offset)
-
if res < 0:
if res == MP_BAD_MESSAGE_LENGTH:
self.flags &= ~READABLE
@@ -103,16 +111,60 @@
if res > length - offset:
raise OperationError(BufferTooShort)
- if newbuf is not None:
- rwbuffer.setslice(offset, newbuf)
+ if newbuf:
+ rwbuffer.setslice(offset, rffi.charpsize2str(newbuf, res))
else:
- rwbuffer.setslice(offset, self.buffer)
+ rwbuffer.setslice(offset, rffi.charpsize2str(self.buffer, res))
finally:
- if newbuf is not None:
+ if newbuf:
rffi.free_charp(newbuf)
return space.wrap(res)
+ @unwrap_spec('self', ObjSpace, W_Root)
+ def send(self, space, w_obj):
+ self._check_writable(space)
+
+ w_builtins = space.getbuiltinmodule('__builtin__')
+ w_picklemodule = space.call_method(
+ w_builtins, '__import__', space.wrap("pickle"))
+ w_protocol = space.getattr(
+ w_picklemodule, space.wrap("HIGHEST_PROTOCOL"))
+ w_pickled = space.call_method(
+ w_picklemodule, "dumps", w_obj, w_protocol)
+
+ buffer = space.bufferstr_w(w_pickled)
+ self.do_send_string(space, buffer, 0, len(buffer))
+
+ @unwrap_spec('self', ObjSpace)
+ def recv(self, space):
+ self._check_readable(space)
+
+ res, newbuf = self.do_recv_string(space, PY_SSIZE_T_MAX)
+ try:
+ if res < 0:
+ if res == MP_BAD_MESSAGE_LENGTH:
+ self.flags &= ~READABLE
+ if self.flags == 0:
+ self.close()
+ raise mp_error(res)
+ if newbuf:
+ w_received = space.wrap(rffi.charpsize2str(newbuf, res))
+ else:
+ w_received = space.wrap(rffi.charpsize2str(self.buffer, res))
+ finally:
+ if newbuf:
+ rffi.free_charp(newbuf)
+
+ w_builtins = space.getbuiltinmodule('__builtin__')
+ w_picklemodule = space.call_method(
+ w_builtins, '__import__', space.wrap("pickle"))
+ w_unpickled = space.call_method(
+ w_picklemodule, "loads", w_received)
+
+ return w_unpickled
+
+
base_typedef = TypeDef(
'BaseConnection',
@@ -123,23 +175,112 @@
send_bytes = interp2app(W_BaseConnection.send_bytes),
recv_bytes = interp2app(W_BaseConnection.recv_bytes),
recv_bytes_into = interp2app(W_BaseConnection.recv_bytes_into),
- ## send = interp2app(W_BaseConnection.send),
- ## recv = interp2app(W_BaseConnection.recv),
+ send = interp2app(W_BaseConnection.send),
+ recv = interp2app(W_BaseConnection.recv),
## poll = interp2app(W_BaseConnection.poll),
## fileno = interp2app(W_BaseConnection.fileno),
- ## close = interp2app(W_BaseConnection.close),
+ close = interp2app(W_BaseConnection.close),
)
class W_SocketConnection(W_BaseConnection):
pass
W_SocketConnection.typedef = TypeDef(
- 'Connection', base_typedef
+ 'Connection', base_typedef,
)
class W_PipeConnection(W_BaseConnection):
- pass
+ if sys.platform == 'win32':
+ from pypy.rlib.rwin32 import INVALID_HANDLE_VALUE
+
+ @unwrap_spec(ObjSpace, W_Root, W_Root, bool, bool)
+ def descr_new(space, w_subtype, w_handle, readable=True, writable=True):
+ from pypy.module._multiprocessing.interp_win32 import handle_w
+ handle = handle_w(space, w_handle)
+ flags = (readable and READABLE) | (writable and WRITABLE)
+
+ self = space.allocate_instance(W_PipeConnection, w_subtype)
+ W_PipeConnection.__init__(self, handle, flags)
+ return space.wrap(self)
+
+ def is_valid(self):
+ return self.handle != self.INVALID_HANDLE_VALUE
+
+ def do_close(self):
+ from pypy.rlib.rwin32 import CloseHandle
+ if self.is_valid():
+ CloseHandle(self.handle)
+ self.handle = self.INVALID_HANDLE_VALUE
+
+ def do_send_string(self, space, buffer, offset, size):
+ from pypy.module._multiprocessing.interp_win32 import (
+ _WriteFile, ERROR_NO_SYSTEM_RESOURCES)
+ from pypy.rlib import rwin32
+
+ charp = rffi.str2charp(buffer)
+ written_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
+ flavor='raw')
+ try:
+ result = _WriteFile(
+ self.handle, rffi.ptradd(charp, offset),
+ size, written_ptr, rffi.NULL)
+
+ if (result == 0 and
+ rwin32.GetLastError() == ERROR_NO_SYSTEM_RESOURCES):
+ raise operrfmt(
+ space.w_ValueError,
+ "Cannot send %ld bytes over connection", size)
+ finally:
+ rffi.free_charp(charp)
+ lltype.free(written_ptr, flavor='raw')
+
+ def do_recv_string(self, space, maxlength):
+ from pypy.module._multiprocessing.interp_win32 import (
+ _ReadFile)
+ from pypy.rlib import rwin32
+
+ read_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
+ flavor='raw')
+ left_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
+ flavor='raw')
+ try:
+ result = _ReadFile(self.handle,
+ self.buffer, min(self.BUFFER_SIZE, maxlength),
+ read_ptr, rffi.NULL)
+ if result:
+ return read_ptr[0], None
+
+ err = rwin32.GetLastError()
+ if err == ERROR_BROKEN_PIPE:
+ return MP_END_OF_FILE
+ elif err != ERROR_MORE_DATA:
+ return MP_STANDARD_ERROR
+
+ # More data...
+ if not _PeekNamedPipe(self.handle, rffi.NULL, 0,
+ rffi.NULL, rffi.NULL, left_ptr):
+ return MP_STANDARD_ERROR
+
+ length = read_ptr[0] + left_ptr[0]
+ if length > maxlength:
+ return MP_BAD_MESSAGE_LENGTH
+
+ newbuf = lltype.malloc(rffi.CCHARP.TO, length + 1, flavor='raw')
+ raw_memcopy(self.buffer, newbuf, read_ptr[0])
+
+ result = _ReadFile(self.handle,
+ rffi.ptradd(newbuf, read_ptr[0]), left_ptr[0],
+ read_ptr, rffi.NULL)
+ if result:
+ assert read_ptr[0] == left_ptr[0]
+ return length, newbuf
+ else:
+ rffi.free_charp(newbuf)
+ return MP_STANDARD_ERROR, None
+ finally:
+ lltype.free(read_ptr, flavor='raw')
W_PipeConnection.typedef = TypeDef(
- 'PipeConnection', base_typedef
+ 'PipeConnection', base_typedef,
+ __new__ = interp2app(W_PipeConnection.descr_new.im_func),
)
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py Tue Sep 21 00:10:19 2010
@@ -14,6 +14,7 @@
PIPE_UNLIMITED_INSTANCES
NMPWAIT_WAIT_FOREVER
ERROR_PIPE_CONNECTED ERROR_SEM_TIMEOUT ERROR_PIPE_BUSY
+ ERROR_NO_SYSTEM_RESOURCES
""".split()
class CConfig:
@@ -25,9 +26,8 @@
for name in CONSTANTS:
locals()[name] = rffi_platform.ConstantInteger(name)
-class cConfig:
- pass
-cConfig.__dict__.update(rffi_platform.configure(CConfig))
+config = rffi_platform.configure(CConfig)
+globals().update(config)
def handle_w(space, w_handle):
return rffi.cast(rwin32.HANDLE, space.int_w(w_handle))
@@ -58,6 +58,20 @@
rwin32.DWORD, rwin32.DWORD, rwin32.HANDLE],
rwin32.HANDLE)
+_WriteFile = rwin32.winexternal(
+ 'WriteFile', [
+ rwin32.HANDLE,
+ rffi.VOIDP, rwin32.DWORD,
+ rwin32.LPDWORD, rffi.VOIDP],
+ rwin32.BOOL)
+
+_ReadFile = rwin32.winexternal(
+ 'ReadFile', [
+ rwin32.HANDLE,
+ rffi.VOIDP, rwin32.DWORD,
+ rwin32.LPDWORD, rffi.VOIDP],
+ rwin32.BOOL)
+
_ExitProcess = rwin32.winexternal(
'ExitProcess', [rffi.UINT], lltype.Void)
@@ -146,7 +160,7 @@
for name in CONSTANTS:
space.setattr(w_win32,
space.wrap(name),
- space.wrap(getattr(cConfig, name)))
+ space.wrap(config[name]))
space.setattr(w_win32,
space.wrap('NULL'),
space.newint(0))
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py Tue Sep 21 00:10:19 2010
@@ -14,6 +14,8 @@
#'_rawffi', # on win32
))
if sys.platform == "win32":
+ # Stubs for the 'msvcrt' and '_subprocess' modules,
+ # just so that multiprocessing imports correctly.
space = cls.space
space.setitem(space.sys.get('modules'),
space.wrap('msvcrt'), space.sys)
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_win32.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_win32.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_win32.py Tue Sep 21 00:10:19 2010
@@ -12,6 +12,12 @@
from _multiprocessing import win32
raises(WindowsError, win32.CloseHandle, -1)
+ def test_CreateFile(self):
+ from _multiprocessing import win32
+ err = raises(WindowsError, win32.CreateFile,
+ "in/valid", 0, 0, 0, 0, 0, 0)
+ assert err.value.winerror == 87 # ERROR_INVALID_PARAMETER
+
def test_pipe(self):
from _multiprocessing import win32
import os
More information about the Pypy-commit mailing list