[pypy-commit] pypy stm-gc: Start to fix the 'transaction' module, mostly by killing code.
arigo
noreply at buildbot.pypy.org
Mon Apr 23 10:28:03 CEST 2012
Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54645:266863eb0af8
Date: 2012-04-23 10:15 +0200
http://bitbucket.org/pypy/pypy/changeset/266863eb0af8/
Log: Start to fix the 'transaction' module, mostly by killing code.
diff --git a/pypy/module/transaction/__init__.py b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -9,8 +9,8 @@
'set_num_threads': 'interp_transaction.set_num_threads',
'add': 'interp_transaction.add',
'run': 'interp_transaction.run',
- 'add_epoll': 'interp_epoll.add_epoll', # xxx linux only
- 'remove_epoll': 'interp_epoll.remove_epoll', # xxx linux only
+ #'add_epoll': 'interp_epoll.add_epoll', # xxx linux only
+ #'remove_epoll': 'interp_epoll.remove_epoll', # xxx linux only
'local': 'interp_local.W_Local',
}
@@ -22,9 +22,9 @@
"NOT_RPYTHON: patches space.threadlocals to use real threadlocals"
from pypy.module.transaction import interp_transaction
MixedModule.__init__(self, space, *args)
- interp_transaction.state.initialize(space)
- space.threadlocals = interp_transaction.state
+ space.threadlocals = interp_transaction.getstate(space)
def startup(self, space):
from pypy.module.transaction import interp_transaction
- interp_transaction.state.startup(space, space.wrap(self))
+ state = interp_transaction.getstate(space)
+ state.startup(space.wrap(self))
diff --git a/pypy/module/transaction/fifo.py b/pypy/module/transaction/fifo.py
deleted file mode 100644
--- a/pypy/module/transaction/fifo.py
+++ /dev/null
@@ -1,39 +0,0 @@
-
-class Fifo(object):
- def __init__(self):
- self.first = None
- self.last = None
-
- clear = __init__
-
- def append(self, newitem):
- newitem.next = None
- if self.last is None:
- self.first = newitem
- else:
- self.last.next = newitem
- self.last = newitem
-
- def is_empty(self):
- assert (self.first is None) == (self.last is None)
- return self.first is None
-
- def is_of_length_1(self):
- return self.first is not None and self.first is self.last
-
- def popleft(self):
- item = self.first
- self.first = item.next
- if self.first is None:
- self.last = None
- return item
-
- def steal(self, otherfifo):
- if otherfifo.last is not None:
- if self.last is None:
- self.first = otherfifo.first
- else:
- self.last.next = otherfifo.first
- self.last = otherfifo.last
- otherfifo.first = None
- otherfifo.last = None
diff --git a/pypy/module/transaction/interp_local.py b/pypy/module/transaction/interp_local.py
--- a/pypy/module/transaction/interp_local.py
+++ b/pypy/module/transaction/interp_local.py
@@ -1,7 +1,7 @@
from pypy.interpreter.baseobjspace import Wrappable
from pypy.interpreter.typedef import (TypeDef, interp2app, GetSetProperty,
descr_get_dict)
-from pypy.module.transaction.interp_transaction import state
+#from pypy.module.transaction.interp_transaction import state
class W_Local(Wrappable):
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -1,72 +1,36 @@
from pypy.interpreter.error import OperationError
from pypy.interpreter.gateway import unwrap_spec
-from pypy.module.transaction import threadintf
-from pypy.module.transaction.fifo import Fifo
-from pypy.rlib import rstm, rgc
-from pypy.rlib.debug import ll_assert
-
-
-NUM_THREADS_DEFAULT = 4 # by default
-
-MAIN_THREAD_ID = 0
+from pypy.interpreter.executioncontext import ExecutionContext
+from pypy.rlib import rstm
class State(object):
- # Warning: this is the class of a singleton. It Must Not Be Used
- # Inside Any Transaction!!!! This code is written with the
- # expectation that it is really a global singleton used for
- # synchronization between transactions. So if the STM logic kicks
- # in and makes local copies of it, we loose.
- #
- # Instead, we have a TransactionalState instance that can be
- # written to by transactions (possibly causing conflicts).
+ """The shared, global state. Warning, writes to it cause conflicts.
+ XXX fix me to somehow avoid conflicts at the beginning due to setvalue()
+ """
- def initialize(self, space):
+ def __init__(self, space):
self.space = space
+ self.num_threads = rstm.NUM_THREADS_DEFAULT
self.running = False
- self.num_threads = NUM_THREADS_DEFAULT
- self.transactionalstate = None
- #
self.w_error = None
self.threadobjs = {} # empty during translation
self.threadnums = {} # empty during translation
self.epolls = {}
- self.pending = Fifo()
- self._freeze_()
+ self.pending_before_run = []
- def _freeze_(self):
- self.ll_lock = threadintf.null_ll_lock
- self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
- self.ll_unfinished_lock = threadintf.null_ll_lock
- self.ll_not_ready_to_start_lock = threadintf.null_ll_lock
- self.threadobjs.clear()
- self.threadnums.clear()
- self.epolls.clear()
- self.pending.clear()
- return False
-
- def startup(self, space, w_module):
- assert space is self.space
+ def startup(self, w_module):
if w_module is not None: # for tests
+ space = self.space
self.w_error = space.getattr(w_module,
space.wrap('TransactionError'))
- self.ll_lock = threadintf.allocate_lock()
- self.ll_no_tasks_pending_lock = threadintf.allocate_lock()
- self.ll_unfinished_lock = threadintf.allocate_lock()
- self.lock_unfinished()
- self.ll_not_ready_to_start_lock = threadintf.allocate_lock()
- self.startup_run()
-
- def startup_run(self):
- # this is called at the start of run() too, in order to make
- # test_checkmodule happy
main_ec = self.space.getexecutioncontext() # create it if needed
- main_ec._transaction_pending = self.pending
+ main_ec._transaction_pending = self.pending_before_run
def add_thread(self, id, ec):
# register a new transaction thread
assert id not in self.threadobjs
- ec._transaction_pending = Fifo()
+ ec._transaction_pending = []
self.threadobjs[id] = ec
self.threadnums[id] = len(self.threadnums)
@@ -84,9 +48,13 @@
def setvalue(self, value):
id = rstm.thread_id()
- assert id == MAIN_THREAD_ID # should not be used from a transaction
- self.threadobjs[id] = value
- self.threadnums = {id: 0}
+ if id == rstm.MAIN_THREAD_ID:
+ assert len(self.threadobjs) == 0
+ assert len(self.threadnums) == 0
+ self.threadobjs[id] = value
+ self.threadnums[id] = 0
+ else:
+ self.add_thread(id, value)
def getmainthreadvalue(self):
return self.threadobjs.get(MAIN_THREAD_ID, None)
@@ -98,7 +66,9 @@
for id in self.threadobjs.keys():
if id != MAIN_THREAD_ID:
del self.threadobjs[id]
- self.threadnums = {MAIN_THREAD_ID: 0}
+ for id in self.threadnums.keys():
+ if id != MAIN_THREAD_ID:
+ del self.threadnums[id]
self.epolls.clear()
def get_thread_number(self):
@@ -108,262 +78,89 @@
def get_total_number_of_threads(self):
return 1 + self.num_threads
- # ----------
-
def set_num_threads(self, num):
if self.running:
space = self.space
- raise OperationError(space.w_ValueError,
+ raise OperationError(self.w_error,
space.wrap("cannot change the number of "
"threads when transaction.run() "
"is active"))
self.num_threads = num
- def lock(self):
- """Acquire the main lock. This plays a role similar to the GIL
- in that it must be acquired in order to have the _run_thread()
- code execute; but it is released around every execution of a
- transaction."""
- threadintf.acquire(self.ll_lock, True)
- def unlock(self):
- """Release the main lock."""
- threadintf.release(self.ll_lock)
-
- def lock_no_tasks_pending(self):
- """This lock is acquired when state.pending.is_empty()."""
- threadintf.acquire(self.ll_no_tasks_pending_lock, True)
-
- def unlock_no_tasks_pending(self):
- """Release the ll_no_tasks_pending_lock."""
- threadintf.release(self.ll_no_tasks_pending_lock)
-
- def is_locked_no_tasks_pending(self):
- """Test ll_no_tasks_pending_lock for debugging."""
- just_locked = threadintf.acquire(self.ll_no_tasks_pending_lock, False)
- if just_locked:
- threadintf.release(self.ll_no_tasks_pending_lock)
- return not just_locked
-
- def lock_unfinished(self):
- """This lock is normally acquired. It is released when all threads
- are done."""
- threadintf.acquire(self.ll_unfinished_lock, True)
-
- def unlock_unfinished(self):
- """Release ll_unfinished_lock."""
- threadintf.release(self.ll_unfinished_lock)
-
- def lock_not_ready_to_start(self):
- """This lock is acquired when the threads are still starting.
- It is released when all threads are past their initialization."""
- threadintf.acquire(self.ll_not_ready_to_start_lock, True)
-
- def unlock_not_ready_to_start(self):
- """Release ll_not_ready_to_start_lock."""
- threadintf.release(self.ll_not_ready_to_start_lock)
-
-
-class TransactionalState(object):
-
- def __init__(self):
- self._reraise_exception = None
-
- def has_exception(self):
- return self._reraise_exception is not None
-
- def must_reraise_exception(self, reraise_callback):
- self._reraise_exception = reraise_callback
-
- def close_exceptions(self):
- if self._reraise_exception is not None:
- self._reraise_exception(self)
-
-
-state = State()
+def getstate(space):
+ return space.fromcache(State)
@unwrap_spec(num=int)
def set_num_threads(space, num):
if num < 1:
num = 1
- state.set_num_threads(num)
+ getstate(space).set_num_threads(num)
-class AbstractPending(object):
+class SpaceTransaction(rstm.Transaction):
+
+ def __init__(self, space, w_callback, args):
+ self.space = space
+ self.state = getstate(space)
+ self.w_callback = w_callback
+ self.args = args
def register(self):
- """Register this AbstractPending instance in the pending list
+ """Register this SpaceTransaction instance in the pending list
belonging to the current thread. If called from the main
thread, it is the global list. If called from a transaction,
it is a thread-local list that will be merged with the global
- list when the transaction is done."""
- ec = state.getvalue()
+ list when the transaction is done.
+ NOTE: never register() the same instance multiple times.
+ """
+ ec = self.state.getvalue()
ec._transaction_pending.append(self)
def run(self):
- # may also be overridden
- rstm.perform_transaction(AbstractPending._run_in_transaction,
- AbstractPending, self)
+ if self.retry_counter > 0:
+ self.register() # retrying: will be done later, try others first
+ return
+ #
+ ec = self.space.getexecutioncontext() # create it if needed
+ assert len(ec._transaction_pending) == 0
+ #
+ self.space.call_args(self.w_callback, self.args)
+ #
+ result = ec._transaction_pending
+ ec._transaction_pending = []
+ return result
- @staticmethod
- def _run_in_transaction(pending, retry_counter):
- if retry_counter > 0:
- pending.register() # retrying: will be done later, try others first
- return
- ts = state.transactionalstate
- if ts.has_exception():
- return # return early if there is already an exception to reraise
- try:
- pending.run_in_transaction(state.space)
- except Exception, e:
- ts = state.transactionalstate
- ts.got_exception_applevel = e
- ts.must_reraise_exception(_reraise_from_applevel)
+class InitialTransaction(rstm.Transaction):
-class Pending(AbstractPending):
- def __init__(self, w_callback, args):
- self.w_callback = w_callback
- self.args = args
+ def __init__(self, state):
+ self.state = state
- def run_in_transaction(self, space):
- space.call_args(self.w_callback, self.args)
-
-
-def _reraise_from_applevel(transactionalstate):
- e = transactionalstate.got_exception_applevel
- transactionalstate.got_exception_applevel = None
- raise e
+ def run(self):
+ # initially: return the list of already-added transactions as
+ # the list of transactions to run next, and clear it
+ result = self.state.pending_before_run[:]
+ del self.state.pending_before_run[:]
+ return result
def add(space, w_callback, __args__):
- Pending(w_callback, __args__).register()
-
-
-def _add_list(new_pending_list):
- if new_pending_list.is_empty():
- return
- was_empty = state.pending.is_empty()
- state.pending.steal(new_pending_list)
- if was_empty:
- state.unlock_no_tasks_pending()
-
-
-def _setup_thread(_, retry_counter):
- """Setup a thread. Run as a transaction because it allocates."""
- assert state.running
- my_thread_id = rstm.thread_id()
- my_ec = state.space.createexecutioncontext()
- state.add_thread(my_thread_id, my_ec)
-
-
-def _run_thread():
- """The main function running one of the threads."""
- # Note that we cannot allocate any object here outside a transaction,
- # so we need to be very careful.
- rstm.descriptor_init()
- state.lock()
- rstm.perform_transaction(_setup_thread, AbstractPending, None)
- # wait until all threads reach this point to continue
- if state.num_waiting_threads + 1 < state.num_threads:
- state.num_waiting_threads += 1
- state.unlock()
- state.lock_not_ready_to_start()
- state.lock()
- else:
- state.num_waiting_threads = 0
- state.unlock_not_ready_to_start()
- #
- my_transactions_pending = state.getvalue()._transaction_pending
- #
- while True:
- if state.pending.is_empty():
- ll_assert(state.is_locked_no_tasks_pending(),
- "inconsistently unlocked no_tasks_pending")
- state.num_waiting_threads += 1
- if state.num_waiting_threads == state.num_threads:
- state.finished = True
- state.unlock_no_tasks_pending()
- state.unlock()
- #
- state.lock_no_tasks_pending()
- state.unlock_no_tasks_pending()
- #
- state.lock()
- state.num_waiting_threads -= 1
- if state.finished:
- break
- else:
- pending = state.pending.popleft()
- if state.pending.is_empty():
- state.lock_no_tasks_pending()
- state.unlock()
- #
- while True:
- pending.run()
- # for now, always break out of this loop, unless
- # 'my_transactions_pending' contains precisely one item
- if not my_transactions_pending.is_of_length_1():
- break
- pending = my_transactions_pending.popleft()
- #
- state.lock()
- _add_list(my_transactions_pending)
- #
- if state.num_waiting_threads == 0: # only the last thread to leave
- state.unlock_unfinished()
- state.unlock()
- rstm.descriptor_done()
-
-
- at rgc.no_collect
-def _run():
- # --- start the threads --- don't use the GC here any more! ---
- #
- for i in range(state.num_threads):
- threadintf.start_new_thread(_run_thread)
- #
- state.lock_unfinished() # wait for all threads to finish
- #
- # --- done, we can use the GC again ---
+ transaction = SpaceTransaction(space, w_callback, __args__)
+ transaction.register()
def run(space):
+ state = getstate(space)
if state.running:
raise OperationError(
state.w_error,
space.wrap("recursive invocation of transaction.run()"))
- state.startup_run()
- assert rstm.thread_id() == MAIN_THREAD_ID
- assert not state.is_locked_no_tasks_pending()
- if state.pending.is_empty():
- return
- #
- # 'num_waiting_threads' is the number of threads that are currently
- # waiting for more work to do. When it becomes equal to
- # 'num_threads' then we are done: we set 'finished' to True and this
- # causes all threads to leave. Only accessed during a
- # 'state.lock'-protected region.
- state.num_waiting_threads = 0
- state.finished = False
- #
- state.transactionalstate = TransactionalState()
- state.lock_not_ready_to_start()
state.running = True
- #
- # start the threads and wait for all of them to finish
- rstm.enter_transactional_mode()
- _run()
- rstm.leave_transactional_mode()
- #
- assert state.num_waiting_threads == 0
- assert state.pending.is_empty()
- assert not state.is_locked_no_tasks_pending()
- state.clear_all_values_apart_from_main()
- state.running = False
- ts = state.transactionalstate
- state.transactionalstate = None
- #
- # now re-raise the exception that we got in a transaction
- ts.close_exceptions()
+ try:
+ rstm.run_all_transactions(InitialTransaction(state),
+ num_threads = state.num_threads)
+ finally:
+ state.running = False
+ assert len(state.pending_before_run) == 0
diff --git a/pypy/module/transaction/threadintf.py b/pypy/module/transaction/threadintf.py
deleted file mode 100644
--- a/pypy/module/transaction/threadintf.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import thread
-from pypy.module.thread import ll_thread
-from pypy.rlib.objectmodel import we_are_translated
-from pypy.rpython.annlowlevel import llhelper
-from pypy.rlib.debug import fatalerror
-
-
-null_ll_lock = ll_thread.null_ll_lock
-
-def allocate_lock():
- if we_are_translated():
- return ll_thread.allocate_ll_lock()
- else:
- return thread.allocate_lock()
-
-def acquire(lock, wait):
- if we_are_translated():
- return ll_thread.acquire_NOAUTO(lock, wait)
- else:
- return lock.acquire(wait)
-
-def release(lock):
- if we_are_translated():
- ll_thread.release_NOAUTO(lock)
- else:
- lock.release()
-
-def start_new_thread(callback):
- if we_are_translated():
- llcallback = llhelper(ll_thread.CALLBACK, callback)
- ident = ll_thread.c_thread_start_NOGIL(llcallback)
- if ident == -1:
- fatalerror("cannot start thread")
- else:
- thread.start_new_thread(callback, ())
More information about the pypy-commit
mailing list