[pypy-commit] stmgc c7-refactor: Mutex and condition variable: the winning combo for writing reasonable code

arigo noreply at buildbot.pypy.org
Sat Feb 15 21:10:09 CET 2014


Author: Armin Rigo <arigo at tunes.org>
Branch: c7-refactor
Changeset: r740:386522880dd5
Date: 2014-02-15 21:09 +0100
http://bitbucket.org/pypy/stmgc/changeset/386522880dd5/

Log:	Mutex and condition variable: the winning combo for writing
	reasonable code. I keep rediscovering how they are actually a good
	idea. Maybe later we'll figure out that we need more control to
	avoid spurious wake-ups, but I think that with low numbers of
	threads it's fine.
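
For reference, here is the generic shape of that combo as a minimal sketch;
it is not part of the patch, and global_mutex, global_cond, state_ready and
the two functions are placeholder names.  Waiters hold the mutex and re-check
their condition in a loop, so spurious wake-ups are harmless; whoever changes
the protected state broadcasts on the condition variable:

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t global_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  global_cond  = PTHREAD_COND_INITIALIZER;
    static bool state_ready;          /* any state guarded by global_mutex */

    static void wait_until_ready(void)
    {
        pthread_mutex_lock(&global_mutex);
        while (!state_ready)          /* the loop tolerates spurious wake-ups */
            pthread_cond_wait(&global_cond, &global_mutex);
        /* ... act on the protected state while still holding the mutex ... */
        pthread_mutex_unlock(&global_mutex);
    }

    static void make_ready(void)
    {
        pthread_mutex_lock(&global_mutex);
        state_ready = true;
        pthread_cond_broadcast(&global_cond);   /* wake all waiters */
        pthread_mutex_unlock(&global_mutex);
    }

The patch below uses one such mutex/condition-variable pair for all of the
global state in sync.c; some of the waits, e.g. in the write-write contention
path, do not even need a predicate loop, because waking up early just means
retrying the whole operation.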

diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -28,22 +28,51 @@
     other_pseg = get_priv_segment(current_lock_owner - 1);
     assert(other_pseg->write_lock_num == current_lock_owner);
 
-    if ((STM_PSEGMENT->approximate_start_time <
-            other_pseg->approximate_start_time) || is_inevitable()) {
-        /* we are the thread that must succeed */
-        XXX  /* don't go here if the other thread is inevitable! */
-        ...                           
-        other_pseg->need_abort = 1;
-        _stm_start_safe_point(0);
-        /* XXX: not good, maybe should be signalled by other thread */
-        usleep(1);
-        _stm_stop_safe_point(0);
-        /* done, will retry */
+    /* note: other_pseg is currently running a transaction, and it cannot
+       commit or abort unexpectedly, because to do that it would need to
+       suspend us.  So the reading of other_pseg->start_time and
+       other_pseg->transaction_state is stable, with one exception: the
+       'transaction_state' can go from TS_REGULAR to TS_INEVITABLE under
+       our feet. */
+    if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+        /* I'm inevitable, so the other is not. */
+        assert(other_pseg->transaction_state != TS_INEVITABLE);
+        other_pseg->transaction_state = TS_MUST_ABORT;
+    }
+    else if (STM_PSEGMENT->start_time >= other_pseg->start_time) {
+        /* The other thread started before us: we are the more recently
+           started transaction, so we are the one that must abort (below). */
     }
     else {
-        /* we are the thread that must abort */
+        /* The other thread started strictly after us.  We try to tell
+           it to abort, using compare_and_swap().  This fails if its
+           'transaction_state' is already TS_INEVITABLE. */
+        __sync_bool_compare_and_swap(
+                    &other_pseg->transaction_state, TS_REGULAR, TS_MUST_ABORT);
+    }
+
+    if (other_pseg->transaction_state != TS_MUST_ABORT) {
+        /* if the other thread is not in aborting-soon mode, then we must
+           abort. */
         stm_abort_transaction();
     }
+    else {
+        /* otherwise, we will issue a safe point and wait: */
+        mutex_lock();
+        STM_PSEGMENT->safe_point = SP_SAFE_POINT;
+
+        /* signal the other thread; it must abort */
+        cond_broadcast();
+
+        /* then wait, hopefully until the other thread broadcasts "I'm
+           done aborting" (spurious wake-ups are ok) */
+        cond_wait();
+
+        /* now we return into _stm_write_slowpath() and will try again
+           to acquire the write lock on our object. */
+        STM_PSEGMENT->safe_point = SP_RUNNING;
+        mutex_unlock();
+    }
 }
 
 
@@ -127,6 +156,8 @@
     /* GS invalid before this point! */
     acquire_thread_segment(tl);
 
+    assert(STM_PSEGMENT->transaction_state == TS_NONE);
+    STM_PSEGMENT->transaction_state = jmpbuf != NULL ? TS_REGULAR : TS_INEVITABLE;
     STM_SEGMENT->jmpbuf_ptr = jmpbuf;
 
     uint8_t old_rv = STM_SEGMENT->transaction_read_version;
@@ -179,6 +210,8 @@
 {
     stm_thread_local_t *tl = STM_SEGMENT->running_thread;
 
+    assert(STM_PSEGMENT->transaction_state != TS_NONE);
+
     /* cannot abort any more */
     STM_SEGMENT->jmpbuf_ptr = NULL;
 
@@ -187,6 +220,8 @@
     /* copy modified object versions to other threads */
     push_modified_to_other_threads();
 
+    STM_PSEGMENT->transaction_state = TS_NONE;
+
     release_thread_segment(tl);
     reset_all_creation_markers();
 }
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -56,9 +56,23 @@
     struct list_s *old_objects_to_trace;
     struct list_s *modified_objects;
     struct list_s *creation_markers;
-    uint64_t approximate_start_time;
+    uint64_t start_time;
     uint8_t write_lock_num;
-    uint8_t need_abort;
+    uint8_t safe_point;         /* one of the SP_xxx constants */
+    uint8_t transaction_state;  /* one of the TS_xxx constants */
+};
+
+enum {
+    SP_OUTSIDE=0,
+    SP_RUNNING,
+    SP_SAFE_POINT,
+    SP_SAFE_POINT_CAN_COLLECT,
+};
+enum {
+    TS_NONE=0,
+    TS_REGULAR,
+    TS_INEVITABLE,
+    TS_MUST_ABORT,
 };
 
 static char *stm_object_pages;
@@ -94,8 +108,4 @@
     return ((stm_creation_marker_t *)(((uintptr_t)obj) >> 8))->cm != 0;
 }
 
-static inline bool is_inevitable(void) {
-    return STM_SEGMENT->jmpbuf_ptr == NULL;
-}
-
 static void teardown_core(void);
diff --git a/c7/stm/sync.c b/c7/stm/sync.c
--- a/c7/stm/sync.c
+++ b/c7/stm/sync.c
@@ -1,35 +1,52 @@
-#include <semaphore.h>
+#include <pthread.h>
 #include <sys/syscall.h>
 #include <sys/prctl.h>
 #include <asm/prctl.h>
 
 
+/* XXX Getting the most efficient locks is hard, but the following
+   simplification is probably good enough for small numbers of threads:
+   when a thread wants to check or change any global state (e.g. start
+   running a transaction, etc.), it acquires this single mutex.  If
+   additionally it wants to wait until the global state is changed by
+   someone else, it waits on the condition variable.  This should be
+   all we need for synchronization.
+
+   Maybe look at https://github.com/neosmart/pevents for how they do
+   WaitForMultipleObjects().
+*/
+
+
 static union {
     struct {
-        sem_t semaphore;
+        pthread_mutex_t global_mutex;
+        pthread_cond_t global_cond;
+        /* some additional pieces of global state follow */
         uint8_t in_use[NB_SEGMENTS + 1];   /* 1 if running a pthread */
-        uint64_t global_time;     /* approximate */
+        uint64_t global_time;
     };
-    char reserved[64];
-} segments_ctl __attribute__((aligned(64)));
+    char reserved[128];
+} sync_ctl __attribute__((aligned(64)));
 
 
 static void setup_sync(void)
 {
-    memset(segments_ctl.in_use, 0, sizeof(segments_ctl.in_use));
-    segments_ctl.in_use[NB_SEGMENTS] = 0xff;
-    if (sem_init(&segments_ctl.semaphore, 0, NB_SEGMENTS) != 0) {
-        perror("sem_init");
+    if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0 ||
+         pthread_cond_init(&sync_ctl.global_cond, NULL) != 0) {
+        perror("mutex/cond initialization");
         abort();
     }
+    sync_ctl.in_use[NB_SEGMENTS] = 0xff;
 }
 
 static void teardown_sync(void)
 {
-    if (sem_destroy(&segments_ctl.semaphore) != 0) {
-        perror("sem_destroy");
+    if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0 ||
+         pthread_cond_destroy(&sync_ctl.global_cond) != 0) {
+        perror("mutex/cond destroy");
         abort();
     }
+    memset(sync_ctl.in_use, 0, sizeof(sync_ctl.in_use));
 }
 
 static void set_gs_register(char *value)
@@ -40,49 +57,92 @@
     }
 }
 
+static inline void mutex_lock(void)
+{
+    if (UNLIKELY(pthread_mutex_lock(&sync_ctl.global_mutex) != 0)) {
+        perror("pthread_mutex_lock");
+        abort();
+    }
+}
+
+static inline void mutex_unlock(void)
+{
+    if (UNLIKELY(pthread_mutex_unlock(&sync_ctl.global_mutex) != 0)) {
+        perror("pthread_mutex_unlock");
+        abort();
+    }
+}
+
+static inline void assert_has_mutex(void)
+{
+    assert(pthread_mutex_trylock(&sync_ctl.global_mutex) == EBUSY);
+}
+
+static inline void cond_wait(void)
+{
+    if (UNLIKELY(pthread_cond_wait(&sync_ctl.global_cond,
+                                   &sync_ctl.global_mutex) != 0)) {
+        perror("pthread_cond_wait");
+        abort();
+    }
+}
+
+static inline void cond_broadcast(void)
+{
+    if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.global_cond) != 0)) {
+        perror("pthread_cond_broadcast");
+        abort();
+    }
+}
+
 static void acquire_thread_segment(stm_thread_local_t *tl)
 {
     /* This function acquires a segment for the currently running thread,
        and set up the GS register if it changed. */
-    while (sem_wait(&segments_ctl.semaphore) != 0) {
-        if (errno != EINTR) {
-            perror("sem_wait");
-            abort();
+    assert_has_mutex();
+    assert(_is_tl_registered(tl));
+
+    int num = tl->associated_segment_num;
+ retry:
+    if (sync_ctl.in_use[num] == 0) {
+        /* fast-path: we can get the same segment number as the one
+           we had before.  The value stored in GS is still valid. */
+        goto got_num;
+    }
+    /* Look for the next free segment.  If there is none, wait for
+       the condition variable. */
+    int i;
+    for (i = 0; i < NB_SEGMENTS; i++) {
+        num = (num + 1) % NB_SEGMENTS;
+        if (sync_ctl.in_use[num] == 0) {
+            /* we're getting 'num', a different number. */
+            tl->associated_segment_num = num;
+            set_gs_register(get_segment_base(num));
+            goto got_num;
         }
     }
-    assert(_is_tl_registered(tl));
-    int num = tl->associated_segment_num;
-    if (__sync_lock_test_and_set(&segments_ctl.in_use[num], 1) == 0) {
-        /* fast-path: reacquired the same segment number than the one
-           we had before.  The value stored in GS is still valid. */
-        goto exit;
-    }
-    /* Look for the next free segment.  There must be one, because we
-       acquired the semaphore above. */
-    while (1) {
-        num = (num + 1) % NB_SEGMENTS;
-        if (__sync_lock_test_and_set(&segments_ctl.in_use[num], 1) == 0)
-            break;
-    }
-    tl->associated_segment_num = num;
-    set_gs_register(get_segment_base(num));
+    /* Wait and retry */
+    cond_wait();
+    goto retry;
 
- exit:
+ got_num:
+    sync_ctl.in_use[num] = 1;
     assert(STM_SEGMENT->running_thread == NULL);
     STM_SEGMENT->running_thread = tl;
-
-    /* global_time is approximate -> no synchronization required */
-    STM_PSEGMENT->approximate_start_time = ++segments_ctl.global_time;
+    STM_PSEGMENT->start_time = ++sync_ctl.global_time;
 }
 
 static void release_thread_segment(stm_thread_local_t *tl)
 {
+    assert_has_mutex();
+
     assert(STM_SEGMENT->running_thread == tl);
     STM_SEGMENT->running_thread = NULL;
 
-    int num = tl->associated_segment_num;
-    __sync_lock_release(&segments_ctl.in_use[num]);
-    sem_post(&segments_ctl.semaphore);
+    assert(sync_ctl.in_use[tl->associated_segment_num] == 1);
+    sync_ctl.in_use[tl->associated_segment_num] = 0;
+
+    cond_broadcast();
 }
 
 static bool _running_transaction(void)
diff --git a/c7/stm/sync.h b/c7/stm/sync.h
--- a/c7/stm/sync.h
+++ b/c7/stm/sync.h
@@ -3,6 +3,13 @@
 static void setup_sync(void);
 static void teardown_sync(void);
 
-/* acquire and release one of the segments for running the given thread */
+/* all synchronization is done via a mutex and condition variable */
+static void mutex_lock(void);
+static void mutex_unlock(void);
+static void cond_wait(void);
+static void cond_broadcast(void);
+
+/* acquire and release one of the segments for running the given thread
+   (must have the mutex acquired!) */
 static void acquire_thread_segment(stm_thread_local_t *tl);
 static void release_thread_segment(stm_thread_local_t *tl);
diff --git a/c7/stmgc.h b/c7/stmgc.h
--- a/c7/stmgc.h
+++ b/c7/stmgc.h
@@ -207,17 +207,19 @@
 /* Starting and ending transactions.  You should only call stm_read(),
    stm_write() and stm_allocate() from within a transaction.  Use
    the macro STM_START_TRANSACTION() to start a transaction that
-   can be restarted using the 'jmpbuf' (a pointer to a local variable
-   of type stm_jmpbuf_t). */
+   can be restarted using the 'jmpbuf' (a local variable of type
+   stm_jmpbuf_t). */
 #define STM_START_TRANSACTION(tl, jmpbuf)  ({           \
-    int _restart = __builtin_setjmp(jmpbuf);            \
-    _stm_start_transaction(tl, jmpbuf);                 \
+    int _restart = __builtin_setjmp(&jmpbuf);           \
+    _stm_start_transaction(tl, &jmpbuf);                \
    _restart;                                            \
 })
 
 /* Start an inevitable transaction, if it's going to return from the
    current function immediately. */
-void stm_start_inevitable_transaction(stm_thread_local_t *tl);
+static inline void stm_start_inevitable_transaction(stm_thread_local_t *tl) {
+    _stm_start_transaction(tl, NULL);
+}
 
 /* Commit a transaction. */
 void stm_commit_transaction(void);
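
For reference, a caller-side sketch (not part of the patch) of how the revised
STM_START_TRANSACTION() macro above is meant to be used; the function name and
its body are illustrative only:

    void run_one_transaction(stm_thread_local_t *tl)
    {
        stm_jmpbuf_t jmpbuf;                /* a local variable, not a pointer */
        STM_START_TRANSACTION(tl, jmpbuf);  /* if the transaction aborts, the
                                               __builtin_setjmp() inside the
                                               macro returns here again and the
                                               transaction is restarted */
        /* ... stm_read() / stm_write() / stm_allocate() calls go here ... */
        stm_commit_transaction();
    }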

