[pypy-commit] stmgc default: add safe-point mechanism again & globally_unique_transaction

Raemi noreply at buildbot.pypy.org
Tue Sep 9 14:06:00 CEST 2014


Author: Remi Meier <remi.meier at inf.ethz.ch>
Branch: 
Changeset: r1379:06a773048b9a
Date: 2014-09-09 14:07 +0200
http://bitbucket.org/pypy/stmgc/changeset/06a773048b9a/

Log:	add safe-point mechanism again & globally_unique_transaction

diff --git a/c8/stm/core.c b/c8/stm/core.c
--- a/c8/stm/core.c
+++ b/c8/stm/core.c
@@ -74,6 +74,11 @@
        most current one and apply all changes done
        by other transactions. Abort if we read one of
        the committed objs. */
+    if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+        assert((uintptr_t)STM_PSEGMENT->last_commit_log_entry->next == -1);
+        return;
+    }
+
     volatile struct stm_commit_log_entry_s *cl, *prev_cl;
     cl = prev_cl = (volatile struct stm_commit_log_entry_s *)
         STM_PSEGMENT->last_commit_log_entry;
@@ -83,6 +88,9 @@
     while ((cl = cl->next)) {
         if ((uintptr_t)cl == -1) {
             /* there is an inevitable transaction running */
+#if STM_TESTS
+            stm_abort_transaction();
+#endif
             cl = prev_cl;
             usleep(1);          /* XXX */
             continue;
@@ -306,13 +314,16 @@
         goto retry;
     /* GS invalid before this point! */
 
+    assert(STM_PSEGMENT->safe_point == SP_NO_TRANSACTION);
     assert(STM_PSEGMENT->transaction_state == TS_NONE);
     STM_PSEGMENT->transaction_state = TS_REGULAR;
+    STM_PSEGMENT->safe_point = SP_RUNNING;
 #ifndef NDEBUG
     STM_PSEGMENT->running_pthread = pthread_self();
 #endif
     STM_PSEGMENT->shadowstack_at_start_of_transaction = tl->shadowstack;
 
+    enter_safe_point_if_requested();
     dprintf(("start_transaction\n"));
 
     s_mutex_unlock();
@@ -372,6 +383,7 @@
 {
     stm_thread_local_t *tl = STM_SEGMENT->running_thread;
 
+    STM_PSEGMENT->safe_point = SP_NO_TRANSACTION;
     STM_PSEGMENT->transaction_state = TS_NONE;
     list_clear(STM_PSEGMENT->objects_pointing_to_nursery);
 
@@ -382,6 +394,7 @@
 void stm_commit_transaction(void)
 {
     assert(!_has_mutex());
+    assert(STM_PSEGMENT->safe_point == SP_RUNNING);
     assert(STM_PSEGMENT->running_pthread == pthread_self());
 
     dprintf(("stm_commit_transaction()\n"));
@@ -412,6 +425,10 @@
     assert(STM_SEGMENT->nursery_end == NURSERY_END);
     stm_rewind_jmp_forget(STM_SEGMENT->running_thread);
 
+    if (globally_unique_transaction && STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+        committed_globally_unique_transaction();
+    }
+
     /* done */
     _finish_transaction();
     /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
@@ -516,23 +533,25 @@
 
     invoke_and_clear_user_callbacks(1);   /* for abort */
 
+    if (is_abort(STM_SEGMENT->nursery_end)) {
+        /* done aborting */
+        STM_SEGMENT->nursery_end = pause_signalled ? NSE_SIGPAUSE
+                                                   : NURSERY_END;
+    }
+
     _finish_transaction();
     /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
 
     return tl;
 }
 
-
-#ifdef STM_NO_AUTOMATIC_SETJMP
-void _test_run_abort(stm_thread_local_t *tl) __attribute__((noreturn));
-#endif
-
-void stm_abort_transaction(void)
+static void abort_with_mutex(void)
 {
-    s_mutex_lock();
     stm_thread_local_t *tl = abort_with_mutex_no_longjmp();
     s_mutex_unlock();
 
+    usleep(1);
+
 #ifdef STM_NO_AUTOMATIC_SETJMP
     _test_run_abort(tl);
 #else
@@ -542,9 +561,22 @@
 }
 
 
+
+#ifdef STM_NO_AUTOMATIC_SETJMP
+void _test_run_abort(stm_thread_local_t *tl) __attribute__((noreturn));
+#endif
+
+void stm_abort_transaction(void)
+{
+    s_mutex_lock();
+    abort_with_mutex();
+}
+
+
 void _stm_become_inevitable(const char *msg)
 {
     s_mutex_lock();
+    enter_safe_point_if_requested();
 
     if (STM_PSEGMENT->transaction_state == TS_REGULAR) {
         dprintf(("become_inevitable: %s\n", msg));
@@ -560,3 +592,13 @@
 
     s_mutex_unlock();
 }
+
+void stm_become_globally_unique_transaction(stm_thread_local_t *tl,
+                                            const char *msg)
+{   /* Turn inevitable first, then ask every other thread to pause. */
+    stm_become_inevitable(tl, msg);   /* may still abort */
+
+    s_mutex_lock();   /* synchronize_all_threads() waits on condition variables tied to this mutex */
+    synchronize_all_threads(STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE);   /* returns with the pause requests still in place */
+    s_mutex_unlock();
+}
diff --git a/c8/stm/core.h b/c8/stm/core.h
--- a/c8/stm/core.h
+++ b/c8/stm/core.h
@@ -57,6 +57,7 @@
 
     uint8_t privatization_lock;
 
+    uint8_t safe_point;
     uint8_t transaction_state;
 
     struct tree_s *callbacks_on_commit_and_abort[2];
@@ -71,6 +72,16 @@
 #endif
 };
 
+enum /* safe_point */ {
+    SP_NO_TRANSACTION=0,              /* set together with transaction_state = TS_NONE */
+    SP_RUNNING,                       /* set when a transaction starts and after each wait below ends */
+    SP_WAIT_FOR_C_REQUEST_REMOVED,    /* blocked in enter_safe_point_if_requested() */
+    SP_WAIT_FOR_C_AT_SAFE_POINT,      /* blocked in synchronize_all_threads() */
+#ifdef STM_TESTS
+    SP_WAIT_FOR_OTHER_THREAD,         /* between _stm_start_safe_point() and _stm_stop_safe_point() */
+#endif
+};
+
 enum /* transaction_state */ {
     TS_NONE=0,
     TS_REGULAR,
@@ -119,6 +130,12 @@
 static bool _is_tl_registered(stm_thread_local_t *tl);
 static bool _seems_to_be_running_transaction(void);
 
+static void teardown_core(void);
+static void abort_with_mutex(void) __attribute__((noreturn));
+static stm_thread_local_t *abort_with_mutex_no_longjmp(void);
+static void abort_data_structures_from_segment_num(int segment_num);
+
+
 
 static inline void _duck(void) {
     /* put a call to _duck() between two instructions that set 0 into
diff --git a/c8/stm/nursery.c b/c8/stm/nursery.c
--- a/c8/stm/nursery.c
+++ b/c8/stm/nursery.c
@@ -253,6 +253,8 @@
 {
     assert(!_has_mutex());
 
+    stm_safe_point();
+
     _do_minor_collection(commit);
 }
 
@@ -276,6 +278,7 @@
     STM_SEGMENT->nursery_current -= size_rounded_up;  /* restore correct val */
 
  restart:
+    stm_safe_point();
 
     OPT_ASSERT(size_rounded_up >= 16);
     OPT_ASSERT((size_rounded_up & 7) == 0);
diff --git a/c8/stm/nursery.h b/c8/stm/nursery.h
--- a/c8/stm/nursery.h
+++ b/c8/stm/nursery.h
@@ -1,5 +1,22 @@
+
+#define NSE_SIGPAUSE   _STM_NSE_SIGNAL_MAX
+
 static void minor_collection(bool commit);
 static void check_nursery_at_transaction_start(void);
 static size_t throw_away_nursery(struct stm_priv_segment_info_s *pseg);
 
 static void assert_memset_zero(void *s, size_t n);
+
+
+static inline bool is_abort(uintptr_t nursery_end) {   /* true for any signal value except the pause request */
+    return (nursery_end <= _STM_NSE_SIGNAL_MAX && nursery_end != NSE_SIGPAUSE);
+}
+
+static inline bool is_aborting_now(uint8_t other_segment_num) {   /* abort signalled AND that segment is not in SP_RUNNING */
+    return (is_abort(get_segment(other_segment_num)->nursery_end) &&
+            get_priv_segment(other_segment_num)->safe_point != SP_RUNNING);
+}
+
+
+#define must_abort()   is_abort(STM_SEGMENT->nursery_end)
+static object_t *find_shadow(object_t *obj);
diff --git a/c8/stm/sync.c b/c8/stm/sync.c
--- a/c8/stm/sync.c
+++ b/c8/stm/sync.c
@@ -11,6 +11,7 @@
 static union {
     struct {
         pthread_mutex_t global_mutex;
+        pthread_cond_t cond[_C_TOTAL];
         /* some additional pieces of global state follow */
         uint8_t in_use1[NB_SEGMENTS];   /* 1 if running a pthread */
     };
@@ -22,6 +23,12 @@
 {
     if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0)
         stm_fatalerror("mutex initialization: %m");
+
+    long i;
+    for (i = 0; i < _C_TOTAL; i++) {
+        if (pthread_cond_init(&sync_ctl.cond[i], NULL) != 0)
+            stm_fatalerror("cond initialization: %m");
+    }
 }
 
 static void teardown_sync(void)
@@ -29,6 +36,12 @@
     if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0)
         stm_fatalerror("mutex destroy: %m");
 
+    long i;
+    for (i = 0; i < _C_TOTAL; i++) {
+        if (pthread_cond_destroy(&sync_ctl.cond[i]) != 0)
+            stm_fatalerror("cond destroy: %m");
+    }
+
     memset(&sync_ctl, 0, sizeof(sync_ctl));
 }
 
@@ -62,6 +75,31 @@
     assert((_has_mutex_here = false, 1));
 }
 
+
+static inline void cond_wait(enum cond_type_e ctype)
+{   /* Block on condition 'ctype' using the global mutex; any pthread error is fatal. */
+#ifdef STM_NO_COND_WAIT
+    stm_fatalerror("*** cond_wait/%d called!", (int)ctype);
+#endif
+
+    assert(_has_mutex_here);   /* pthread_cond_wait() must be entered with global_mutex held */
+    if (UNLIKELY(pthread_cond_wait(&sync_ctl.cond[ctype],
+                                   &sync_ctl.global_mutex) != 0))
+        stm_fatalerror("pthread_cond_wait/%d: %m", (int)ctype);
+}
+
+static inline void cond_signal(enum cond_type_e ctype)
+{   /* Wake a waiter of condition 'ctype', if there is one; failure is fatal. */
+    if (UNLIKELY(pthread_cond_signal(&sync_ctl.cond[ctype]) != 0))
+        stm_fatalerror("pthread_cond_signal/%d: %m", (int)ctype);
+}
+
+static inline void cond_broadcast(enum cond_type_e ctype)
+{   /* Wake every waiter of condition 'ctype'; failure is fatal. */
+    if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.cond[ctype]) != 0))
+        stm_fatalerror("pthread_cond_broadcast/%d: %m", (int)ctype);
+}
+
 /************************************************************/
 
 
@@ -147,3 +185,169 @@
 {
     set_gs_register(get_segment_base(segnum));
 }
+
+#if STM_TESTS
+void _stm_start_safe_point(void)
+{   /* test-only: flag this segment as waiting; exposed to the test harness via test/support.py */
+    assert(STM_PSEGMENT->safe_point == SP_RUNNING);
+    STM_PSEGMENT->safe_point = SP_WAIT_FOR_OTHER_THREAD;
+}
+
+void _stm_stop_safe_point(void)
+{   /* test-only: counterpart of _stm_start_safe_point() */
+    assert(STM_PSEGMENT->safe_point == SP_WAIT_FOR_OTHER_THREAD);
+    STM_PSEGMENT->safe_point = SP_RUNNING;
+
+    stm_safe_point();   /* react to any signal value stored in nursery_end meanwhile */
+}
+#endif
+
+
+
+/************************************************************/
+
+
+#ifndef NDEBUG
+static bool _safe_points_requested = false;
+#endif
+
+static void signal_everybody_to_pause_running(void)
+{   /* Request a pause from every segment by storing NSE_SIGPAUSE in its nursery_end. */
+    assert(_safe_points_requested == false);
+    assert((_safe_points_requested = true, 1));   /* debug-only flag: the assignment vanishes with NDEBUG */
+    assert(_has_mutex());
+
+    long i;
+    for (i = 1; i <= NB_SEGMENTS; i++) {
+        if (get_segment(i)->nursery_end == NURSERY_END)   /* leave any other signal value untouched */
+            get_segment(i)->nursery_end = NSE_SIGPAUSE;
+    }
+    assert(!pause_signalled);
+    pause_signalled = true;
+}
+
+static inline long count_other_threads_sp_running(void)
+{
+    /* Return the number of other segments whose safe_point is SP_RUNNING.
+       Asserts that each of them still carries a signal value in nursery_end. */
+    long i;
+    long result = 0;
+    int my_num = STM_SEGMENT->segment_num;
+
+    for (i = 1; i <= NB_SEGMENTS; i++) {
+        if (i != my_num && get_priv_segment(i)->safe_point == SP_RUNNING) {
+            assert(get_segment(i)->nursery_end <= _STM_NSE_SIGNAL_MAX);
+            result++;
+        }
+    }
+    return result;
+}
+
+static void remove_requests_for_safe_point(void)
+{   /* Undo signal_everybody_to_pause_running() and wake the paused threads. */
+    assert(pause_signalled);
+    pause_signalled = false;
+    assert(_safe_points_requested == true);
+    assert((_safe_points_requested = false, 1));   /* debug-only flag: the assignment vanishes with NDEBUG */
+
+    long i;
+    for (i = 1; i <= NB_SEGMENTS; i++) {
+        assert(get_segment(i)->nursery_end != NURSERY_END);   /* every segment must still hold some signal value */
+        if (get_segment(i)->nursery_end == NSE_SIGPAUSE)   /* only pause requests are cleared here */
+            get_segment(i)->nursery_end = NURSERY_END;
+    }
+    cond_broadcast(C_REQUEST_REMOVED);   /* wakes threads blocked in enter_safe_point_if_requested() */
+}
+
+static void enter_safe_point_if_requested(void)
+{   /* Abort if signalled to; otherwise block until a pending pause request is removed. */
+    if (STM_SEGMENT->nursery_end == NURSERY_END)
+        return;    /* fast path: no safe point requested */
+
+    assert(_seems_to_be_running_transaction());
+    assert(_has_mutex());
+    while (1) {
+        if (must_abort())
+            abort_with_mutex();   /* declared noreturn in core.h; it unlocks the mutex itself */
+
+        if (STM_SEGMENT->nursery_end == NURSERY_END)
+            break;    /* no safe point requested */
+
+        assert(STM_SEGMENT->nursery_end == NSE_SIGPAUSE);
+        assert(pause_signalled);
+
+        /* If we are requested to enter a safe-point, we cannot proceed now.
+           Wait until the safe-point request is removed for us. */
+#ifdef STM_TESTS
+        abort_with_mutex();   /* test builds never block here: they abort the transaction instead */
+#endif
+        cond_signal(C_AT_SAFE_POINT);   /* wakes the requester waiting in synchronize_all_threads() */
+        STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_REQUEST_REMOVED;
+        cond_wait(C_REQUEST_REMOVED);   /* broadcast by remove_requests_for_safe_point() */
+        STM_PSEGMENT->safe_point = SP_RUNNING;   /* loop again: re-check for abort or a new request */
+    }
+}
+
+static void synchronize_all_threads(enum sync_type_e sync_type)
+{   /* Pause all other running threads; must be called with the mutex held. */
+    enter_safe_point_if_requested();
+
+    /* Only one thread should reach this point concurrently.  This is
+       why: if several threads call this function, the first one that
+       goes past this point will set the "request safe point" on all
+       other threads; then none of the other threads will go past the
+       enter_safe_point_if_requested() above.
+    */
+    if (UNLIKELY(globally_unique_transaction)) {   /* presumably we are that unique transaction ourselves -- TODO confirm */
+        assert(count_other_threads_sp_running() == 0);
+        return;
+    }
+
+    signal_everybody_to_pause_running();
+
+    /* If some other threads are SP_RUNNING, we cannot proceed now.
+       Wait until all other threads are suspended. */
+    while (count_other_threads_sp_running() > 0) {
+        STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_AT_SAFE_POINT;
+        cond_wait(C_AT_SAFE_POINT);   /* signalled from enter_safe_point_if_requested() */
+        STM_PSEGMENT->safe_point = SP_RUNNING;
+
+        if (must_abort()) {   /* abort was signalled to us while waiting: undo our requests first */
+            remove_requests_for_safe_point();    /* => C_REQUEST_REMOVED */
+            abort_with_mutex();
+        }
+    }
+
+    if (UNLIKELY(sync_type == STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE)) {
+        globally_unique_transaction = true;
+        assert(STM_SEGMENT->nursery_end == NSE_SIGPAUSE);   /* our own segment was flagged by the request loop too */
+        STM_SEGMENT->nursery_end = NURSERY_END;   /* clear it so this thread itself keeps running */
+        return;  /* don't remove the requests for safe-points in this case */
+    }
+
+    /* Remove the requests for safe-points now.  In principle we should
+       remove it later, when the caller is done, but this is equivalent
+       as long as we hold the mutex.
+    */
+    remove_requests_for_safe_point();    /* => C_REQUEST_REMOVED */
+}
+
+static void committed_globally_unique_transaction(void)
+{   /* Leave globally-unique mode and release the threads that were kept paused. */
+    assert(globally_unique_transaction);
+    assert(STM_SEGMENT->nursery_end == NURSERY_END);
+    STM_SEGMENT->nursery_end = NSE_SIGPAUSE;   /* remove_requests_for_safe_point() asserts no segment is at NURSERY_END */
+    globally_unique_transaction = false;
+    remove_requests_for_safe_point();
+}
+
+void _stm_collectable_safe_point(void)
+{
+    /* If 'nursery_end' was set to NSE_SIGxxx by another thread,
+       we end up here as soon as we try to call stm_allocate() or do
+       a call to stm_safe_point().
+    */
+    s_mutex_lock();   /* enter_safe_point_if_requested() asserts the mutex on its slow path */
+    enter_safe_point_if_requested();
+    s_mutex_unlock();
+}
diff --git a/c8/stm/sync.h b/c8/stm/sync.h
--- a/c8/stm/sync.h
+++ b/c8/stm/sync.h
@@ -1,9 +1,17 @@
 static void setup_sync(void);
 static void teardown_sync(void);
 
+enum cond_type_e {
+    C_AT_SAFE_POINT,      /* signalled by a thread about to park; waited on in synchronize_all_threads() */
+    C_REQUEST_REMOVED,    /* broadcast by remove_requests_for_safe_point(); waited on by parked threads */
+    _C_TOTAL              /* number of condition variables (size of sync_ctl.cond[]) */
+};
 
 static void s_mutex_lock(void);
 static void s_mutex_unlock(void);
+static void cond_wait(enum cond_type_e);
+static void cond_signal(enum cond_type_e);
+static void cond_broadcast(enum cond_type_e);
 #ifndef NDEBUG
 static bool _has_mutex(void);
 #endif
@@ -14,3 +22,13 @@
    (must have the mutex acquired!) */
 static bool acquire_thread_segment(stm_thread_local_t *tl);
 static void release_thread_segment(stm_thread_local_t *tl);
+
+enum sync_type_e {
+    STOP_OTHERS_UNTIL_MUTEX_UNLOCK,            /* pause requests are removed before synchronize_all_threads() returns */
+    STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE,    /* requests stay until committed_globally_unique_transaction() */
+};
+static void synchronize_all_threads(enum sync_type_e sync_type);
+static void committed_globally_unique_transaction(void);
+
+static bool pause_signalled, globally_unique_transaction;
+static void enter_safe_point_if_requested(void);
diff --git a/c8/stmgc.h b/c8/stmgc.h
--- a/c8/stmgc.h
+++ b/c8/stmgc.h
@@ -59,11 +59,13 @@
 
 #define _STM_GCFLAG_WRITE_BARRIER      0x01
 #define _STM_FAST_ALLOC           (66*1024)
+#define _STM_NSE_SIGNAL_MAX               1
 
 void _stm_write_slowpath(object_t *);
 object_t *_stm_allocate_slowpath(ssize_t);
 object_t *_stm_allocate_external(ssize_t);
 void _stm_become_inevitable(const char*);
+void _stm_collectable_safe_point(void);   /* slow path of stm_safe_point(); prototype matches the definition in sync.c */
 
 object_t *_stm_allocate_old(ssize_t size_rounded_up);
 char *_stm_real_address(object_t *o);
@@ -78,6 +80,9 @@
 void _stm_test_switch_segment(int segnum);
 void _push_obj_to_other_segments(object_t *obj);
 
+void _stm_start_safe_point(void);
+void _stm_stop_safe_point(void);
+
 char *_stm_get_segment_base(long index);
 bool _stm_in_transaction(stm_thread_local_t *tl);
 void _stm_set_nursery_free_count(uint64_t free_count);
@@ -200,6 +205,10 @@
 long stm_call_on_abort(stm_thread_local_t *, void *key, void callback(void *));
 long stm_call_on_commit(stm_thread_local_t *, void *key, void callback(void *));
 
+static inline void stm_safe_point(void) {   /* cheap inline check; slow path only when nursery_end holds a signal value */
+    if (STM_SEGMENT->nursery_end <= _STM_NSE_SIGNAL_MAX)
+        _stm_collectable_safe_point();
+}
 
 
 #ifdef STM_NO_AUTOMATIC_SETJMP
@@ -216,6 +225,7 @@
         _stm_become_inevitable(msg);
 }
 
+void stm_become_globally_unique_transaction(stm_thread_local_t *tl, const char *msg);
 
 /* ==================== END ==================== */
 
diff --git a/c8/test/support.py b/c8/test/support.py
--- a/c8/test/support.py
+++ b/c8/test/support.py
@@ -45,6 +45,8 @@
 bool _check_stm_validate();
 
 object_t *stm_setup_prebuilt(object_t *);
+void _stm_start_safe_point(void);
+bool _check_stop_safe_point(void);
 
 
 bool _checked_stm_write(object_t *obj);
@@ -65,6 +67,7 @@
 bool _check_commit_transaction(void);
 bool _check_abort_transaction(void);
 bool _check_become_inevitable(stm_thread_local_t *tl);
+bool _check_become_globally_unique_transaction(stm_thread_local_t *tl);
 int stm_is_inevitable(void);
 
 void _set_type_id(object_t *obj, uint32_t h);
@@ -152,6 +155,10 @@
     CHECKED(stm_commit_transaction());
 }
 
+bool _check_stop_safe_point(void) {
+    CHECKED(_stm_stop_safe_point());
+}
+
 bool _check_abort_transaction(void) {
     CHECKED(stm_abort_transaction());
 }
@@ -160,6 +167,10 @@
     CHECKED(stm_become_inevitable(tl, "TEST"));
 }
 
+bool _check_become_globally_unique_transaction(stm_thread_local_t *tl) {
+    CHECKED(stm_become_globally_unique_transaction(tl, "TESTGUT"));
+}
+
 bool _check_stm_validate(void) {
     CHECKED(stm_validate(NULL));
 }
@@ -491,13 +502,17 @@
 
     def switch(self, thread_num):
         assert thread_num != self.current_thread
+        tl = self.tls[self.current_thread]
+        if lib._stm_in_transaction(tl):
+            stm_start_safe_point()
         #
         self.current_thread = thread_num
         tl2 = self.tls[thread_num]
         #
         if lib._stm_in_transaction(tl2):
             lib._stm_test_switch(tl2)
-            stm_validate() # can raise
+            stm_stop_safe_point() # can raise Conflict
+            stm_validate() # can raise Conflict
 
     def switch_to_segment(self, seg_num):
         lib._stm_test_switch_segment(seg_num)
@@ -542,3 +557,8 @@
         tl = self.tls[self.current_thread]
         if lib._check_become_inevitable(tl):
             raise Conflict()
+
+    def become_globally_unique_transaction(self):
+        tl = self.tls[self.current_thread]
+        if lib._check_become_globally_unique_transaction(tl):
+            raise Conflict()


More information about the pypy-commit mailing list