[pypy-commit] pypy stm-gc: Thread-local data in the transaction module. See docstring for

arigo noreply at buildbot.pypy.org
Fri Apr 20 10:43:09 CEST 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-gc
Changeset: r54562:5ccd3d3146e0
Date: 2012-04-20 10:38 +0200
http://bitbucket.org/pypy/pypy/changeset/5ccd3d3146e0/

Log:	Thread-local data in the transaction module. See docstring for two
	foreseen usage patterns.

diff --git a/pypy/module/transaction/__init__.py b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -11,6 +11,7 @@
         'run': 'interp_transaction.run',
         'add_epoll': 'interp_epoll.add_epoll',        # xxx linux only
         'remove_epoll': 'interp_epoll.remove_epoll',  # xxx linux only
+        'local': 'interp_local.W_Local',
     }
 
     appleveldefs = {
diff --git a/pypy/module/transaction/interp_local.py b/pypy/module/transaction/interp_local.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/transaction/interp_local.py
@@ -0,0 +1,53 @@
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.typedef import (TypeDef, interp2app, GetSetProperty,
+    descr_get_dict)
+from pypy.module.transaction.interp_transaction import state
+
+
+class W_Local(Wrappable):
+    """Thread-local data.  Behaves like a regular object, but its content
+    is not shared between multiple concurrently-running transactions.
+    It can be accessed without conflicts.
+
+    It can be used for purely transaction-local data.
+
+    It can also be used for long-living caches that store values that
+    are (1) not too costly to compute and (2) not too memory-hungry,
+    because they will end up being computed and stored once per actual
+    thread.
+    """
+
+    def __init__(self, space):
+        self.dicts = []
+        self._update_dicts(space)
+        # unless we call transaction.set_num_threads() afterwards, this
+        # 'local' object is now initialized with the correct number of
+        # dictionaries, to avoid conflicts later if _update_dicts() is
+        # called in a transaction.
+
+    def _update_dicts(self, space):
+        new = state.get_number_of_threads() - len(self.dicts)
+        if new <= 0:
+            return
+        # update the list without appending to it (to keep it non-resizable)
+        self.dicts = self.dicts + [space.newdict(instance=True)
+                                   for i in range(new)]
+
+    def getdict(self, space):
+        n = state.get_thread_number()
+        try:
+            return self.dicts[n]
+        except IndexError:
+            self._update_dicts(space)
+            assert n < len(self.dicts)
+            return self.dicts[n]
+
+def descr_local__new__(space, w_subtype):
+    local = W_Local(space)
+    return space.wrap(local)
+
+W_Local.typedef = TypeDef("transaction.local",
+            __new__ = interp2app(descr_local__new__),
+            __dict__ = GetSetProperty(descr_get_dict, cls=W_Local),
+            )
+W_Local.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -23,11 +23,13 @@
         self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
         self.ll_unfinished_lock = threadintf.null_ll_lock
         self.threadobjs = {}      # empty during translation
+        self.threadnums = {}      # empty during translation
         self.epolls = None
         self.pending = Fifo()
 
     def _freeze_(self):
         self.threadobjs.clear()
+        self.threadnums.clear()
         return False
 
     def startup(self, space, w_module):
@@ -52,6 +54,7 @@
         assert id not in self.threadobjs
         ec._transaction_pending = Fifo()
         self.threadobjs[id] = ec
+        self.threadnums[id] = len(self.threadnums)
 
     # ---------- interface for ThreadLocals ----------
     # This works really like a thread-local, which may have slightly
@@ -69,6 +72,7 @@
         id = rstm.thread_id()
         assert id == MAIN_THREAD_ID   # should not be used from a transaction
         self.threadobjs[id] = value
+        self.threadnums = {id: 0}
 
     def getmainthreadvalue(self):
         return self.threadobjs.get(MAIN_THREAD_ID, None)
@@ -80,6 +84,14 @@
         for id in self.threadobjs.keys():
             if id != MAIN_THREAD_ID:
                 del self.threadobjs[id]
+        self.threadnums = {MAIN_THREAD_ID: 0}
+
+    def get_thread_number(self):
+        id = rstm.thread_id()
+        return self.threadnums[id]
+
+    def get_number_of_threads(self):
+        return 1 + self.num_threads
 
     # ----------
 
diff --git a/pypy/module/transaction/test/test_local.py b/pypy/module/transaction/test/test_local.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/transaction/test/test_local.py
@@ -0,0 +1,75 @@
+import py
+from pypy.conftest import gettestobjspace
+
+
+class AppTestLocal:
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=['transaction'])
+
+    def test_simple(self):
+        import transaction
+        x = transaction.local()
+        x.foo = 42
+        assert x.foo == 42
+        assert hasattr(x, 'foo')
+        assert not hasattr(x, 'bar')
+        assert getattr(x, 'foo', 84) == 42
+        assert getattr(x, 'bar', 84) == 84
+
+    def test_transaction_local(self):
+        import transaction
+        transaction.set_num_threads(2)
+        x = transaction.local()
+        all_lists = []
+
+        def f(n):
+            if not hasattr(x, 'lst'):
+                x.lst = []
+                all_lists.append(x.lst)
+            x.lst.append(n)
+            if n > 0:
+                transaction.add(f, n - 1)
+                transaction.add(f, n - 1)
+        transaction.add(f, 5)
+        transaction.run()
+
+        assert not hasattr(x, 'lst')
+        assert len(all_lists) == 2
+        total = all_lists[0] + all_lists[1]
+        assert total.count(5) == 1
+        assert total.count(4) == 2
+        assert total.count(3) == 4
+        assert total.count(2) == 8
+        assert total.count(1) == 16
+        assert total.count(0) == 32
+        assert len(total) == 63
+
+    def test_transaction_local_growing(self):
+        import transaction
+        transaction.set_num_threads(1)
+        x = transaction.local()
+        all_lists = []
+
+        def f(n):
+            if not hasattr(x, 'lst'):
+                x.lst = []
+                all_lists.append(x.lst)
+            x.lst.append(n)
+            if n > 0:
+                transaction.add(f, n - 1)
+                transaction.add(f, n - 1)
+        transaction.add(f, 5)
+
+        transaction.set_num_threads(2)    # more than 1 specified above
+        transaction.run()
+
+        assert not hasattr(x, 'lst')
+        assert len(all_lists) == 2
+        total = all_lists[0] + all_lists[1]
+        assert total.count(5) == 1
+        assert total.count(4) == 2
+        assert total.count(3) == 4
+        assert total.count(2) == 8
+        assert total.count(1) == 16
+        assert total.count(0) == 32
+        assert len(total) == 63


More information about the pypy-commit mailing list