[pypy-commit] stmgc default: add abort/commit callbacks

Raemi noreply at buildbot.pypy.org
Tue Sep 9 11:33:18 CEST 2014


Author: Remi Meier <remi.meier at inf.ethz.ch>
Branch: 
Changeset: r1378:0e5c6ce3c7bd
Date: 2014-09-09 11:31 +0200
http://bitbucket.org/pypy/stmgc/changeset/0e5c6ce3c7bd/

Log:	add abort/commit callbacks

diff --git a/c8/stm/core.c b/c8/stm/core.c
--- a/c8/stm/core.c
+++ b/c8/stm/core.c
@@ -327,6 +327,8 @@
     assert(list_is_empty(STM_PSEGMENT->objects_pointing_to_nursery));
     assert(tree_is_cleared(STM_PSEGMENT->young_outside_nursery));
     assert(tree_is_cleared(STM_PSEGMENT->nursery_objects_shadows));
+    assert(tree_is_cleared(STM_PSEGMENT->callbacks_on_commit_and_abort[0]));
+    assert(tree_is_cleared(STM_PSEGMENT->callbacks_on_commit_and_abort[1]));
 
     check_nursery_at_transaction_start();
 
@@ -403,6 +405,7 @@
 
     release_modified_objs_lock(STM_SEGMENT->segment_num);
 
+    invoke_and_clear_user_callbacks(0);   /* for commit */
 
     s_mutex_lock();
 
@@ -508,6 +511,11 @@
 
     stm_thread_local_t *tl = STM_SEGMENT->running_thread;
 
+    if (tl->mem_clear_on_abort)
+        memset(tl->mem_clear_on_abort, 0, tl->mem_bytes_to_clear_on_abort);
+
+    invoke_and_clear_user_callbacks(1);   /* for abort */
+
     _finish_transaction();
     /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
 
@@ -544,6 +552,7 @@
         _validate_and_turn_inevitable();
         STM_PSEGMENT->transaction_state = TS_INEVITABLE;
         stm_rewind_jmp_forget(STM_SEGMENT->running_thread);
+        invoke_and_clear_user_callbacks(0);   /* for commit */
     }
     else {
         assert(STM_PSEGMENT->transaction_state == TS_INEVITABLE);
diff --git a/c8/stm/core.h b/c8/stm/core.h
--- a/c8/stm/core.h
+++ b/c8/stm/core.h
@@ -59,6 +59,8 @@
 
     uint8_t transaction_state;
 
+    struct tree_s *callbacks_on_commit_and_abort[2];
+
     struct stm_commit_log_entry_s *last_commit_log_entry;
 
     struct stm_shadowentry_s *shadowstack_at_start_of_transaction;
diff --git a/c8/stm/extra.c b/c8/stm/extra.c
new file mode 100644
--- /dev/null
+++ b/c8/stm/extra.c
@@ -0,0 +1,82 @@
+#ifndef _STM_CORE_H_
+# error "must be compiled via stmgc.c"
+#endif
+
+
+static long register_callbacks(stm_thread_local_t *tl,
+                               void *key, void callback(void *), long index)
+{
+    if (!_stm_in_transaction(tl)) {
+        /* check that the current thread-local is really running a
+           transaction, and do nothing otherwise. */
+        return -1;
+    }
+
+    if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+        /* don't register callbacks if we're in an inevitable transaction
+           (which cannot abort); stm_call_on_commit() runs them at once */
+        return -1;
+    }
+
+    struct tree_s *callbacks;
+    callbacks = STM_PSEGMENT->callbacks_on_commit_and_abort[index];
+
+    if (callback == NULL) {
+        /* double-unregistering works, but returns 0 */
+        return tree_delete_item(callbacks, (uintptr_t)key);
+    }
+    else {
+        /* double-registering the same key will crash */
+        tree_insert(callbacks, (uintptr_t)key, (uintptr_t)callback);
+        return 1;
+    }
+}
+
+long stm_call_on_commit(stm_thread_local_t *tl,
+                       void *key, void callback(void *))
+{
+    long result = register_callbacks(tl, key, callback, 0);
+    if (result < 0 && callback != NULL) {
+        /* no regular transaction running, invoke the callback
+           immediately */
+        callback(key);
+    }
+    return result;
+}
+
+long stm_call_on_abort(stm_thread_local_t *tl,
+                       void *key, void callback(void *))
+{
+    return register_callbacks(tl, key, callback, 1);
+}
+
+static void invoke_and_clear_user_callbacks(long index)
+{
+    struct tree_s *callbacks;
+
+    /* clear the callbacks that we don't want to invoke at all */
+    callbacks = STM_PSEGMENT->callbacks_on_commit_and_abort[1 - index];
+    if (!tree_is_cleared(callbacks))
+        tree_clear(callbacks);
+
+    /* invoke the callbacks from the other group */
+    callbacks = STM_PSEGMENT->callbacks_on_commit_and_abort[index];
+    if (tree_is_cleared(callbacks))
+        return;
+    STM_PSEGMENT->callbacks_on_commit_and_abort[index] = tree_create();
+
+    wlog_t *item;
+    TREE_LOOP_FORWARD(callbacks, item) {
+        void *key = (void *)item->addr;
+        void (*callback)(void *) = (void(*)(void *))item->val;
+        assert(key != NULL);
+        assert(callback != NULL);
+
+        /* The callback may call stm_call_on_abort(tl, key, NULL).  It is
+           ignored: 'callbacks_on_commit_and_abort' was already cleared. */
+        callback(key);
+
+    } TREE_LOOP_END;
+
+    tree_free(callbacks);
+}
diff --git a/c8/stm/extra.h b/c8/stm/extra.h
new file mode 100644
--- /dev/null
+++ b/c8/stm/extra.h
@@ -0,0 +1,3 @@
+
+static void invoke_and_clear_user_callbacks(long index);
+/* 0 = for commit, 1 = for abort */
diff --git a/c8/stm/setup.c b/c8/stm/setup.c
--- a/c8/stm/setup.c
+++ b/c8/stm/setup.c
@@ -112,6 +112,9 @@
         pr->objects_pointing_to_nursery = list_create();
         pr->young_outside_nursery = tree_create();
         pr->nursery_objects_shadows = tree_create();
+        pr->callbacks_on_commit_and_abort[0] = tree_create();
+        pr->callbacks_on_commit_and_abort[1] = tree_create();
+
         pr->last_commit_log_entry = &commit_log_root;
         pr->pub.transaction_read_version = 0xff;
     }
@@ -145,6 +148,8 @@
         tree_free(pr->modified_old_objects);
         tree_free(pr->young_outside_nursery);
         tree_free(pr->nursery_objects_shadows);
+        tree_free(pr->callbacks_on_commit_and_abort[0]);
+        tree_free(pr->callbacks_on_commit_and_abort[1]);
     }
 
     munmap(stm_object_pages, TOTAL_MEMORY);
diff --git a/c8/stmgc.c b/c8/stmgc.c
--- a/c8/stmgc.c
+++ b/c8/stmgc.c
@@ -11,6 +11,7 @@
 #include "stm/setup.h"
 #include "stm/fprintcolor.h"
 #include "stm/rewind_setjmp.h"
+#include "stm/extra.h"
 
 #include "stm/list.c"
 #include "stm/pagecopy.c"
@@ -25,3 +26,4 @@
 #include "stm/hash_id.c"
 #include "stm/prebuilt.c"
 #include "stm/misc.c"
+#include "stm/extra.c"
diff --git a/c8/stmgc.h b/c8/stmgc.h
--- a/c8/stmgc.h
+++ b/c8/stmgc.h
@@ -47,6 +47,9 @@
     /* rewind_setjmp's interface */
     rewind_jmp_thread rjthread;
     struct stm_shadowentry_s *shadowstack, *shadowstack_base;
+
+    char *mem_clear_on_abort;
+    size_t mem_bytes_to_clear_on_abort;
     long last_abort__bytes_in_nursery;
     /* the next fields are handled internally by the library */
     int associated_segment_num;
@@ -194,6 +197,11 @@
 object_t *stm_setup_prebuilt(object_t *);
 
 
+long stm_call_on_abort(stm_thread_local_t *, void *key, void callback(void *));
+long stm_call_on_commit(stm_thread_local_t *, void *key, void callback(void *));
+
+
+
 #ifdef STM_NO_AUTOMATIC_SETJMP
 int stm_is_inevitable(void);
 #else
diff --git a/c8/test/support.py b/c8/test/support.py
--- a/c8/test/support.py
+++ b/c8/test/support.py
@@ -24,6 +24,8 @@
 typedef struct {
     rewind_jmp_thread rjthread;
     struct stm_shadowentry_s *shadowstack, *shadowstack_base;
+    char *mem_clear_on_abort;
+    size_t mem_bytes_to_clear_on_abort;
     long last_abort__bytes_in_nursery;
     int associated_segment_num;
     struct stm_thread_local_s *prev, *next;
@@ -79,6 +81,10 @@
 void stm_set_prebuilt_identityhash(object_t *obj, uint64_t hash);
 
 
+long stm_call_on_abort(stm_thread_local_t *, void *key, void callback(void *));
+long stm_call_on_commit(stm_thread_local_t *, void *key, void callback(void *));
+
+
 long _stm_count_modified_old_objects(void);
 long _stm_count_objects_pointing_to_nursery(void);
 object_t *_stm_enum_modified_old_objects(long index);
diff --git a/c8/test/test_extra.py b/c8/test/test_extra.py
new file mode 100644
--- /dev/null
+++ b/c8/test/test_extra.py
@@ -0,0 +1,194 @@
+from support import *
+import py
+
+def ffi_new_aligned(string):
+    ALIGN = ffi.sizeof("void *")
+    p1 = ffi.new("void *[]", (len(string) + ALIGN) // ALIGN)
+    p2 = ffi.gc(ffi.cast("char *", p1), lambda p2: p1)
+    p2[0:len(string)+1] = string + '\x00'
+    assert ffi.string(p2) == string
+    return p2
+
+
+class TestExtra(BaseTest):
+
+    def test_clear_on_abort(self):
+        p = ffi.new("char[]", "hello")
+        tl = self.get_stm_thread_local()
+        tl.mem_clear_on_abort = p
+        tl.mem_bytes_to_clear_on_abort = 2
+        #
+        self.start_transaction()
+        assert ffi.string(p) == "hello"
+        self.abort_transaction()
+        assert p[0] == '\0'
+        assert p[1] == '\0'
+        assert p[2] == 'l'
+        assert p[3] == 'l'
+        assert p[4] == 'o'
+
+    def test_call_on_abort(self):
+        p0 = ffi_new_aligned("aaa")
+        p1 = ffi_new_aligned("hello")
+        p2 = ffi_new_aligned("removed")
+        p3 = ffi_new_aligned("world")
+        p4 = ffi_new_aligned("00")
+        #
+        @ffi.callback("void(void *)")
+        def clear_me(p):
+            p = ffi.cast("char *", p)
+            p[0] = chr(ord(p[0]) + 1)
+        #
+        self.start_transaction()
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p0, clear_me)
+        assert x != 0
+        # the registered callbacks are removed on
+        # successful commit
+        self.commit_transaction()
+        assert ffi.string(p0) == "aaa"
+        #
+        self.start_transaction()
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p1, clear_me)
+        assert x != 0
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p2, clear_me)
+        assert x != 0
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p3, clear_me)
+        assert x != 0
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p2, ffi.NULL)
+        assert x != 0
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p2, ffi.NULL)
+        assert x == 0
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p4, ffi.NULL)
+        assert x == 0
+        assert ffi.string(p0) == "aaa"
+        assert ffi.string(p1) == "hello"
+        assert ffi.string(p2) == "removed"
+        assert ffi.string(p3) == "world"
+        self.abort_transaction()
+        #
+        assert ffi.string(p0) == "aaa"
+        assert ffi.string(p1) == "iello"
+        assert ffi.string(p2) == "removed"
+        assert ffi.string(p3) == "xorld"
+        #
+        # the registered callbacks are removed on abort
+        self.start_transaction()
+        self.abort_transaction()
+        assert ffi.string(p0) == "aaa"
+        assert ffi.string(p1) == "iello"
+        assert ffi.string(p2) == "removed"
+        assert ffi.string(p3) == "xorld"
+        assert ffi.string(p4) == "00"
+
+    def test_ignores_if_outside_transaction(self):
+        @ffi.callback("void(void *)")
+        def dont_see_me(p):
+            seen.append(p)
+        #
+        seen = []
+        p0 = ffi_new_aligned("aaa")
+        x = lib.stm_call_on_abort(self.get_stm_thread_local(), p0, dont_see_me)
+        assert x != 0
+        self.start_transaction()
+        self.abort_transaction()
+        assert seen == []
+
+    def test_call_on_commit(self):
+        p0 = ffi_new_aligned("aaa")
+        p1 = ffi_new_aligned("hello")
+        p2 = ffi_new_aligned("removed")
+        p3 = ffi_new_aligned("world")
+        p4 = ffi_new_aligned("00")
+        #
+        @ffi.callback("void(void *)")
+        def clear_me(p):
+            p = ffi.cast("char *", p)
+            p[0] = chr(ord(p[0]) + 1)
+        #
+        self.start_transaction()
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p0, clear_me)
+        assert x != 0
+        # the registered callbacks are not called on abort
+        self.abort_transaction()
+        assert ffi.string(p0) == "aaa"
+        #
+        self.start_transaction()
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p1, clear_me)
+        assert x != 0
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p2, clear_me)
+        assert x != 0
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p3, clear_me)
+        assert x != 0
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p2, ffi.NULL)
+        assert x != 0
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p2, ffi.NULL)
+        assert x == 0
+        x = lib.stm_call_on_commit(self.get_stm_thread_local(), p4, ffi.NULL)
+        assert x == 0
+        assert ffi.string(p0) == "aaa"
+        assert ffi.string(p1) == "hello"
+        assert ffi.string(p2) == "removed"
+        assert ffi.string(p3) == "world"
+        self.commit_transaction()
+        #
+        assert ffi.string(p0) == "aaa"
+        assert ffi.string(p1) == "iello"
+        assert ffi.string(p2) == "removed"
+        assert ffi.string(p3) == "xorld"
+        assert ffi.string(p4) == "00"
+
+    def test_call_on_commit_immediately_if_inevitable(self):
+        p0 = ffi_new_aligned("aaa")
+        self.start_transaction()
+        self.become_inevitable()
+        #
+        @ffi.callback("void(void *)")
+        def clear_me(p):
+            p = ffi.cast("char *", p)
+            p[0] = chr(ord(p[0]) + 1)
+        #
+        lib.stm_call_on_commit(self.get_stm_thread_local(), p0, clear_me)
+        assert ffi.string(p0) == "baa"
+        self.commit_transaction()
+        assert ffi.string(p0) == "baa"
+
+    def test_call_on_commit_as_soon_as_inevitable(self):
+        p0 = ffi_new_aligned("aaa")
+        self.start_transaction()
+        #
+        @ffi.callback("void(void *)")
+        def clear_me(p):
+            p = ffi.cast("char *", p)
+            p[0] = chr(ord(p[0]) + 1)
+        #
+        lib.stm_call_on_commit(self.get_stm_thread_local(), p0, clear_me)
+        assert ffi.string(p0) == "aaa"
+        self.become_inevitable()
+        assert ffi.string(p0) == "baa"
+        self.commit_transaction()
+        assert ffi.string(p0) == "baa"
+
+    def test_call_on_commit_immediately_if_outside_transaction(self):
+        p0 = ffi_new_aligned("aaa")
+        #
+        @ffi.callback("void(void *)")
+        def clear_me(p):
+            p = ffi.cast("char *", p)
+            p[0] = chr(ord(p[0]) + 1)
+        #
+        lib.stm_call_on_commit(self.get_stm_thread_local(), p0, clear_me)
+        assert ffi.string(p0) == "baa"
+        self.start_transaction()
+        assert ffi.string(p0) == "baa"
+        self.commit_transaction()
+        assert ffi.string(p0) == "baa"
+
+    def test_stm_become_globally_unique_transaction(self):
+        self.start_transaction()
+        #
+        self.switch(1)
+        self.start_transaction()
+        self.become_globally_unique_transaction()
+        assert lib.stm_is_inevitable()
+        #
+        py.test.raises(Conflict, self.switch, 0)


More information about the pypy-commit mailing list