[pypy-commit] pypy stm: (fijal looking, arigo)
arigo
noreply at buildbot.pypy.org
Mon Jan 30 20:02:00 CET 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: stm
Changeset: r51964:23ce401e071d
Date: 2012-01-30 20:01 +0100
http://bitbucket.org/pypy/pypy/changeset/23ce401e071d/
Log: (fijal looking, arigo)
Start add_epoll() which gives an integration between epoll objects
and the transaction module. See test for a demo.
diff --git a/pypy/module/transaction/__init__.py b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -9,6 +9,7 @@
'set_num_threads': 'interp_transaction.set_num_threads',
'add': 'interp_transaction.add',
'run': 'interp_transaction.run',
+ 'add_epoll': 'interp_epoll.add_epoll', # xxx linux only
}
appleveldefs = {
diff --git a/pypy/module/transaction/interp_epoll.py b/pypy/module/transaction/interp_epoll.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/transaction/interp_epoll.py
@@ -0,0 +1,66 @@
+
+# Linux-only
+
+import os
+from select import EPOLLIN
+from pypy.rpython.lltypesystem import lltype, rffi
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.error import OperationError
+from pypy.module.select.interp_epoll import W_Epoll, FD_SETSIZE
+from pypy.module.select.interp_epoll import epoll_event
+from pypy.module.select.interp_epoll import epoll_wait
+from pypy.module.transaction import interp_transaction
+from pypy.rlib import rposix
+
+
+class EPollPending(interp_transaction.AbstractPending):
+ def __init__(self, space, epoller, w_callback):
+ self.space = space
+ self.epoller = epoller
+ self.w_callback = w_callback
+
+ def run(self):
+ # this code is run non-transactionally
+ state = interp_transaction.state
+ if state.has_exception():
+ return
+ maxevents = FD_SETSIZE - 1 # for now
+ timeout = 500 # for now: half a second
+ with lltype.scoped_alloc(rffi.CArray(epoll_event), maxevents) as evs:
+ nfds = epoll_wait(self.epoller.epfd, evs, maxevents, int(timeout))
+ if nfds < 0:
+ state.got_exception_errno = rposix.get_errno()
+ state.must_reraise_exception(_reraise_from_errno)
+ return
+ for i in range(nfds):
+ event = evs[i]
+ fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
+ PendingCallback(self.w_callback, fd, event.c_events).register()
+ # re-register myself to epoll_wait() for more
+ self.register()
+
+
+class PendingCallback(interp_transaction.AbstractPending):
+ def __init__(self, w_callback, fd, events):
+ self.w_callback = w_callback
+ self.fd = fd
+ self.events = events
+
+ def run_in_transaction(self, space):
+ space.call_function(self.w_callback, space.wrap(self.fd),
+ space.wrap(self.events))
+
+
+def _reraise_from_errno():
+ state = interp_transaction.state
+ space = state.space
+ errno = state.got_exception_errno
+ msg = os.strerror(errno)
+ w_type = space.w_IOError
+ w_error = space.call_function(w_type, space.wrap(errno), space.wrap(msg))
+ raise OperationError(w_type, w_error)
+
+
+ at unwrap_spec(epoller=W_Epoll)
+def add_epoll(space, epoller, w_callback):
+ EPollPending(space, epoller, w_callback).register()
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -114,6 +114,19 @@
def unlock_unfinished(self):
threadintf.release(self.ll_unfinished_lock)
+ def init_exceptions(self):
+ self._reraise_exception = None
+
+ def has_exception(self):
+ return self._reraise_exception is not None
+
+ def must_reraise_exception(self, reraise_callback):
+ self._reraise_exception = reraise_callback
+
+ def close_exceptions(self):
+ if self._reraise_exception is not None:
+ self._reraise_exception()
+
state = State()
@@ -125,32 +138,45 @@
state.set_num_threads(num)
-class Pending:
+class AbstractPending(object):
_alloc_nonmovable_ = True
- def __init__(self, w_callback, args):
- self.w_callback = w_callback
- self.args = args
-
def register(self):
ec = state.getvalue()
ec._transaction_pending.append(self)
def run(self):
- rstm.perform_transaction(Pending._run_in_transaction, Pending, self)
+ # may also be overridden
+ rstm.perform_transaction(AbstractPending._run_in_transaction,
+ AbstractPending, self)
@staticmethod
def _run_in_transaction(pending, retry_counter):
if retry_counter > 0:
pending.register() # retrying: will be done later, try others first
return
- if state.got_exception is not None:
- return # return early if there is already a 'got_exception'
+ if state.has_exception():
+ return # return early if there is already an exception to reraise
try:
- space = state.space
- space.call_args(pending.w_callback, pending.args)
+ pending.run_in_transaction(state.space)
except Exception, e:
- state.got_exception = e
+ state.got_exception_applevel = e
+ state.must_reraise_exception(_reraise_from_applevel)
+
+
+class Pending(AbstractPending):
+ def __init__(self, w_callback, args):
+ self.w_callback = w_callback
+ self.args = args
+
+ def run_in_transaction(self, space):
+ space.call_args(self.w_callback, self.args)
+
+
+def _reraise_from_applevel():
+ e = state.got_exception_applevel
+ state.got_exception_applevel = None
+ raise e
def add(space, w_callback, __args__):
@@ -217,7 +243,7 @@
state.num_waiting_threads = 0
state.finished = False
state.running = True
- state.got_exception = None
+ state.init_exceptions()
#
for i in range(state.num_threads):
threadintf.start_new_thread(_run_thread, ())
@@ -231,7 +257,4 @@
state.running = False
#
# now re-raise the exception that we got in a transaction
- if state.got_exception is not None:
- e = state.got_exception
- state.got_exception = None
- raise e
+ state.close_exceptions()
diff --git a/pypy/module/transaction/test/test_epoll.py b/pypy/module/transaction/test/test_epoll.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/transaction/test/test_epoll.py
@@ -0,0 +1,51 @@
+import py
+from pypy.conftest import gettestobjspace
+
+
+class AppTestEpoll:
+ def setup_class(cls):
+ cls.space = gettestobjspace(usemodules=['transaction', 'select'])
+
+ def test_non_transactional(self):
+ import select, posix as os
+ fd_read, fd_write = os.pipe()
+ epoller = select.epoll()
+ epoller.register(fd_read)
+ os.write(fd_write, 'x')
+ [(fd, events)] = epoller.poll()
+ assert fd == fd_read
+ assert events & select.EPOLLIN
+ got = os.read(fd_read, 1)
+ assert got == 'x'
+
+ def test_simple(self):
+ import transaction, select, posix as os
+
+ steps = []
+
+ fd_read, fd_write = os.pipe()
+
+ epoller = select.epoll()
+ epoller.register(fd_read)
+
+ def write_stuff():
+ os.write(fd_write, 'x')
+ steps.append('write_stuff')
+
+ class Done(Exception):
+ pass
+
+ def callback(fd, events):
+ assert fd == fd_read
+ assert events & select.EPOLLIN
+ got = os.read(fd_read, 1)
+ assert got == 'x'
+ steps.append('callback')
+ raise Done
+
+ transaction.add_epoll(epoller, callback)
+ transaction.add(write_stuff)
+
+ assert steps == []
+ raises(Done, transaction.run)
+ assert steps == ['write_stuff', 'callback']
More information about the pypy-commit
mailing list