[pypy-commit] stmgc c7-refactor: Mutex and condition variable: the winning combo for writing reasonable code
arigo
noreply at buildbot.pypy.org
Sat Feb 15 21:10:09 CET 2014
Author: Armin Rigo <arigo at tunes.org>
Branch: c7-refactor
Changeset: r740:386522880dd5
Date: 2014-02-15 21:09 +0100
http://bitbucket.org/pypy/stmgc/changeset/386522880dd5/
Log: Mutex and condition variable: the winning combo for writing
reasonable code. I keep rediscovering how they are actually a good
idea. Maybe later we'll figure out that we need more control to
avoid spurious wake-ups, but I think that with low numbers of
threads it's fine.
diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -28,22 +28,51 @@
other_pseg = get_priv_segment(current_lock_owner - 1);
assert(other_pseg->write_lock_num == current_lock_owner);
- if ((STM_PSEGMENT->approximate_start_time <
- other_pseg->approximate_start_time) || is_inevitable()) {
- /* we are the thread that must succeed */
- XXX /* don't go here if the other thread is inevitable! */
- ...
- other_pseg->need_abort = 1;
- _stm_start_safe_point(0);
- /* XXX: not good, maybe should be signalled by other thread */
- usleep(1);
- _stm_stop_safe_point(0);
- /* done, will retry */
+ /* note: other_pseg is currently running a transaction, and it cannot
+ commit or abort unexpectedly, because to do that it would need to
+ suspend us. So the reading of other_pseg->start_time and
+ other_pseg->transaction_state is stable, with one exception: the
+ 'transaction_state' can go from TS_REGULAR to TS_INEVITABLE under
+ our feet. */
+ if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+ /* I'm inevitable, so the other is not. */
+ assert(other_pseg->transaction_state != TS_INEVITABLE);
+ other_pseg->transaction_state = TS_MUST_ABORT;
+ }
+ else if (STM_PSEGMENT->start_time >= other_pseg->start_time) {
+ /* The other thread started before us, so I should abort, as I'm
+ the least long-running transaction. */
}
else {
- /* we are the thread that must abort */
+ /* The other thread started strictly after us. We try to tell
+ it to abort, using compare_and_swap(). This fails if its
+ 'transaction_state' is already TS_INEVITABLE. */
+ __sync_bool_compare_and_swap(
+ &other_pseg->transaction_state, TS_REGULAR, TS_MUST_ABORT);
+ }
+
+ if (other_pseg->transaction_state != TS_MUST_ABORT) {
+ /* if the other thread is not in aborting-soon mode, then we must
+ abort. */
stm_abort_transaction();
}
+ else {
+ /* otherwise, we will issue a safe point and wait: */
+ mutex_lock();
+ STM_PSEGMENT->safe_point = SP_SAFE_POINT;
+
+ /* signal the other thread; it must abort */
+ cond_broadcast();
+
+ /* then wait, hopefully until the other thread broadcasts "I'm
+ done aborting" (spurious wake-ups are ok) */
+ cond_wait();
+
+ /* now we return into _stm_write_slowpath() and will try again
+ to acquire the write lock on our object. */
+ STM_PSEGMENT->safe_point = SP_RUNNING;
+ mutex_unlock();
+ }
}
@@ -127,6 +156,8 @@
/* GS invalid before this point! */
acquire_thread_segment(tl);
+ assert(STM_SEGMENT->activity == ACT_NOT_RUNNING);
+ STM_SEGMENT->activity = jmpbuf != NULL ? ACT_REGULAR : ACT_INEVITABLE;
STM_SEGMENT->jmpbuf_ptr = jmpbuf;
uint8_t old_rv = STM_SEGMENT->transaction_read_version;
@@ -179,6 +210,8 @@
{
stm_thread_local_t *tl = STM_SEGMENT->running_thread;
+ assert(STM_SEGMENT->activity != ACT_NOT_RUNNING);
+
/* cannot abort any more */
STM_SEGMENT->jmpbuf_ptr = NULL;
@@ -187,6 +220,8 @@
/* copy modified object versions to other threads */
push_modified_to_other_threads();
+ STM_SEGMENT->activity = ACT_NOT_RUNNING;
+
release_thread_segment(tl);
reset_all_creation_markers();
}
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -56,9 +56,23 @@
struct list_s *old_objects_to_trace;
struct list_s *modified_objects;
struct list_s *creation_markers;
- uint64_t approximate_start_time;
+ uint64_t start_time;
uint8_t write_lock_num;
- uint8_t need_abort;
+ uint8_t safe_point; /* one of the SP_xxx constants */
+ uint8_t transaction_state; /* one of the TS_xxx constants */
+};
+
+enum {
+ SP_OUTSIDE=0,
+ SP_RUNNING,
+ SP_SAFE_POINT,
+ SP_SAFE_POINT_CAN_COLLECT,
+};
+enum {
+ TS_NONE=0,
+ TS_REGULAR,
+ TS_INEVITABLE,
+ TS_MUST_ABORT,
};
static char *stm_object_pages;
@@ -94,8 +108,4 @@
return ((stm_creation_marker_t *)(((uintptr_t)obj) >> 8))->cm != 0;
}
-static inline bool is_inevitable(void) {
- return STM_SEGMENT->jmpbuf_ptr == NULL;
-}
-
static void teardown_core(void);
diff --git a/c7/stm/sync.c b/c7/stm/sync.c
--- a/c7/stm/sync.c
+++ b/c7/stm/sync.c
@@ -1,35 +1,52 @@
-#include <semaphore.h>
+#include <pthread.h>
#include <sys/syscall.h>
#include <sys/prctl.h>
#include <asm/prctl.h>
+/* XXX Getting the most efficient locks is hard, but the following
+ simplification is probably good enough for small numbers of threads:
+ when a thread wants to check or change any global state (e.g. start
+ running a transaction, etc.), it acquires this single mutex. If
+ additionally it wants to wait until the global state is changed by
+ someone else, it waits on the condition variable. This should be
+ all we need for synchronization.
+
+ Maybe look at https://github.com/neosmart/pevents for how they do
+ WaitForMultipleObjects().
+*/
+
+
static union {
struct {
- sem_t semaphore;
+ pthread_mutex_t global_mutex;
+ pthread_cond_t global_cond;
+ /* some additional pieces of global state follow */
uint8_t in_use[NB_SEGMENTS + 1]; /* 1 if running a pthread */
- uint64_t global_time; /* approximate */
+ uint64_t global_time;
};
- char reserved[64];
-} segments_ctl __attribute__((aligned(64)));
+ char reserved[128];
+} sync_ctl __attribute__((aligned(64)));
static void setup_sync(void)
{
- memset(segments_ctl.in_use, 0, sizeof(segments_ctl.in_use));
- segments_ctl.in_use[NB_SEGMENTS] = 0xff;
- if (sem_init(&segments_ctl.semaphore, 0, NB_SEGMENTS) != 0) {
- perror("sem_init");
+ if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0 ||
+ pthread_cond_init(&sync_ctl.global_cond, NULL) != 0) {
+ perror("mutex/cond initialization");
abort();
}
+ sync_ctl.in_use[NB_SEGMENTS] = 0xff;
}
static void teardown_sync(void)
{
- if (sem_destroy(&segments_ctl.semaphore) != 0) {
- perror("sem_destroy");
+ if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0 ||
+ pthread_cond_destroy(&sync_ctl.global_cond) != 0) {
+ perror("mutex/cond destroy");
abort();
}
+ memset(sync_ctl.in_use, 0, sizeof(sync_ctl.in_use)); /* clear the segment table; memset needs a pointer, not the union itself */
}
static void set_gs_register(char *value)
@@ -40,49 +57,92 @@
}
}
+static inline void mutex_lock(void)
+{
+ if (UNLIKELY(pthread_mutex_lock(&sync_ctl.global_mutex) != 0)) {
+ perror("pthread_mutex_lock");
+ abort();
+ }
+}
+
+static inline void mutex_unlock(void)
+{
+ if (UNLIKELY(pthread_mutex_unlock(&sync_ctl.global_mutex) != 0)) {
+ perror("pthread_mutex_unlock");
+ abort();
+ }
+}
+
+static inline void assert_has_mutex(void)
+{
+ assert(pthread_mutex_trylock(&sync_ctl.global_mutex) == EBUSY);
+}
+
+static inline void cond_wait(void)
+{
+ if (UNLIKELY(pthread_cond_wait(&sync_ctl.global_cond,
+ &sync_ctl.global_mutex) != 0)) {
+ perror("pthread_cond_wait");
+ abort();
+ }
+}
+
+static inline void cond_broadcast(void)
+{
+ if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.global_cond) != 0)) {
+ perror("pthread_cond_broadcast");
+ abort();
+ }
+}
+
static void acquire_thread_segment(stm_thread_local_t *tl)
{
/* This function acquires a segment for the currently running thread,
and set up the GS register if it changed. */
- while (sem_wait(&segments_ctl.semaphore) != 0) {
- if (errno != EINTR) {
- perror("sem_wait");
- abort();
+ assert_has_mutex();
+ assert(_is_tl_registered(tl));
+
+ retry:; /* empty statement: a label may not directly precede a declaration */
+ int num = tl->associated_segment_num;
+ if (sync_ctl.in_use[num] == 0) {
+ /* fast-path: we can get the same segment number than the one
+ we had before. The value stored in GS is still valid. */
+ goto got_num;
+ }
+ /* Look for the next free segment. If there is none, wait for
+ the condition variable. */
+ int i;
+ for (i = 0; i < NB_SEGMENTS; i++) {
+ num = (num + 1) % NB_SEGMENTS;
+ if (sync_ctl.in_use[num] == 0) {
+ /* we're getting 'num', a different number. */
+ tl->associated_segment_num = num;
+ set_gs_register(get_segment_base(num));
+ goto got_num;
}
}
- assert(_is_tl_registered(tl));
- int num = tl->associated_segment_num;
- if (__sync_lock_test_and_set(&segments_ctl.in_use[num], 1) == 0) {
- /* fast-path: reacquired the same segment number than the one
- we had before. The value stored in GS is still valid. */
- goto exit;
- }
- /* Look for the next free segment. There must be one, because we
- acquired the semaphore above. */
- while (1) {
- num = (num + 1) % NB_SEGMENTS;
- if (__sync_lock_test_and_set(&segments_ctl.in_use[num], 1) == 0)
- break;
- }
- tl->associated_segment_num = num;
- set_gs_register(get_segment_base(num));
+ /* Wait and retry */
+ cond_wait();
+ goto retry;
- exit:
+ got_num:
+ sync_ctl.in_use[num] = 1;
assert(STM_SEGMENT->running_thread == NULL);
STM_SEGMENT->running_thread = tl;
-
- /* global_time is approximate -> no synchronization required */
- STM_PSEGMENT->approximate_start_time = ++segments_ctl.global_time;
+ STM_PSEGMENT->start_time = ++sync_ctl.global_time;
}
static void release_thread_segment(stm_thread_local_t *tl)
{
+ assert_has_mutex();
+
assert(STM_SEGMENT->running_thread == tl);
STM_SEGMENT->running_thread = NULL;
- int num = tl->associated_segment_num;
- __sync_lock_release(&segments_ctl.in_use[num]);
- sem_post(&segments_ctl.semaphore);
+ assert(sync_ctl.in_use[tl->associated_segment_num] == 1);
+ sync_ctl.in_use[tl->associated_segment_num] = 0;
+
+ cond_broadcast();
}
static bool _running_transaction(void)
diff --git a/c7/stm/sync.h b/c7/stm/sync.h
--- a/c7/stm/sync.h
+++ b/c7/stm/sync.h
@@ -3,6 +3,13 @@
static void setup_sync(void);
static void teardown_sync(void);
-/* acquire and release one of the segments for running the given thread */
+/* all synchronization is done via a mutex and condition variable */
+static void mutex_lock(void);
+static void mutex_unlock(void);
+static void cond_wait(void);
+static void cond_broadcast(void);
+
+/* acquire and release one of the segments for running the given thread
+ (must have the mutex acquired!) */
static void acquire_thread_segment(stm_thread_local_t *tl);
static void release_thread_segment(stm_thread_local_t *tl);
diff --git a/c7/stmgc.h b/c7/stmgc.h
--- a/c7/stmgc.h
+++ b/c7/stmgc.h
@@ -207,17 +207,19 @@
/* Starting and ending transactions. You should only call stm_read(),
stm_write() and stm_allocate() from within a transaction. Use
the macro STM_START_TRANSACTION() to start a transaction that
- can be restarted using the 'jmpbuf' (a pointer to a local variable
- of type stm_jmpbuf_t). */
+ can be restarted using the 'jmpbuf' (a local variable of type
+ stm_jmpbuf_t). */
#define STM_START_TRANSACTION(tl, jmpbuf) ({ \
- int _restart = __builtin_setjmp(jmpbuf); \
- _stm_start_transaction(tl, jmpbuf); \
+ int _restart = __builtin_setjmp(&jmpbuf); \
+ _stm_start_transaction(tl, &jmpbuf); \
_restart; \
})
/* Start an inevitable transaction, if it's going to return from the
current function immediately. */
-void stm_start_inevitable_transaction(stm_thread_local_t *tl);
+static inline void stm_start_inevitable_transaction(stm_thread_local_t *tl) {
+ _stm_start_transaction(tl, NULL);
+}
/* Commit a transaction. */
void stm_commit_transaction(void);
More information about the pypy-commit
mailing list