[pypy-commit] pypy stm-gc: Start to fix the 'transaction' module, mostly by killing code.

arigo noreply at buildbot.pypy.org
Mon Apr 23 10:28:03 CEST 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54645:266863eb0af8
Date: 2012-04-23 10:15 +0200
http://bitbucket.org/pypy/pypy/changeset/266863eb0af8/

Log:	Start to fix the 'transaction' module, mostly by killing code.

diff --git a/pypy/module/transaction/__init__.py b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -9,8 +9,8 @@
         'set_num_threads': 'interp_transaction.set_num_threads',
         'add': 'interp_transaction.add',
         'run': 'interp_transaction.run',
-        'add_epoll': 'interp_epoll.add_epoll',        # xxx linux only
-        'remove_epoll': 'interp_epoll.remove_epoll',  # xxx linux only
+        #'add_epoll': 'interp_epoll.add_epoll',        # xxx linux only
+        #'remove_epoll': 'interp_epoll.remove_epoll',  # xxx linux only
         'local': 'interp_local.W_Local',
     }
 
@@ -22,9 +22,9 @@
         "NOT_RPYTHON: patches space.threadlocals to use real threadlocals"
         from pypy.module.transaction import interp_transaction
         MixedModule.__init__(self, space, *args)
-        interp_transaction.state.initialize(space)
-        space.threadlocals = interp_transaction.state
+        space.threadlocals = interp_transaction.getstate(space)
 
     def startup(self, space):
         from pypy.module.transaction import interp_transaction
-        interp_transaction.state.startup(space, space.wrap(self))
+        state = interp_transaction.getstate(space)
+        state.startup(space.wrap(self))
diff --git a/pypy/module/transaction/fifo.py b/pypy/module/transaction/fifo.py
deleted file mode 100644
--- a/pypy/module/transaction/fifo.py
+++ /dev/null
@@ -1,39 +0,0 @@
-
-class Fifo(object):
-    def __init__(self):
-        self.first = None
-        self.last = None
-
-    clear = __init__
-
-    def append(self, newitem):
-        newitem.next = None
-        if self.last is None:
-            self.first = newitem
-        else:
-            self.last.next = newitem
-        self.last = newitem
-
-    def is_empty(self):
-        assert (self.first is None) == (self.last is None)
-        return self.first is None
-
-    def is_of_length_1(self):
-        return self.first is not None and self.first is self.last
-
-    def popleft(self):
-        item = self.first
-        self.first = item.next
-        if self.first is None:
-            self.last = None
-        return item
-
-    def steal(self, otherfifo):
-        if otherfifo.last is not None:
-            if self.last is None:
-                self.first = otherfifo.first
-            else:
-                self.last.next = otherfifo.first
-            self.last = otherfifo.last
-            otherfifo.first = None
-            otherfifo.last = None
diff --git a/pypy/module/transaction/interp_local.py b/pypy/module/transaction/interp_local.py
--- a/pypy/module/transaction/interp_local.py
+++ b/pypy/module/transaction/interp_local.py
@@ -1,7 +1,7 @@
 from pypy.interpreter.baseobjspace import Wrappable
 from pypy.interpreter.typedef import (TypeDef, interp2app, GetSetProperty,
     descr_get_dict)
-from pypy.module.transaction.interp_transaction import state
+#from pypy.module.transaction.interp_transaction import state
 
 
 class W_Local(Wrappable):
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -1,72 +1,36 @@
 from pypy.interpreter.error import OperationError
 from pypy.interpreter.gateway import unwrap_spec
-from pypy.module.transaction import threadintf
-from pypy.module.transaction.fifo import Fifo
-from pypy.rlib import rstm, rgc
-from pypy.rlib.debug import ll_assert
-
-
-NUM_THREADS_DEFAULT = 4     # by default
-
-MAIN_THREAD_ID = 0
+from pypy.interpreter.executioncontext import ExecutionContext
+from pypy.rlib import rstm
 
 
 class State(object):
-    # Warning: this is the class of a singleton.  It Must Not Be Used
-    # Inside Any Transaction!!!!  This code is written with the
-    # expectation that it is really a global singleton used for
-    # synchronization between transactions.  So if the STM logic kicks
-    # in and makes local copies of it, we loose.
-    #
-    # Instead, we have a TransactionalState instance that can be
-    # written to by transactions (possibly causing conflicts).
+    """The shared, global state.  Warning, writes to it cause conflicts.
+    XXX fix me to somehow avoid conflicts at the beginning due to setvalue()
+    """
 
-    def initialize(self, space):
+    def __init__(self, space):
         self.space = space
+        self.num_threads = rstm.NUM_THREADS_DEFAULT
         self.running = False
-        self.num_threads = NUM_THREADS_DEFAULT
-        self.transactionalstate = None
-        #
         self.w_error = None
         self.threadobjs = {}      # empty during translation
         self.threadnums = {}      # empty during translation
         self.epolls = {}
-        self.pending = Fifo()
-        self._freeze_()
+        self.pending_before_run = []
 
-    def _freeze_(self):
-        self.ll_lock = threadintf.null_ll_lock
-        self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
-        self.ll_unfinished_lock = threadintf.null_ll_lock
-        self.ll_not_ready_to_start_lock = threadintf.null_ll_lock
-        self.threadobjs.clear()
-        self.threadnums.clear()
-        self.epolls.clear()
-        self.pending.clear()
-        return False
-
-    def startup(self, space, w_module):
-        assert space is self.space
+    def startup(self, w_module):
         if w_module is not None:     # for tests
+            space = self.space
             self.w_error = space.getattr(w_module,
                                          space.wrap('TransactionError'))
-        self.ll_lock = threadintf.allocate_lock()
-        self.ll_no_tasks_pending_lock = threadintf.allocate_lock()
-        self.ll_unfinished_lock = threadintf.allocate_lock()
-        self.lock_unfinished()
-        self.ll_not_ready_to_start_lock = threadintf.allocate_lock()
-        self.startup_run()
-
-    def startup_run(self):
-        # this is called at the start of run() too, in order to make
-        # test_checkmodule happy
         main_ec = self.space.getexecutioncontext()    # create it if needed
-        main_ec._transaction_pending = self.pending
+        main_ec._transaction_pending = self.pending_before_run
 
     def add_thread(self, id, ec):
         # register a new transaction thread
         assert id not in self.threadobjs
-        ec._transaction_pending = Fifo()
+        ec._transaction_pending = []
         self.threadobjs[id] = ec
         self.threadnums[id] = len(self.threadnums)
 
@@ -84,9 +48,13 @@
 
     def setvalue(self, value):
         id = rstm.thread_id()
-        assert id == MAIN_THREAD_ID   # should not be used from a transaction
-        self.threadobjs[id] = value
-        self.threadnums = {id: 0}
+        if id == rstm.MAIN_THREAD_ID:
+            assert len(self.threadobjs) == 0
+            assert len(self.threadnums) == 0
+            self.threadobjs[id] = value
+            self.threadnums[id] = 0
+        else:
+            self.add_thread(id, value)
 
     def getmainthreadvalue(self):
         return self.threadobjs.get(MAIN_THREAD_ID, None)
@@ -98,7 +66,9 @@
         for id in self.threadobjs.keys():
             if id != MAIN_THREAD_ID:
                 del self.threadobjs[id]
-        self.threadnums = {MAIN_THREAD_ID: 0}
+        for id in self.threadnums.keys():
+            if id != MAIN_THREAD_ID:
+                del self.threadnums[id]
         self.epolls.clear()
 
     def get_thread_number(self):
@@ -108,262 +78,89 @@
     def get_total_number_of_threads(self):
         return 1 + self.num_threads
 
-    # ----------
-
     def set_num_threads(self, num):
         if self.running:
             space = self.space
-            raise OperationError(space.w_ValueError,
+            raise OperationError(self.w_error,
                                  space.wrap("cannot change the number of "
                                             "threads when transaction.run() "
                                             "is active"))
         self.num_threads = num
 
-    def lock(self):
-        """Acquire the main lock.  This plays a role similar to the GIL
-        in that it must be acquired in order to have the _run_thread()
-        code execute; but it is released around every execution of a
-        transaction."""
-        threadintf.acquire(self.ll_lock, True)
 
-    def unlock(self):
-        """Release the main lock."""
-        threadintf.release(self.ll_lock)
-
-    def lock_no_tasks_pending(self):
-        """This lock is acquired when state.pending.is_empty()."""
-        threadintf.acquire(self.ll_no_tasks_pending_lock, True)
-
-    def unlock_no_tasks_pending(self):
-        """Release the ll_no_tasks_pending_lock."""
-        threadintf.release(self.ll_no_tasks_pending_lock)
-
-    def is_locked_no_tasks_pending(self):
-        """Test ll_no_tasks_pending_lock for debugging."""
-        just_locked = threadintf.acquire(self.ll_no_tasks_pending_lock, False)
-        if just_locked:
-            threadintf.release(self.ll_no_tasks_pending_lock)
-        return not just_locked
-
-    def lock_unfinished(self):
-        """This lock is normally acquired.  It is released when all threads
-        are done."""
-        threadintf.acquire(self.ll_unfinished_lock, True)
-
-    def unlock_unfinished(self):
-        """Release ll_unfinished_lock."""
-        threadintf.release(self.ll_unfinished_lock)
-
-    def lock_not_ready_to_start(self):
-        """This lock is acquired when the threads are still starting.
-        It is released when all threads are past their initialization."""
-        threadintf.acquire(self.ll_not_ready_to_start_lock, True)
-
-    def unlock_not_ready_to_start(self):
-        """Release ll_not_ready_to_start_lock."""
-        threadintf.release(self.ll_not_ready_to_start_lock)
-
-
-class TransactionalState(object):
-
-    def __init__(self):
-        self._reraise_exception = None
-
-    def has_exception(self):
-        return self._reraise_exception is not None
-
-    def must_reraise_exception(self, reraise_callback):
-        self._reraise_exception = reraise_callback
-
-    def close_exceptions(self):
-        if self._reraise_exception is not None:
-            self._reraise_exception(self)
-
-
-state = State()
+def getstate(space):
+    return space.fromcache(State)
 
 
 @unwrap_spec(num=int)
 def set_num_threads(space, num):
     if num < 1:
         num = 1
-    state.set_num_threads(num)
+    getstate(space).set_num_threads(num)
 
 
-class AbstractPending(object):
+class SpaceTransaction(rstm.Transaction):
+
+    def __init__(self, space, w_callback, args):
+        self.space = space
+        self.state = getstate(space)
+        self.w_callback = w_callback
+        self.args = args
 
     def register(self):
-        """Register this AbstractPending instance in the pending list
+        """Register this SpaceTransaction instance in the pending list
         belonging to the current thread.  If called from the main
         thread, it is the global list.  If called from a transaction,
         it is a thread-local list that will be merged with the global
-        list when the transaction is done."""
-        ec = state.getvalue()
+        list when the transaction is done.
+        NOTE: never register() the same instance multiple times.
+        """
+        ec = self.state.getvalue()
         ec._transaction_pending.append(self)
 
     def run(self):
-        # may also be overridden
-        rstm.perform_transaction(AbstractPending._run_in_transaction,
-                                 AbstractPending, self)
+        if self.retry_counter > 0:
+            self.register() # retrying: will be done later, try others first
+            return
+        #
+        ec = self.space.getexecutioncontext()    # create it if needed
+        assert len(ec._transaction_pending) == 0
+        #
+        self.space.call_args(self.w_callback, self.args)
+        #
+        result = ec._transaction_pending
+        ec._transaction_pending = []
+        return result
 
-    @staticmethod
-    def _run_in_transaction(pending, retry_counter):
-        if retry_counter > 0:
-            pending.register() # retrying: will be done later, try others first
-            return
-        ts = state.transactionalstate
-        if ts.has_exception():
-            return   # return early if there is already an exception to reraise
-        try:
-            pending.run_in_transaction(state.space)
-        except Exception, e:
-            ts = state.transactionalstate
-            ts.got_exception_applevel = e
-            ts.must_reraise_exception(_reraise_from_applevel)
 
+class InitialTransaction(rstm.Transaction):
 
-class Pending(AbstractPending):
-    def __init__(self, w_callback, args):
-        self.w_callback = w_callback
-        self.args = args
+    def __init__(self, state):
+        self.state = state
 
-    def run_in_transaction(self, space):
-        space.call_args(self.w_callback, self.args)
-
-
-def _reraise_from_applevel(transactionalstate):
-    e = transactionalstate.got_exception_applevel
-    transactionalstate.got_exception_applevel = None
-    raise e
+    def run(self):
+        # initially: return the list of already-added transactions as
+        # the list of transactions to run next, and clear it
+        result = self.state.pending_before_run[:]
+        del self.state.pending_before_run[:]
+        return result
 
 
 def add(space, w_callback, __args__):
-    Pending(w_callback, __args__).register()
-
-
-def _add_list(new_pending_list):
-    if new_pending_list.is_empty():
-        return
-    was_empty = state.pending.is_empty()
-    state.pending.steal(new_pending_list)
-    if was_empty:
-        state.unlock_no_tasks_pending()
-
-
-def _setup_thread(_, retry_counter):
-    """Setup a thread.  Run as a transaction because it allocates."""
-    assert state.running
-    my_thread_id = rstm.thread_id()
-    my_ec = state.space.createexecutioncontext()
-    state.add_thread(my_thread_id, my_ec)
-
-
-def _run_thread():
-    """The main function running one of the threads."""
-    # Note that we cannot allocate any object here outside a transaction,
-    # so we need to be very careful.
-    rstm.descriptor_init()
-    state.lock()
-    rstm.perform_transaction(_setup_thread, AbstractPending, None)
-    # wait until all threads reach this point to continue
-    if state.num_waiting_threads + 1 < state.num_threads:
-        state.num_waiting_threads += 1
-        state.unlock()
-        state.lock_not_ready_to_start()
-        state.lock()
-    else:
-        state.num_waiting_threads = 0
-    state.unlock_not_ready_to_start()
-    #
-    my_transactions_pending = state.getvalue()._transaction_pending
-    #
-    while True:
-        if state.pending.is_empty():
-            ll_assert(state.is_locked_no_tasks_pending(),
-                      "inconsistently unlocked no_tasks_pending")
-            state.num_waiting_threads += 1
-            if state.num_waiting_threads == state.num_threads:
-                state.finished = True
-                state.unlock_no_tasks_pending()
-            state.unlock()
-            #
-            state.lock_no_tasks_pending()
-            state.unlock_no_tasks_pending()
-            #
-            state.lock()
-            state.num_waiting_threads -= 1
-            if state.finished:
-                break
-        else:
-            pending = state.pending.popleft()
-            if state.pending.is_empty():
-                state.lock_no_tasks_pending()
-            state.unlock()
-            #
-            while True:
-                pending.run()
-                # for now, always break out of this loop, unless
-                # 'my_transactions_pending' contains precisely one item
-                if not my_transactions_pending.is_of_length_1():
-                    break
-                pending = my_transactions_pending.popleft()
-            #
-            state.lock()
-            _add_list(my_transactions_pending)
-    #
-    if state.num_waiting_threads == 0:    # only the last thread to leave
-        state.unlock_unfinished()
-    state.unlock()
-    rstm.descriptor_done()
-
-
-@rgc.no_collect
-def _run():
-    # --- start the threads --- don't use the GC here any more! ---
-    #
-    for i in range(state.num_threads):
-        threadintf.start_new_thread(_run_thread)
-    #
-    state.lock_unfinished()  # wait for all threads to finish
-    #
-    # --- done, we can use the GC again ---
+    transaction = SpaceTransaction(space, w_callback, __args__)
+    transaction.register()
 
 
 def run(space):
+    state = getstate(space)
     if state.running:
         raise OperationError(
             state.w_error,
             space.wrap("recursive invocation of transaction.run()"))
-    state.startup_run()
-    assert rstm.thread_id() == MAIN_THREAD_ID
-    assert not state.is_locked_no_tasks_pending()
-    if state.pending.is_empty():
-        return
-    #
-    # 'num_waiting_threads' is the number of threads that are currently
-    # waiting for more work to do.  When it becomes equal to
-    # 'num_threads' then we are done: we set 'finished' to True and this
-    # causes all threads to leave.  Only accessed during a
-    # 'state.lock'-protected region.
-    state.num_waiting_threads = 0
-    state.finished = False
-    #
-    state.transactionalstate = TransactionalState()
-    state.lock_not_ready_to_start()
     state.running = True
-    #
-    # start the threads and wait for all of them to finish
-    rstm.enter_transactional_mode()
-    _run()
-    rstm.leave_transactional_mode()
-    #
-    assert state.num_waiting_threads == 0
-    assert state.pending.is_empty()
-    assert not state.is_locked_no_tasks_pending()
-    state.clear_all_values_apart_from_main()
-    state.running = False
-    ts = state.transactionalstate
-    state.transactionalstate = None
-    #
-    # now re-raise the exception that we got in a transaction
-    ts.close_exceptions()
+    try:
+        rstm.run_all_transactions(InitialTransaction(state),
+                                  num_threads = state.num_threads)
+    finally:
+        state.running = False
+        assert len(state.pending_before_run) == 0
diff --git a/pypy/module/transaction/threadintf.py b/pypy/module/transaction/threadintf.py
deleted file mode 100644
--- a/pypy/module/transaction/threadintf.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import thread
-from pypy.module.thread import ll_thread
-from pypy.rlib.objectmodel import we_are_translated
-from pypy.rpython.annlowlevel import llhelper
-from pypy.rlib.debug import fatalerror
-
-
-null_ll_lock = ll_thread.null_ll_lock
-
-def allocate_lock():
-    if we_are_translated():
-        return ll_thread.allocate_ll_lock()
-    else:
-        return thread.allocate_lock()
-
-def acquire(lock, wait):
-    if we_are_translated():
-        return ll_thread.acquire_NOAUTO(lock, wait)
-    else:
-        return lock.acquire(wait)
-
-def release(lock):
-    if we_are_translated():
-        ll_thread.release_NOAUTO(lock)
-    else:
-        lock.release()
-
-def start_new_thread(callback):
-    if we_are_translated():
-        llcallback = llhelper(ll_thread.CALLBACK, callback)
-        ident = ll_thread.c_thread_start_NOGIL(llcallback)
-        if ident == -1:
-            fatalerror("cannot start thread")
-    else:
-        thread.start_new_thread(callback, ())


More information about the pypy-commit mailing list