[pypy-svn] r77789 - in pypy/branch/fast-forward/pypy: module/_multiprocessing module/_multiprocessing/test rlib
afa at codespeak.net
afa at codespeak.net
Mon Oct 11 15:12:31 CEST 2010
Author: afa
Date: Mon Oct 11 15:12:30 2010
New Revision: 77789
Added:
pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_semaphore.py (contents, props changed)
pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_semaphore.py (contents, props changed)
Modified:
pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
pypy/branch/fast-forward/pypy/rlib/rwin32.py
Log:
Partially implement _multiprocessing.SemLock.
Won't work on Unix for now.
Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py (original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py Mon Oct 11 15:12:30 2010
@@ -6,6 +6,7 @@
interpleveldefs = {
'Connection' : 'interp_connection.W_FileConnection',
'PipeConnection' : 'interp_connection.W_PipeConnection',
+ 'SemLock' : 'interp_semaphore.W_SemLock',
}
appleveldefs = {
@@ -13,3 +14,7 @@
if sys.platform == 'win32':
interpleveldefs['win32'] = 'interp_win32.win32_namespace(space)'
+
+ def startup(self, space):
+ from pypy.module._multiprocessing.interp_semaphore import CounterState
+ space.fromcache(CounterState).startup(space)
Added: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_semaphore.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_semaphore.py Mon Oct 11 15:12:30 2010
@@ -0,0 +1,357 @@
+from pypy.interpreter.baseobjspace import ObjSpace, Wrappable, W_Root
+from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.interpreter.error import (
+ wrap_windowserror, wrap_oserror, OperationError)
+from pypy.rpython.lltypesystem import rffi, lltype
+from pypy.rlib.rarithmetic import r_uint
+from pypy.translator.tool.cbuild import ExternalCompilationInfo
+from pypy.module.thread import ll_thread
+import sys, os, time
+
+RECURSIVE_MUTEX, SEMAPHORE = range(2)
+
+if sys.platform == 'win32':
+ from pypy.rlib import rwin32
+
+ _CreateSemaphore = rwin32.winexternal(
+ 'CreateSemaphoreA', [rffi.VOIDP, rffi.LONG, rffi.LONG, rwin32.LPCSTR],
+ rwin32.HANDLE)
+ _ReleaseSemaphore = rwin32.winexternal(
+ 'ReleaseSemaphore', [rwin32.HANDLE, rffi.LONG, rffi.LONGP],
+ rwin32.BOOL)
+ _GetTickCount = rwin32.winexternal(
+ 'GetTickCount', [], rwin32.DWORD)
+
+ CtrlHandler_type = lltype.Ptr(lltype.FuncType([], rwin32.BOOL))
+ _CreateEvent = rwin32.winexternal(
+ 'CreateEventA', [rffi.VOIDP, rwin32.BOOL, rwin32.BOOL, rwin32.LPCSTR],
+ rwin32.HANDLE)
+ _SetEvent = rwin32.winexternal(
+ 'SetEvent', [rwin32.HANDLE], rwin32.BOOL)
+ _ResetEvent = rwin32.winexternal(
+ 'ResetEvent', [rwin32.HANDLE], rwin32.BOOL)
+
+ # This is needed because the handler function must have the "WINAPI"
+ # callinf convention, which is not supported by lltype.Ptr.
+ eci = ExternalCompilationInfo(
+ separate_module_sources=['''
+ #include <windows.h>
+
+ static BOOL (*CtrlHandlerRoutine)(
+ DWORD dwCtrlType);
+
+ static BOOL WINAPI winapi_CtrlHandlerRoutine(
+ DWORD dwCtrlType)
+ {
+ return CtrlHandlerRoutine(dwCtrlType);
+ }
+
+ BOOL pypy_multiprocessing_setCtrlHandlerRoutine(BOOL (*f)(DWORD))
+ {
+ CtrlHandlerRoutine = f;
+ SetConsoleCtrlHandler(winapi_CtrlHandlerRoutine, TRUE);
+ }
+
+ '''],
+ export_symbols=['pypy_multiprocessing_setCtrlHandlerRoutine'],
+ )
+ _setCtrlHandlerRoutine = rffi.llexternal(
+ 'pypy_multiprocessing_setCtrlHandlerRoutine',
+ [CtrlHandler_type], rwin32.BOOL,
+ compilation_info=eci)
+
+ def ProcessingCtrlHandler():
+ _SetEvent(globalState.sigint_event)
+ return False
+
+ class GlobalState:
+ def __init__(self):
+ self.init()
+
+ def init(self):
+ self.sigint_event = rwin32.NULL_HANDLE
+
+ def startup(self, space):
+ # Initialize the event handle used to signal Ctrl-C
+ globalState.sigint_event = _CreateEvent(
+ rffi.NULL, True, False, rffi.NULL)
+ if globalState.sigint_event == rwin32.NULL_HANDLE:
+ raise wrap_windowserror(
+ space, rwin32.lastWindowsError("CreateEvent"))
+ if not _setCtrlHandlerRoutine(ProcessingCtrlHandler):
+ raise wrap_windowserror(
+ space, rwin32.lastWindowsError("SetConsoleCtrlHandler"))
+
+
+else:
+ class GlobalState:
+ def init(self):
+ pass
+
+ def startup(self, space):
+ pass
+
+globalState = GlobalState()
+
+class CounterState:
+ def __init__(self, space):
+ self.counter = 0
+
+ def _freeze_(self):
+ self.counter = 0
+ globalState.init()
+
+ def startup(self, space):
+ globalState.startup(space)
+
+ def getCount(self):
+ value = self.counter
+ self.counter += 1
+ return value
+
+if sys.platform == 'win32':
+ SEM_VALUE_MAX = sys.maxint
+ from pypy.module._multiprocessing.interp_win32 import w_handle
+
+ def create_semaphore(space, name, val, max):
+ rwin32.SetLastError(0)
+ handle = _CreateSemaphore(rffi.NULL, val, max, rffi.NULL)
+ # On Windows we should fail on ERROR_ALREADY_EXISTS
+ err = rwin32.GetLastError()
+ if err != 0:
+ raise wrap_windowserror(
+ space, WindowsError(err, "CreateSemaphore"))
+ return handle
+
+ def semlock_acquire(self, space, block, w_timeout):
+ if not block:
+ full_msecs = 0
+ elif space.is_w(w_timeout, space.w_None):
+ full_msecs = rwin32.INFINITE
+ else:
+ timeout = space.float_w(w_timeout)
+ timeout *= 1000.0
+ if timeout < 0.0:
+ timeout = 0.0
+ elif timeout >= 0.5 * rwin32.INFINITE: # 25 days
+ raise OperationError(space.w_OverflowError,
+ space.wrap("timeout is too large"))
+ full_msecs = int(timeout + 0.5)
+
+ # check whether we can acquire without blocking
+ try:
+ res = rwin32.WaitForSingleObject(self.handle, 0)
+ except WindowsError, e:
+ raise wrap_windowserror(space, e)
+
+ if res != rwin32.WAIT_TIMEOUT:
+ self.last_tid = ll_thread.get_ident()
+ self.count += 1
+ return True
+
+ msecs = r_uint(full_msecs)
+ start = _GetTickCount()
+
+ while True:
+ handles = [self.handle, globalState.sigint_event]
+
+ # do the wait
+ _ResetEvent(globalState.sigint_event)
+ try:
+ res = rwin32.WaitForMultipleObjects(handles, timeout=msecs)
+ except WindowsError, e:
+ raise wrap_windowserror(space, e)
+
+ if res != rwin32.WAIT_OBJECT_0 + 1:
+ break
+
+ # got SIGINT so give signal handler a chance to run
+ time.sleep(0.001)
+
+ # if this is main thread let KeyboardInterrupt be raised
+ # XXX PyErr_CheckSignals()
+
+ # recalculate timeout
+ if msecs != rwin32.INFINITE:
+ ticks = _GetTickCount()
+ if r_uint(ticks - start) >= full_msecs:
+ return False
+ msecs = r_uint(full_msecs - (ticks - start))
+
+ # handle result
+ if res != rwin32.WAIT_TIMEOUT:
+ self.last_tid = ll_thread.get_ident()
+ self.count += 1
+ return True
+ return False
+
+ def semlock_release(self, space):
+ if not _ReleaseSemaphore(self.handle, 1,
+ lltype.nullptr(rffi.LONGP.TO)):
+ err = rwin32.GetLastError()
+ if err == 0x0000012a: # ERROR_TOO_MANY_POSTS
+ raise OperationError(
+ space.w_ValueError,
+ space.wrap("semaphore or lock released too many times"))
+ else:
+ raise wrap_windowserror(
+ space, WindowsError(err, "ReleaseSemaphore"))
+
+else:
+ HAVE_BROKEN_SEM_GETVALUE = False
+
+ def create_semaphore(space, name, val, max):
+ sem_open(name, os.O_CREAT | os.O_EXCL, 0600, val)
+ sem_unlink(name)
+
+ def semlock_acquire(self, space, block, w_timeout):
+ if not block:
+ deadline = lltype.nullptr(TIMESPEC.TO)
+ elif space.is_w(w_timeout, space.w_None):
+ deadline = lltype.nullptr(TIMESPEC.TO)
+ else:
+ timeout = space.float_w(w_timeout)
+ sec = int(timeout)
+ nsec = int(1e9 * (timeout - sec) + 0.5)
+
+ deadline = lltype.malloc(TIMESPEC.TO, 1, flavor='raw')
+ deadline.c_tv_sec = now.c_tv_sec + sec
+ deadline.c_tv_nsec = now.c_tv_usec * 1000 + nsec
+ deadline.c_tv_sec += (deadline.c_tv_nsec / 1000000000)
+ deadline.c_tv_nsec %= 1000000000
+ try:
+ while True:
+ if not block:
+ res = sem_trywait(self.handle)
+ elif not deadline:
+ res = sem_wait(self.handle)
+ else:
+ res = sem_timedwait(self.handle, deadline)
+ if res >= 0:
+ break
+ elif errno != EINTR:
+ break
+ # elif PyErr_CheckSignals():
+ # raise...
+ finally:
+ if deadline:
+ lltype.free(deadline, flavor='raw')
+
+ if res < 0:
+ if errno == EAGAIN or errno == ETIMEDOUT:
+ return False
+ raise wrap_oserror(space, errno)
+ return True
+
+ def semlock_release(self, space):
+ if self.kind == RECURSIVE_MUTEX:
+ return
+ if HAVE_BROKEN_SEM_GETVALUE:
+ # We will only check properly the maxvalue == 1 case
+ if self.maxvalue == 1:
+ # make sure that already locked
+ if sem_trywait(self.handle) < 0:
+ if errno != EAGAIN:
+ raise
+ # it is already locked as expected
+ else:
+ # it was not locked so undo wait and raise
+ if sem_post(self.handle) < 0:
+ raise
+ raise OperationError(
+ space.w_ValueError, space.wrap(
+ "semaphore or lock released too many times"))
+ else:
+ # This check is not an absolute guarantee that the semaphore does
+ # not rise above maxvalue.
+ if sem_getvalue(self.handle, sval_ptr) < 0:
+ raise
+ if sval_ptr[0] >= self.maxvalue:
+ raise OperationError(
+ space.w_ValueError, space.wrap(
+ "semaphore or lock released too many times"))
+
+ if sem_post(self.handle) < 0:
+ raise
+
+
+class W_SemLock(Wrappable):
+ def __init__(self, handle, kind, maxvalue):
+ self.handle = handle
+ self.kind = kind
+ self.count = 0
+ self.maxvalue = maxvalue
+
+ def kind_get(space, self):
+ return space.newint(self.kind)
+ def maxvalue_get(space, self):
+ return space.newint(self.maxvalue)
+ def handle_get(space, self):
+ return w_handle(space, self.handle)
+
+ @unwrap_spec('self', ObjSpace)
+ def get_count(self, space):
+ return space.wrap(self.count)
+
+ def _ismine(self):
+ return self.count > 0 and ll_thread.get_ident() == self.last_tid
+
+ @unwrap_spec('self', ObjSpace)
+ def is_mine(self, space):
+ return space.wrap(self._ismine())
+
+ @unwrap_spec('self', ObjSpace, bool, W_Root)
+ def acquire(self, space, block=True, w_timeout=None):
+ # check whether we already own the lock
+ if self.kind == RECURSIVE_MUTEX and self._ismine():
+ self.count += 1
+ return space.w_True
+
+ res = semlock_acquire(self, space, block, w_timeout)
+ return space.wrap(res)
+
+ @unwrap_spec('self', ObjSpace)
+ def release(self, space):
+ if self.kind == RECURSIVE_MUTEX:
+ if not self._ismine():
+ raise OperationError(
+ space.w_AssertionError,
+ space.wrap("attempt to release recursive lock"
+ " not owned by thread"))
+ if self.count > 1:
+ self.count -= 1
+ return
+
+ semlock_release(self, space)
+
+ self.count -= 1
+
+ at unwrap_spec(ObjSpace, W_Root, int, int, int)
+def descr_new(space, w_subtype, kind, value, maxvalue):
+ if kind != RECURSIVE_MUTEX and kind != SEMAPHORE:
+ raise OperationError(space.w_ValueError,
+ space.wrap("unrecognized kind"))
+
+ counter = space.fromcache(CounterState).getCount()
+ name = "/mp%d-%d" % (os.getpid(), counter)
+
+ handle = create_semaphore(space, name, value, maxvalue)
+
+ self = space.allocate_instance(W_SemLock, w_subtype)
+ self.__init__(handle, kind, maxvalue)
+
+ return space.wrap(self)
+
+W_SemLock.typedef = TypeDef(
+ "SemLock",
+ __new__ = interp2app(descr_new),
+ kind = GetSetProperty(W_SemLock.kind_get),
+ maxvalue = GetSetProperty(W_SemLock.maxvalue_get),
+ handle = GetSetProperty(W_SemLock.handle_get),
+ _count = interp2app(W_SemLock.get_count),
+ _is_mine = interp2app(W_SemLock.is_mine),
+ acquire = interp2app(W_SemLock.acquire),
+ release = interp2app(W_SemLock.release),
+ SEM_VALUE_MAX=SEM_VALUE_MAX,
+ )
Added: pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_semaphore.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_semaphore.py Mon Oct 11 15:12:30 2010
@@ -0,0 +1,38 @@
+from pypy.conftest import gettestobjspace
+from pypy.module._multiprocessing.interp_semaphore import (
+ RECURSIVE_MUTEX, SEMAPHORE)
+
+class AppTestSemaphore:
+ def setup_class(cls):
+ space = gettestobjspace(usemodules=('_multiprocessing', 'thread'))
+ cls.space = space
+ cls.w_SEMAPHORE = space.wrap(SEMAPHORE)
+
+ def test_semaphore(self):
+ from _multiprocessing import SemLock
+ assert SemLock.SEM_VALUE_MAX > 10
+
+ kind = self.SEMAPHORE
+ value = 1
+ maxvalue = 1
+ sem = SemLock(kind, value, maxvalue)
+ assert sem.kind == kind
+ assert sem.maxvalue == maxvalue
+ assert isinstance(sem.handle, int)
+
+ assert sem._count() == 0
+ sem.acquire()
+ assert sem._is_mine()
+ assert sem._count() == 1
+ sem.release()
+ assert sem._count() == 0
+
+ def test_semaphore_wait(self):
+ from _multiprocessing import SemLock
+ kind = self.SEMAPHORE
+ value = 1
+ maxvalue = 1
+ sem = SemLock(kind, value, maxvalue)
+
+ assert sem.acquire()
+ assert not sem.acquire(timeout=0.1)
Modified: pypy/branch/fast-forward/pypy/rlib/rwin32.py
==============================================================================
--- pypy/branch/fast-forward/pypy/rlib/rwin32.py (original)
+++ pypy/branch/fast-forward/pypy/rlib/rwin32.py Mon Oct 11 15:12:30 2010
@@ -71,6 +71,7 @@
for name in """FORMAT_MESSAGE_ALLOCATE_BUFFER FORMAT_MESSAGE_FROM_SYSTEM
MAX_PATH
+ WAIT_OBJECT_0 WAIT_TIMEOUT INFINITE
""".split():
locals()[name] = rffi_platform.ConstantInteger(name)
@@ -241,3 +242,36 @@
info.c_szCSDVersion)))
finally:
lltype.free(info, flavor='raw')
+
+ _WaitForSingleObject = winexternal(
+ 'WaitForSingleObject', [HANDLE, DWORD], DWORD)
+
+ def WaitForSingleObject(handle, timeout):
+ """Return values:
+ - WAIT_OBJECT_0 when the object is signaled
+ - WAIT_TIMEOUT when the timeout elapsed"""
+ res = _WaitForSingleObject(handle, timeout)
+ if res == rffi.cast(DWORD, -1):
+ raise lastWindowsError("WaitForSingleObject")
+ return res
+
+ _WaitForMultipleObjects = winexternal(
+ 'WaitForMultipleObjects', [
+ DWORD, rffi.CArrayPtr(HANDLE), BOOL, DWORD], DWORD)
+
+ def WaitForMultipleObjects(handles, waitall=False, timeout=INFINITE):
+ """Return values:
+ - WAIT_OBJECT_0 + index when an object is signaled
+ - WAIT_TIMEOUT when the timeout elapsed"""
+ nb = len(handles)
+ handle_array = lltype.malloc(rffi.CArrayPtr(HANDLE).TO, nb,
+ flavor='raw')
+ try:
+ for i in range(nb):
+ handle_array[i] = handles[i]
+ res = _WaitForMultipleObjects(nb, handle_array, waitall, timeout)
+ if res == rffi.cast(DWORD, -1):
+ raise lastWindowsError("WaitForMultipleObjects")
+ return res
+ finally:
+ lltype.free(handle_array, flavor='raw')
More information about the Pypy-commit
mailing list