[pypy-svn] r46587 - in pypy/dist/pypy: interpreter module/_file module/_file/test module/thread rpython/lltypesystem

arigo at codespeak.net arigo at codespeak.net
Fri Sep 14 19:21:42 CEST 2007


Author: arigo
Date: Fri Sep 14 19:21:36 2007
New Revision: 46587

Modified:
   pypy/dist/pypy/interpreter/baseobjspace.py
   pypy/dist/pypy/module/_file/app_file.py
   pypy/dist/pypy/module/_file/interp_file.py
   pypy/dist/pypy/module/_file/test/test_file.py
   pypy/dist/pypy/module/thread/os_lock.py
   pypy/dist/pypy/rpython/lltypesystem/rffi.py
Log:
Proper locking for app-level files to protect the streamio
from corruption in multithreaded usage.

A pypy-c-thread now works correctly as long as we manually
patch the C sources to make pypy_g_ExcData a gcc thread-local...


Modified: pypy/dist/pypy/interpreter/baseobjspace.py
==============================================================================
--- pypy/dist/pypy/interpreter/baseobjspace.py	(original)
+++ pypy/dist/pypy/interpreter/baseobjspace.py	Fri Sep 14 19:21:36 2007
@@ -434,6 +434,19 @@
         from pypy.interpreter import pyframe
         return pyframe.PyFrame(self, code, w_globals, closure)
 
+    def allocate_lock(self):
+        """Return an interp-level Lock object if threads are enabled,
+        and a dummy object if they are not."""
+        if self.config.objspace.usemodules.thread:
+            from pypy.module.thread.ll_thread import allocate_lock, error
+            try:
+                return allocate_lock()
+            except error:
+                raise OperationError(self.w_RuntimeError,
+                                     self.wrap("out of resources"))
+        else:
+            return dummy_lock
+
     # Following is a friendly interface to common object space operations
     # that can be defined in term of more primitive ones.  Subclasses
     # may also override specific functions for performance.
@@ -859,6 +872,15 @@
         space.exec_(source.compile(), w_glob, w_glob)
         return space.getitem(w_glob, space.wrap('anonymous'))
 
+class DummyLock(object):
+    def acquire(self, flag):
+        return True
+    def release(self):
+        pass
+    def _freeze_(self):
+        return True
+dummy_lock = DummyLock()
+
 ## Table describing the regular part of the interface of object spaces,
 ## namely all methods which only take w_ arguments and return a w_ result
 ## (if any).  Note: keep in sync with pypy.objspace.flow.operation.Table.

Modified: pypy/dist/pypy/module/_file/app_file.py
==============================================================================
--- pypy/dist/pypy/module/_file/app_file.py	(original)
+++ pypy/dist/pypy/module/_file/app_file.py	Fri Sep 14 19:21:36 2007
@@ -102,12 +102,16 @@
             return self.stream.readall()
         else:
             result = []
-            while n > 0:
-                data = self.stream.read(n)
-                if not data:
-                    break
-                n -= len(data)
-                result.append(data)
+            self.stream.lock()
+            try:
+                while n > 0:
+                    data = self.stream.read(n)
+                    if not data:
+                        break
+                    n -= len(data)
+                    result.append(data)
+            finally:
+                self.stream.unlock()
             return ''.join(result)
 
     def readline(self, size=-1):
@@ -125,20 +129,24 @@
         else:
             # very inefficient unless there is a peek()
             result = []
-            while size > 0:
-                # "peeks" on the underlying stream to see how many characters
-                # we can safely read without reading past an end-of-line
-                peeked = self.stream.peek()
-                pn = peeked.find("\n", 0, size)
-                if pn < 0:
-                    pn = min(size-1, len(peeked))
-                c = self.stream.read(pn + 1)
-                if not c:
-                    break
-                result.append(c)
-                if c.endswith('\n'):
-                    break
-                size -= len(c)
+            self.stream.lock()
+            try:
+                while size > 0:
+                    # "peeks" on the underlying stream to see how many chars
+                    # we can safely read without reading past an end-of-line
+                    peeked = self.stream.peek()
+                    pn = peeked.find("\n", 0, size)
+                    if pn < 0:
+                        pn = min(size-1, len(peeked))
+                    c = self.stream.read(pn + 1)
+                    if not c:
+                        break
+                    result.append(c)
+                    if c.endswith('\n'):
+                        break
+                    size -= len(c)
+            finally:
+                self.stream.unlock()
             return ''.join(result)
 
     def readlines(self, size=-1):
@@ -151,17 +159,21 @@
             raise ValueError('I/O operation on closed file')
         if not isinstance(size, (int, long)):
             raise TypeError("an integer is required")
-        if size < 0:
-            return list(iter(self.stream.readline, ""))
-        else:
-            result = []
-            while size > 0:
-                line = self.stream.readline()
-                if not line:
-                    break
-                result.append(line)
-                size -= len(line)
-            return result
+        self.stream.lock()
+        try:
+            if size < 0:
+                result = list(iter(self.stream.readline, ""))
+            else:
+                result = []
+                while size > 0:
+                    line = self.stream.readline()
+                    if not line:
+                        break
+                    result.append(line)
+                    size -= len(line)
+        finally:
+            self.stream.unlock()
+        return result
 
     def write(self, data):
         """write(str) -> None.  Write string str to file.
@@ -226,9 +238,13 @@
 Size defaults to the current file position, as returned by tell()."""
         if self._closed:
             raise ValueError('I/O operation on closed file')
-        if size is None:
-            size = self.stream.tell()
-        self.stream.truncate(size)
+        self.stream.lock()
+        try:
+            if size is None:
+                size = self.stream.tell()
+            self.stream.truncate(size)
+        finally:
+            self.stream.unlock()
 
     def flush(self):
         """flush() -> None.  Flush the internal I/O buffer."""
@@ -243,10 +259,17 @@
 further I/O operations.  close() may be called more than once without
 error.  Some kinds of file objects (for example, opened by popen())
 may return an exit status upon closing."""
+        # use the stream lock to avoid double-closes or
+        # close-while-another-thread-uses-it.
         if not self._closed and hasattr(self, 'stream'):
-            self._closed = True
-            sys.pypy__exithandlers__.pop(self.stream, None)
-            self.stream.close()
+            self.stream.lock()
+            try:
+                if not self._closed:   # could have changed...
+                    self._closed = True
+                    sys.pypy__exithandlers__.pop(self.stream, None)
+                    self.stream.close()
+            finally:
+                self.stream.unlock()
 
     __del__ = close
 

Modified: pypy/dist/pypy/module/_file/interp_file.py
==============================================================================
--- pypy/dist/pypy/module/_file/interp_file.py	(original)
+++ pypy/dist/pypy/module/_file/interp_file.py	Fri Sep 14 19:21:36 2007
@@ -34,25 +34,83 @@
 
 
 class W_Stream(Wrappable):
+    slock = None
+    slockowner = None
+    # Locking issues:
+    # * Multiple threads can access the same W_Stream in
+    #   parallel, because many of the streamio calls eventually
+    #   release the GIL in some external function call.
+    # * Parallel accesses have bad (and crashing) effects on the
+    #   internal state of the buffering levels of the stream in
+    #   particular.
+    # * We can't easily have a lock on each W_Stream because we
+    #   can't translate prebuilt lock objects.
+    # We are still protected by the GIL, so the easiest is to create
+    # the lock on-demand.
+
     def __init__(self, space, stream):
+        self.space = space
         self.stream = stream
 
+    def try_acquire_lock(self):
+        # this function runs with the GIL acquired so there is no race
+        # condition in the creation of the lock
+        if self.slock is None:
+            self.slock = self.space.allocate_lock()
+        me = self.space.getexecutioncontext()   # used as thread ident
+        if self.slockowner is me:
+            return False    # already acquired by the current thread
+        self.slock.acquire(True)
+        self.slockowner = me
+        return True
+
+    def release_lock(self):
+        self.slockowner = None
+        self.slock.release()
+
+    def descr_lock(self):
+        if not self.try_acquire_lock():
+            raise OperationError(self.space.w_RuntimeError,
+                                 self.space.wrap("stream lock already held"))
+
+    def descr_unlock(self):
+        me = self.space.getexecutioncontext()   # used as thread ident
+        if self.slockowner is not me:
+            raise OperationError(self.space.w_RuntimeError,
+                                 self.space.wrap("stream lock is not held"))
+        self.release_lock()
+
+    def _freeze_(self):
+        # remove the lock object, which will be created again as needed at
+        # run-time.
+        self.slock = None
+        assert self.slockowner is None
+        return False
+
 for name, argtypes in streamio.STREAM_METHODS.iteritems():
     numargs = len(argtypes)
     args = ", ".join(["v%s" % i for i in range(numargs)])
     exec py.code.Source("""
     def %(name)s(self, space, %(args)s):
+        acquired = self.try_acquire_lock()
         try:
-            return space.wrap(self.stream.%(name)s(%(args)s))
-        except streamio.StreamError, e:
-            raise OperationError(space.w_ValueError,
-                                 space.wrap(e.message))
-        except OSError, e:
-            raise wrap_oserror_as_ioerror(space, e)
+            try:
+                result = self.stream.%(name)s(%(args)s)
+            except streamio.StreamError, e:
+                raise OperationError(space.w_ValueError,
+                                     space.wrap(e.message))
+            except OSError, e:
+                raise wrap_oserror_as_ioerror(space, e)
+        finally:
+            if acquired:
+                self.release_lock()
+        return space.wrap(result)
     %(name)s.unwrap_spec = [W_Stream, ObjSpace] + argtypes
     """ % locals()).compile() in globals()
 
 W_Stream.typedef = TypeDef("Stream",
+    lock   = interp2app(W_Stream.descr_lock),
+    unlock = interp2app(W_Stream.descr_unlock),
     **dict([(name, interp2app(globals()[name]))
                 for name, _ in streamio.STREAM_METHODS.iteritems()]))
 

Modified: pypy/dist/pypy/module/_file/test/test_file.py
==============================================================================
--- pypy/dist/pypy/module/_file/test/test_file.py	(original)
+++ pypy/dist/pypy/module/_file/test/test_file.py	Fri Sep 14 19:21:36 2007
@@ -11,10 +11,8 @@
     def test_simple(self):
         import _file
         f = _file.file(self.temppath, "w")
-        try:
-            f.write("foo")
-        finally:
-            f.close()
+        f.write("foo")
+        f.close()
         f = _file.file(self.temppath, "r")
         raises(TypeError, f.read, None)
         try:
@@ -117,6 +115,39 @@
         assert type(res) is str
         f.close()
 
+class AppTestConcurrency(object):
+    # these tests only really make sense on top of a translated pypy-c,
+    # because on top of py.py the inner calls to os.write() don't
+    # release our object space's GIL.
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=("_file", "thread"))
+        cls.w_temppath = cls.space.wrap(
+            str(py.test.ensuretemp("fileimpl").join("concurrency.txt")))
+
+    def test_concurrent_writes(self):
+        # check that f.write() is atomic
+        import thread, _file, time
+        f = _file.file(self.temppath, "w+b")
+        def writer(i):
+            for j in range(150):
+                f.write('%3d %3d\n' % (i, j))
+            locks[i].release()
+        locks = []
+        for i in range(10):
+            lock = thread.allocate_lock()
+            lock.acquire()
+            locks.append(lock)
+        for i in range(10):
+            thread.start_new_thread(writer, (i,))
+        # wait until all threads are done
+        for i in range(10):
+            locks[i].acquire()
+        f.seek(0)
+        lines = f.readlines()
+        lines.sort()
+        assert lines == ['%3d %3d\n' % (i, j) for i in range(10)
+                                              for j in range(150)]
+        f.close()
 
 def test_flush_at_exit():
     from pypy import conftest

Modified: pypy/dist/pypy/module/thread/os_lock.py
==============================================================================
--- pypy/dist/pypy/module/thread/os_lock.py	(original)
+++ pypy/dist/pypy/module/thread/os_lock.py	Fri Sep 14 19:21:36 2007
@@ -30,6 +30,7 @@
     "A wrappable box around an interp-level lock object."
 
     def __init__(self):
+        # XXX catch thread.error!
         self.lock = thread.allocate_lock()
 
     def descr_lock_acquire(self, space, waitflag=1):

Modified: pypy/dist/pypy/rpython/lltypesystem/rffi.py
==============================================================================
--- pypy/dist/pypy/rpython/lltypesystem/rffi.py	(original)
+++ pypy/dist/pypy/rpython/lltypesystem/rffi.py	Fri Sep 14 19:21:36 2007
@@ -86,9 +86,11 @@
             before = aroundstate.before
             after = aroundstate.after
             if before: before()
-        result = funcptr(*real_args)
-        if invoke_around_handlers:
-            if after: after()
+        try:
+            result = funcptr(*real_args)
+        finally:
+            if invoke_around_handlers:
+                if after: after()
         if stringpolicy == 'fullauto':
             for i, tp in unrolling_arg_tps:
                 if tp is CCHARP:



More information about the Pypy-commit mailing list