[pypy-commit] pypy stm-gc: Start to refactor the RPython interface to be higher-level. This should

arigo noreply at buildbot.pypy.org
Sun Apr 22 12:17:24 CEST 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54613:9290ce3f4d40
Date: 2012-04-22 12:16 +0200
http://bitbucket.org/pypy/pypy/changeset/9290ce3f4d40/

Log:	Start to refactor the RPython interface to be higher-level. This
	should make it easier to call C code directly from the higher-level
	code, without going through lower-level interfaces. RPython no
	longer needs to access the ll_thread module, with its locks and so
	on; all of that should be done in C.

	Untested code so far (it doesn't run at all yet because some levels
	are still missing, but I'd like to add proper testing anyway).

diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -1,79 +1,147 @@
-import threading
-from pypy.rlib.objectmodel import specialize, we_are_translated
+from pypy.rpython.lltypesystem import lltype, llmemory, rffi
+from pypy.rpython.lltypesystem.lloperation import llop
+from pypy.rpython.annlowlevel import llhelper, cast_instance_to_base_ptr
+from pypy.rpython.annlowlevel import base_ptr_lltype, cast_base_ptr_to_instance
 from pypy.rlib.objectmodel import keepalive_until_here
 from pypy.rlib.debug import ll_assert
-from pypy.rpython.lltypesystem import lltype, llmemory, rffi, rclass
-from pypy.rpython.lltypesystem.lloperation import llop
-from pypy.rpython.annlowlevel import (cast_base_ptr_to_instance,
-                                      cast_instance_to_base_ptr,
-                                      llhelper)
 from pypy.translator.stm.stmgcintf import StmOperations
 
-_global_lock = threading.RLock()
 
-@specialize.memo()
-def _get_stm_callback(func, argcls):
-    def _stm_callback(llarg, retry_counter):
-        llop.stm_start_transaction(lltype.Void)
-        if we_are_translated():
-            llarg = rffi.cast(rclass.OBJECTPTR, llarg)
-            arg = cast_base_ptr_to_instance(argcls, llarg)
+
+NUM_THREADS_DEFAULT = 4     # XXX for now
+
+
+class TransactionError(Exception):
+    pass
+
+class Transaction(object):
+    _next_transaction = None
+    retry_counter = 0
+
+    def run(self):
+        raise NotImplementedError
+
+
+def run_all_transactions(initial_transaction,
+                         num_threads = NUM_THREADS_DEFAULT):
+    if StmOperations.in_transaction():
+        raise TransactionError("nested call to rstm.run_all_transactions()")
+    #
+    _transactionalstate.initialize()
+    #
+    # Tell the GC we are entering transactional mode.  This makes
+    # sure that 'initial_transaction' is flagged as GLOBAL.
+    # No more GC operation afterwards!
+    llop.stm_enter_transactional_mode(lltype.Void)
+    #
+    # Keep alive 'initial_transaction'.  In truth we would like it to
+    # survive a little bit longer, for the beginning of the C code in
+    # run_all_transactions().  This should be equivalent because there
+    # is no possibility of having a GC collection in between.
+    keepalive_until_here(initial_transaction)
+    #
+    # Tell the C code to run all transactions.
+    callback = llhelper(_CALLBACK, _run_transaction)
+    ptr = _cast_transaction_to_voidp(initial_transaction)
+    StmOperations.run_all_transactions(callback, ptr, num_threads)
+    #
+    # Tell the GC we are leaving transactional mode.
+    llop.stm_leave_transactional_mode(lltype.Void)
+    #
+    # If an exception was raised, re-raise it here.
+    _transactionalstate.close_exceptions()
+
+
+_CALLBACK = lltype.Ptr(lltype.FuncType([rffi.VOIDP, lltype.Signed],
+                                       rffi.VOIDP))
+
+def _cast_transaction_to_voidp(transaction):
+    ptr = cast_instance_to_base_ptr(transaction)
+    return rffi.cast(rffi.VOIDP, ptr)    # GC ptr -> raw ptr: not cast_pointer
+
+def _cast_voidp_to_transaction(transactionptr):
+    ptr = rffi.cast(base_ptr_lltype(), transactionptr)    # raw -> GC ptr
+    return cast_base_ptr_to_instance(Transaction, ptr)
+
+
+class _TransactionalState(object):
+    def initialize(self):
+        self._reraise_exception = None
+
+    def has_exception(self):
+        return self._reraise_exception is not None
+
+    def must_reraise_exception(self, got_exception):
+        self._got_exception = got_exception
+        self._reraise_exception = self.reraise_exception_callback
+
+    def close_exceptions(self):
+        if self._reraise_exception is not None:
+            self._reraise_exception()
+
+    @staticmethod
+    def reraise_exception_callback():
+        exc = _transactionalstate._got_exception
+        _transactionalstate._got_exception = None   # staticmethod: no 'self'
+        raise exc
+
+_transactionalstate = _TransactionalState()
+
+
+def _run_transaction(transactionptr, retry_counter):
+    #
+    # Tell the GC we are starting a transaction
+    llop.stm_start_transaction(lltype.Void)
+    #
+    # Now we can use the GC
+    next = None
+    try:
+        if _transactionalstate.has_exception():
+            # a previously committed transaction raised: don't do anything
+            # more in this transaction
+            pass
         else:
-            arg = lltype.TLS.stm_callback_arg
-        try:
-            res = func(arg, retry_counter)
-            ll_assert(res is None, "stm_callback should return None")
-        finally:
-            llop.stm_commit_transaction(lltype.Void)
-        return lltype.nullptr(rffi.VOIDP.TO)
-    return _stm_callback
+            # run!
+            next = _run_really(transactionptr, retry_counter)
+        #
+    except Exception, e:
+        _transactionalstate.must_reraise_exception(e)
+    #
+    # Stop using the GC.  This will make 'next' and all transactions linked
+    # from there GLOBAL objects.
+    llop.stm_stop_transaction(lltype.Void)
+    #
+    # Mark 'next' as kept-alive-until-here.  In truth we would like to
+    # keep it alive after the return, for the C code.  This should be
+    # equivalent because there is no possibility of having a GC collection
+    # in between.
+    keepalive_until_here(next)
+    return _cast_transaction_to_voidp(next)
 
-@specialize.arg(0, 1)
-def perform_transaction(func, argcls, arg):
-    ll_assert(arg is None or isinstance(arg, argcls),
-              "perform_transaction: wrong class")
-    if we_are_translated():
-        llarg = cast_instance_to_base_ptr(arg)
-        llarg = rffi.cast(rffi.VOIDP, llarg)
-        adr_of_top = llop.gc_adr_of_root_stack_top(llmemory.Address)
-    else:
-        # only for tests: we want (1) to test the calls to the C library,
-        # but also (2) to work with multiple Python threads, so we acquire
-        # and release some custom GIL here --- even though it doesn't make
-        # sense from an STM point of view :-/
-        _global_lock.acquire()
-        lltype.TLS.stm_callback_arg = arg
-        llarg = lltype.nullptr(rffi.VOIDP.TO)
-        adr_of_top = llmemory.NULL
-    #
-    callback = _get_stm_callback(func, argcls)
-    llcallback = llhelper(StmOperations.CALLBACK_TX, callback)
-    StmOperations.perform_transaction(llcallback, llarg, adr_of_top)
-    keepalive_until_here(arg)
-    if not we_are_translated():
-        _global_lock.release()
 
-def enter_transactional_mode():
-    llop.stm_enter_transactional_mode(lltype.Void)
+def _run_really(transactionptr, retry_counter):
+    # Call the RPython method run() on the Transaction instance.
+    # This logic is in a sub-function because we want to catch
+    # the MemoryErrors that could occur.
+    transaction = _cast_voidp_to_transaction(transactionptr)
+    ll_assert(transaction._next_transaction is None,
+              "_next_transaction should be cleared by C code")
+    transaction.retry_counter = retry_counter
+    new_transactions = transaction.run()
+    return _link_new_transactions(new_transactions)
+_run_really._dont_inline_ = True
 
-def leave_transactional_mode():
-    llop.stm_leave_transactional_mode(lltype.Void)
-
-def descriptor_init():
-    if not we_are_translated(): _global_lock.acquire()
-    llop.stm_descriptor_init(lltype.Void)
-    if not we_are_translated(): _global_lock.release()
-
-def descriptor_done():
-    if not we_are_translated(): _global_lock.acquire()
-    llop.stm_descriptor_done(lltype.Void)
-    if not we_are_translated(): _global_lock.release()
-
-def _debug_get_state():
-    if not we_are_translated(): _global_lock.acquire()
-    res = StmOperations._debug_get_state()
-    if not we_are_translated(): _global_lock.release()
-    return res
-
-def thread_id():
-    return StmOperations.thread_id()
+def _link_new_transactions(new_transactions):
+    # in order to schedule the new transactions, we have to return a
+    # raw pointer to the first one, with their field '_next_transaction'
+    # making a linked list.  The C code reads directly from this
+    # field '_next_transaction'.
+    if new_transactions is None:
+        return None
+    n = len(new_transactions) - 1
+    next = None
+    while n >= 0:
+        new_transactions[n]._next_transaction = next
+        next = new_transactions[n]
+        n -= 1
+    return next
diff --git a/pypy/translator/stm/test/targetdemo.py b/pypy/translator/stm/test/targetdemo.py
--- a/pypy/translator/stm/test/targetdemo.py
+++ b/pypy/translator/stm/test/targetdemo.py
@@ -1,8 +1,6 @@
-from pypy.rpython.lltypesystem import lltype, rffi
-from pypy.module.thread import ll_thread
-from pypy.rlib import rstm, rgc
-from pypy.rlib.debug import debug_print
-from pypy.rpython.annlowlevel import llhelper
+from pypy.rpython.lltypesystem import rffi
+from pypy.rlib import rstm
+from pypy.rlib.debug import debug_print, ll_assert
 
 
 class Node:
@@ -21,10 +19,7 @@
     pass
 
 
-def add_at_end_of_chained_list(arg, retry_counter):
-    assert arg.foobar == 42
-    node = arg.anchor
-    value = arg.value
+def add_at_end_of_chained_list(node, value):
     x = Node(value)
     while node.next:
         node = node.next
@@ -59,65 +54,40 @@
     print "check ok!"
 
 
-def increment_done(arg, retry_counter):
-    print "thread done."
-    glob.done += 1
-
 def _check_pointer(arg1):
     arg1.foobar = 40    # now 'arg1' is local
     return arg1
 
-def check_pointer_equality(arg, retry_counter):
-    res = _check_pointer(arg)
-    if res is not arg:
-        debug_print("ERROR: bogus pointer equality")
-        raise AssertionError
-    raw1 = rffi.cast(rffi.CCHARP, retry_counter)
-    raw2 = rffi.cast(rffi.CCHARP, -1)
-    if raw1 == raw2:
-        debug_print("ERROR: retry_counter == -1")
-        raise AssertionError
+class CheckPointerEquality(rstm.Transaction):
+    def __init__(self, arg):
+        self.arg = arg
+    def run(self):
+        res = _check_pointer(self.arg)    # 'self.arg' reads a GLOBAL object
+        ll_assert(res is self.arg, "ERROR: bogus pointer equality")
+        raw1 = rffi.cast(rffi.CCHARP, self.retry_counter)
+        raw2 = rffi.cast(rffi.CCHARP, -1)
+        ll_assert(raw1 != raw2, "ERROR: retry_counter == -1")   # fail if -1
 
-def run_me():
-    rstm.descriptor_init()
-    try:
-        debug_print("thread starting...")
-        arg = glob._arg
-        ll_thread.release_NOAUTO(glob.lock)
-        arg.foobar = 41
-        rstm.perform_transaction(check_pointer_equality, Arg, arg)
-        i = 0
-        while i < glob.LENGTH:
-            arg.anchor = glob.anchor
-            arg.value = i
-            arg.foobar = 42
-            rstm.perform_transaction(add_at_end_of_chained_list, Arg, arg)
-            i += 1
-        rstm.perform_transaction(increment_done, Arg, arg)
-    finally:
-        rstm.descriptor_done()
+class MakeChain(rstm.Transaction):
+    def __init__(self, anchor, value):
+        self.anchor = anchor
+        self.value = value
+    def run(self):
+        add_at_end_of_chained_list(self.anchor, self.value)
+        self.value += 1
+        if self.value < glob.LENGTH:
+            return [self]       # re-schedule the same Transaction object
 
-
-@rgc.no_collect     # don't use the gc as long as other threads are running
-def _run():
-    i = 0
-    while i < glob.NUM_THREADS:
-        glob._arg = glob._arglist[i]
-        ll_run_me = llhelper(ll_thread.CALLBACK, run_me)
-        ll_thread.c_thread_start_NOGIL(ll_run_me)
-        ll_thread.acquire_NOAUTO(glob.lock, True)
-        i += 1
-    debug_print("sleeping...")
-    while glob.done < glob.NUM_THREADS:    # poor man's lock
-        _sleep(rffi.cast(rffi.ULONG, 1))
-    debug_print("done sleeping.")
-
-
-# Posix only
-_sleep = rffi.llexternal('sleep', [rffi.ULONG], rffi.ULONG,
-                         _nowrapper=True,
-                         random_effects_on_gcobjs=False)
-
+class InitialTransaction(rstm.Transaction):
+    def run(self):
+        ll_assert(self.retry_counter == 0, "no reason to abort-and-retry here")
+        scheduled = []
+        for i in range(glob.NUM_THREADS):
+            arg = Arg()
+            arg.foobar = 41
+            scheduled.append(CheckPointerEquality(arg))
+            scheduled.append(MakeChain(glob.anchor, 0))
+        return scheduled
 
 # __________  Entry point  __________
 
@@ -129,14 +99,9 @@
             glob.LENGTH = int(argv[2])
             if len(argv) > 3:
                 glob.USE_MEMORY = bool(int(argv[3]))
-    glob.done = 0
-    glob.lock = ll_thread.allocate_ll_lock()
-    ll_thread.acquire_NOAUTO(glob.lock, True)
-    glob._arglist = [Arg() for i in range(glob.NUM_THREADS)]
     #
-    rstm.enter_transactional_mode()
-    _run()
-    rstm.leave_transactional_mode()
+    rstm.run_all_transactions(InitialTransaction(),
+                              num_threads=glob.NUM_THREADS)
     #
     check_chained_list(glob.anchor.next)
     return 0
@@ -145,3 +110,7 @@
 
 def target(*args):
     return entry_point, None
+
+if __name__ == '__main__':
+    import sys
+    entry_point(sys.argv)


More information about the pypy-commit mailing list