[pypy-commit] pypy stm: (bivab, arigo)

arigo noreply at buildbot.pypy.org
Thu Jan 19 18:00:17 CET 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm
Changeset: r51492:bc50696a4294
Date: 2012-01-19 17:59 +0100
http://bitbucket.org/pypy/pypy/changeset/bc50696a4294/

Log:	(bivab, arigo)

	- really create a low-level transaction for running each transaction
	- changed the addition of new transactions to go into a per-thread
	list until the transaction really commits
	- enable some more tests

diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -1,6 +1,7 @@
 from pypy.interpreter.error import OperationError
 from pypy.interpreter.gateway import unwrap_spec
 from pypy.module.transaction import threadintf
+from pypy.rlib import rstm
 
 
 NUM_THREADS_DEFAULT = 4     # by default
@@ -21,8 +22,9 @@
         self.ll_unfinished_lock = threadintf.allocate_lock()
         self.w_error = space.new_exception_class(
             "transaction.TransactionError")
-        self.lock_no_tasks_pending()
         self.lock_unfinished()
+        self.main_thread_id = threadintf.thread_id()
+        self.pending_lists = {self.main_thread_id: self.pending}
 
     def set_num_threads(self, num):
         if self.running:
@@ -46,9 +48,11 @@
     def unlock_no_tasks_pending(self):
         threadintf.release(self.ll_no_tasks_pending_lock)
 
-    def assert_locked_no_tasks_pending(self):
+    def is_locked_no_tasks_pending(self):
         just_locked = threadintf.acquire(self.ll_no_tasks_pending_lock, False)
-        assert not just_locked
+        if just_locked:
+            threadintf.release(self.ll_no_tasks_pending_lock)
+        return not just_locked
 
     def lock_unfinished(self):
         threadintf.acquire(self.ll_unfinished_lock, True)
@@ -69,31 +73,47 @@
 
 
 class Pending:
+    _alloc_nonmovable_ = True
+
     def __init__(self, w_callback, args):
         self.w_callback = w_callback
         self.args = args
 
     def run(self):
+        rstm.perform_transaction(Pending._run_in_transaction, Pending, self)
+
+    @staticmethod
+    def _run_in_transaction(pending):
         space = state.space
-        space.call_args(self.w_callback, self.args)
+        space.call_args(pending.w_callback, pending.args)
         # xxx exceptions?
 
 
 def add(space, w_callback, __args__):
-    state.lock()
+    id = threadintf.thread_id()
+    state.pending_lists[id].append(Pending(w_callback, __args__))
+
+
+def add_list(new_pending_list):
+    if len(new_pending_list) == 0:
+        return
     was_empty = len(state.pending) == 0
-    state.pending.append(Pending(w_callback, __args__))
+    state.pending += new_pending_list
+    del new_pending_list[:]
     if was_empty:
         state.unlock_no_tasks_pending()
-    state.unlock()
 
 
 def _run_thread():
     state.lock()
+    my_pending_list = []
+    my_thread_id = threadintf.thread_id()
+    state.pending_lists[my_thread_id] = my_pending_list
+    rstm.descriptor_init()
     #
     while True:
         if len(state.pending) == 0:
-            state.assert_locked_no_tasks_pending()
+            assert state.is_locked_no_tasks_pending()
             state.num_waiting_threads += 1
             if state.num_waiting_threads == state.num_threads:
                 state.finished = True
@@ -114,7 +134,10 @@
             state.unlock()
             pending.run()
             state.lock()
+            add_list(my_pending_list)
     #
+    rstm.descriptor_done()
+    del state.pending_lists[my_thread_id]
     if state.num_waiting_threads == 0:    # only the last thread to leave
         state.unlock_unfinished()
     state.unlock()
@@ -125,6 +148,9 @@
         raise OperationError(
             state.w_error,
             space.wrap("recursive invocation of transaction.run()"))
+    assert not state.is_locked_no_tasks_pending()
+    if len(state.pending) == 0:
+        return
     state.num_waiting_threads = 0
     state.finished = False
     state.running = True
@@ -132,8 +158,10 @@
     for i in range(state.num_threads):
         threadintf.start_new_thread(_run_thread, ())
     #
-    state.lock_unfinished()
+    state.lock_unfinished()  # wait for all threads to finish
+    #
     assert state.num_waiting_threads == 0
     assert len(state.pending) == 0
-    state.lock_no_tasks_pending()
+    assert state.pending_lists.keys() == [state.main_thread_id]
+    assert not state.is_locked_no_tasks_pending()
     state.running = False
diff --git a/pypy/module/transaction/test/test_transaction.py b/pypy/module/transaction/test/test_transaction.py
--- a/pypy/module/transaction/test/test_transaction.py
+++ b/pypy/module/transaction/test/test_transaction.py
@@ -4,7 +4,6 @@
 
 class AppTestTransaction: 
     def setup_class(cls):
-        py.test.skip("XXX not transactional!")
         cls.space = gettestobjspace(usemodules=['transaction'])
 
     def test_simple(self):
@@ -34,7 +33,7 @@
         assert len(lst) == 7 * 3
         seen = set()
         for start in range(0, 21, 7):
-            seen.append(lst[start])
+            seen.add(lst[start])
             for index in range(7):
                 assert lst[start + index] == lst[start] + index
         assert seen == set([10, 20, 30])
diff --git a/pypy/module/transaction/threadintf.py b/pypy/module/transaction/threadintf.py
--- a/pypy/module/transaction/threadintf.py
+++ b/pypy/module/transaction/threadintf.py
@@ -7,7 +7,7 @@
 
 def acquire(lock, wait):
     "NOT_RPYTHON"
-    lock.acquire(wait)
+    return lock.acquire(wait)
 
 def release(lock):
     "NOT_RPYTHON"
@@ -17,3 +17,6 @@
     "NOT_RPYTHON"
     thread.start_new_thread(callback, args)
 
+def thread_id():
+    "NOT_RPYTHON"
+    return thread.get_ident()


More information about the pypy-commit mailing list