[pypy-commit] pypy stm: (fijal looking, arigo)

arigo noreply at buildbot.pypy.org
Mon Jan 30 20:02:00 CET 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm
Changeset: r51964:23ce401e071d
Date: 2012-01-30 20:01 +0100
http://bitbucket.org/pypy/pypy/changeset/23ce401e071d/

Log:	(fijal looking, arigo)

	Start add_epoll() which gives an integration between epoll objects
	and the transaction module. See test for a demo.

diff --git a/pypy/module/transaction/__init__.py b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -9,6 +9,7 @@
         'set_num_threads': 'interp_transaction.set_num_threads',
         'add': 'interp_transaction.add',
         'run': 'interp_transaction.run',
+        'add_epoll': 'interp_epoll.add_epoll',   # xxx linux only
     }
 
     appleveldefs = {
diff --git a/pypy/module/transaction/interp_epoll.py b/pypy/module/transaction/interp_epoll.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/transaction/interp_epoll.py
@@ -0,0 +1,66 @@
+
+# Linux-only
+
+import os
+from select import EPOLLIN
+from pypy.rpython.lltypesystem import lltype, rffi
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.error import OperationError
+from pypy.module.select.interp_epoll import W_Epoll, FD_SETSIZE
+from pypy.module.select.interp_epoll import epoll_event
+from pypy.module.select.interp_epoll import epoll_wait
+from pypy.module.transaction import interp_transaction
+from pypy.rlib import rposix
+
+
+class EPollPending(interp_transaction.AbstractPending):
+    def __init__(self, space, epoller, w_callback):
+        self.space = space
+        self.epoller = epoller
+        self.w_callback = w_callback
+
+    def run(self):
+        # this code is run non-transactionally
+        state = interp_transaction.state
+        if state.has_exception():
+            return
+        maxevents = FD_SETSIZE - 1    # for now
+        timeout = 500                 # for now: half a second
+        with lltype.scoped_alloc(rffi.CArray(epoll_event), maxevents) as evs:
+            nfds = epoll_wait(self.epoller.epfd, evs, maxevents, int(timeout))
+            if nfds < 0:
+                state.got_exception_errno = rposix.get_errno()
+                state.must_reraise_exception(_reraise_from_errno)
+                return
+            for i in range(nfds):
+                event = evs[i]
+                fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
+                PendingCallback(self.w_callback, fd, event.c_events).register()
+        # re-register myself to epoll_wait() for more
+        self.register()
+
+
+class PendingCallback(interp_transaction.AbstractPending):
+    def __init__(self, w_callback, fd, events):
+        self.w_callback = w_callback
+        self.fd = fd
+        self.events = events
+
+    def run_in_transaction(self, space):
+        space.call_function(self.w_callback, space.wrap(self.fd),
+                                             space.wrap(self.events))
+
+
+def _reraise_from_errno():
+    state = interp_transaction.state
+    space = state.space
+    errno = state.got_exception_errno
+    msg = os.strerror(errno)
+    w_type = space.w_IOError
+    w_error = space.call_function(w_type, space.wrap(errno), space.wrap(msg))
+    raise OperationError(w_type, w_error)
+
+
+ at unwrap_spec(epoller=W_Epoll)
+def add_epoll(space, epoller, w_callback):
+    EPollPending(space, epoller, w_callback).register()
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -114,6 +114,19 @@
     def unlock_unfinished(self):
         threadintf.release(self.ll_unfinished_lock)
 
+    def init_exceptions(self):
+        self._reraise_exception = None
+
+    def has_exception(self):
+        return self._reraise_exception is not None
+
+    def must_reraise_exception(self, reraise_callback):
+        self._reraise_exception = reraise_callback
+
+    def close_exceptions(self):
+        if self._reraise_exception is not None:
+            self._reraise_exception()
+
 
 state = State()
 
@@ -125,32 +138,45 @@
     state.set_num_threads(num)
 
 
-class Pending:
+class AbstractPending(object):
     _alloc_nonmovable_ = True
 
-    def __init__(self, w_callback, args):
-        self.w_callback = w_callback
-        self.args = args
-
     def register(self):
         ec = state.getvalue()
         ec._transaction_pending.append(self)
 
     def run(self):
-        rstm.perform_transaction(Pending._run_in_transaction, Pending, self)
+        # may also be overridden
+        rstm.perform_transaction(AbstractPending._run_in_transaction,
+                                 AbstractPending, self)
 
     @staticmethod
     def _run_in_transaction(pending, retry_counter):
         if retry_counter > 0:
             pending.register() # retrying: will be done later, try others first
             return
-        if state.got_exception is not None:
-            return   # return early if there is already a 'got_exception'
+        if state.has_exception():
+            return   # return early if there is already an exception to reraise
         try:
-            space = state.space
-            space.call_args(pending.w_callback, pending.args)
+            pending.run_in_transaction(state.space)
         except Exception, e:
-            state.got_exception = e
+            state.got_exception_applevel = e
+            state.must_reraise_exception(_reraise_from_applevel)
+
+
+class Pending(AbstractPending):
+    def __init__(self, w_callback, args):
+        self.w_callback = w_callback
+        self.args = args
+
+    def run_in_transaction(self, space):
+        space.call_args(self.w_callback, self.args)
+
+
+def _reraise_from_applevel():
+    e = state.got_exception_applevel
+    state.got_exception_applevel = None
+    raise e
 
 
 def add(space, w_callback, __args__):
@@ -217,7 +243,7 @@
     state.num_waiting_threads = 0
     state.finished = False
     state.running = True
-    state.got_exception = None
+    state.init_exceptions()
     #
     for i in range(state.num_threads):
         threadintf.start_new_thread(_run_thread, ())
@@ -231,7 +257,4 @@
     state.running = False
     #
     # now re-raise the exception that we got in a transaction
-    if state.got_exception is not None:
-        e = state.got_exception
-        state.got_exception = None
-        raise e
+    state.close_exceptions()
diff --git a/pypy/module/transaction/test/test_epoll.py b/pypy/module/transaction/test/test_epoll.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/transaction/test/test_epoll.py
@@ -0,0 +1,51 @@
+import py
+from pypy.conftest import gettestobjspace
+
+
+class AppTestEpoll: 
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=['transaction', 'select'])
+
+    def test_non_transactional(self):
+        import select, posix as os
+        fd_read, fd_write = os.pipe()
+        epoller = select.epoll()
+        epoller.register(fd_read)
+        os.write(fd_write, 'x')
+        [(fd, events)] = epoller.poll()
+        assert fd == fd_read
+        assert events & select.EPOLLIN
+        got = os.read(fd_read, 1)
+        assert got == 'x'
+
+    def test_simple(self):
+        import transaction, select, posix as os
+
+        steps = []
+
+        fd_read, fd_write = os.pipe()
+
+        epoller = select.epoll()
+        epoller.register(fd_read)
+
+        def write_stuff():
+            os.write(fd_write, 'x')
+            steps.append('write_stuff')
+
+        class Done(Exception):
+            pass
+
+        def callback(fd, events):
+            assert fd == fd_read
+            assert events & select.EPOLLIN
+            got = os.read(fd_read, 1)
+            assert got == 'x'
+            steps.append('callback')
+            raise Done
+
+        transaction.add_epoll(epoller, callback)
+        transaction.add(write_stuff)
+
+        assert steps == []
+        raises(Done, transaction.run)
+        assert steps == ['write_stuff', 'callback']


More information about the pypy-commit mailing list