[pypy-commit] stmgc default: add safe-point mechanism again & globally_unique_transaction
Raemi
noreply at buildbot.pypy.org
Tue Sep 9 14:06:00 CEST 2014
Author: Remi Meier <remi.meier at inf.ethz.ch>
Branch:
Changeset: r1379:06a773048b9a
Date: 2014-09-09 14:07 +0200
http://bitbucket.org/pypy/stmgc/changeset/06a773048b9a/
Log: add safe-point mechanism again & globally_unique_transaction
diff --git a/c8/stm/core.c b/c8/stm/core.c
--- a/c8/stm/core.c
+++ b/c8/stm/core.c
@@ -74,6 +74,11 @@
most current one and apply all changes done
by other transactions. Abort if we read one of
the committed objs. */
+ if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+ assert((uintptr_t)STM_PSEGMENT->last_commit_log_entry->next == -1);
+ return;
+ }
+
volatile struct stm_commit_log_entry_s *cl, *prev_cl;
cl = prev_cl = (volatile struct stm_commit_log_entry_s *)
STM_PSEGMENT->last_commit_log_entry;
@@ -83,6 +88,9 @@
while ((cl = cl->next)) {
if ((uintptr_t)cl == -1) {
/* there is an inevitable transaction running */
+#if STM_TESTS
+ stm_abort_transaction();
+#endif
cl = prev_cl;
usleep(1); /* XXX */
continue;
@@ -306,13 +314,16 @@
goto retry;
/* GS invalid before this point! */
+ assert(STM_PSEGMENT->safe_point == SP_NO_TRANSACTION);
assert(STM_PSEGMENT->transaction_state == TS_NONE);
STM_PSEGMENT->transaction_state = TS_REGULAR;
+ STM_PSEGMENT->safe_point = SP_RUNNING;
#ifndef NDEBUG
STM_PSEGMENT->running_pthread = pthread_self();
#endif
STM_PSEGMENT->shadowstack_at_start_of_transaction = tl->shadowstack;
+ enter_safe_point_if_requested();
dprintf(("start_transaction\n"));
s_mutex_unlock();
@@ -372,6 +383,7 @@
{
stm_thread_local_t *tl = STM_SEGMENT->running_thread;
+ STM_PSEGMENT->safe_point = SP_NO_TRANSACTION;
STM_PSEGMENT->transaction_state = TS_NONE;
list_clear(STM_PSEGMENT->objects_pointing_to_nursery);
@@ -382,6 +394,7 @@
void stm_commit_transaction(void)
{
assert(!_has_mutex());
+ assert(STM_PSEGMENT->safe_point == SP_RUNNING);
assert(STM_PSEGMENT->running_pthread == pthread_self());
dprintf(("stm_commit_transaction()\n"));
@@ -412,6 +425,10 @@
assert(STM_SEGMENT->nursery_end == NURSERY_END);
stm_rewind_jmp_forget(STM_SEGMENT->running_thread);
+ if (globally_unique_transaction && STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+ committed_globally_unique_transaction();
+ }
+
/* done */
_finish_transaction();
/* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
@@ -516,23 +533,25 @@
invoke_and_clear_user_callbacks(1); /* for abort */
+ if (is_abort(STM_SEGMENT->nursery_end)) {
+ /* done aborting */
+ STM_SEGMENT->nursery_end = pause_signalled ? NSE_SIGPAUSE
+ : NURSERY_END;
+ }
+
_finish_transaction();
/* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
return tl;
}
-
-#ifdef STM_NO_AUTOMATIC_SETJMP
-void _test_run_abort(stm_thread_local_t *tl) __attribute__((noreturn));
-#endif
-
-void stm_abort_transaction(void)
+static void abort_with_mutex(void)
{
- s_mutex_lock();
stm_thread_local_t *tl = abort_with_mutex_no_longjmp();
s_mutex_unlock();
+ usleep(1);
+
#ifdef STM_NO_AUTOMATIC_SETJMP
_test_run_abort(tl);
#else
@@ -542,9 +561,22 @@
}
+
+#ifdef STM_NO_AUTOMATIC_SETJMP
+void _test_run_abort(stm_thread_local_t *tl) __attribute__((noreturn));
+#endif
+
+void stm_abort_transaction(void)
+{
+ s_mutex_lock();
+ abort_with_mutex();
+}
+
+
void _stm_become_inevitable(const char *msg)
{
s_mutex_lock();
+ enter_safe_point_if_requested();
if (STM_PSEGMENT->transaction_state == TS_REGULAR) {
dprintf(("become_inevitable: %s\n", msg));
@@ -560,3 +592,13 @@
s_mutex_unlock();
}
+
+void stm_become_globally_unique_transaction(stm_thread_local_t *tl,
+ const char *msg)
+{
+ stm_become_inevitable(tl, msg); /* may still abort */
+
+ s_mutex_lock();
+ synchronize_all_threads(STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE);
+ s_mutex_unlock();
+}
diff --git a/c8/stm/core.h b/c8/stm/core.h
--- a/c8/stm/core.h
+++ b/c8/stm/core.h
@@ -57,6 +57,7 @@
uint8_t privatization_lock;
+ uint8_t safe_point;
uint8_t transaction_state;
struct tree_s *callbacks_on_commit_and_abort[2];
@@ -71,6 +72,16 @@
#endif
};
+enum /* safe_point */ {
+ SP_NO_TRANSACTION=0,
+ SP_RUNNING,
+ SP_WAIT_FOR_C_REQUEST_REMOVED,
+ SP_WAIT_FOR_C_AT_SAFE_POINT,
+#ifdef STM_TESTS
+ SP_WAIT_FOR_OTHER_THREAD,
+#endif
+};
+
enum /* transaction_state */ {
TS_NONE=0,
TS_REGULAR,
@@ -119,6 +130,12 @@
static bool _is_tl_registered(stm_thread_local_t *tl);
static bool _seems_to_be_running_transaction(void);
+static void teardown_core(void);
+static void abort_with_mutex(void) __attribute__((noreturn));
+static stm_thread_local_t *abort_with_mutex_no_longjmp(void);
+static void abort_data_structures_from_segment_num(int segment_num);
+
+
static inline void _duck(void) {
/* put a call to _duck() between two instructions that set 0 into
diff --git a/c8/stm/nursery.c b/c8/stm/nursery.c
--- a/c8/stm/nursery.c
+++ b/c8/stm/nursery.c
@@ -253,6 +253,8 @@
{
assert(!_has_mutex());
+ stm_safe_point();
+
_do_minor_collection(commit);
}
@@ -276,6 +278,7 @@
STM_SEGMENT->nursery_current -= size_rounded_up; /* restore correct val */
restart:
+ stm_safe_point();
OPT_ASSERT(size_rounded_up >= 16);
OPT_ASSERT((size_rounded_up & 7) == 0);
diff --git a/c8/stm/nursery.h b/c8/stm/nursery.h
--- a/c8/stm/nursery.h
+++ b/c8/stm/nursery.h
@@ -1,5 +1,22 @@
+
+#define NSE_SIGPAUSE _STM_NSE_SIGNAL_MAX
+
static void minor_collection(bool commit);
static void check_nursery_at_transaction_start(void);
static size_t throw_away_nursery(struct stm_priv_segment_info_s *pseg);
static void assert_memset_zero(void *s, size_t n);
+
+
+static inline bool is_abort(uintptr_t nursery_end) {
+ return (nursery_end <= _STM_NSE_SIGNAL_MAX && nursery_end != NSE_SIGPAUSE);
+}
+
+static inline bool is_aborting_now(uint8_t other_segment_num) {
+ return (is_abort(get_segment(other_segment_num)->nursery_end) &&
+ get_priv_segment(other_segment_num)->safe_point != SP_RUNNING);
+}
+
+
+#define must_abort() is_abort(STM_SEGMENT->nursery_end)
+static object_t *find_shadow(object_t *obj);
diff --git a/c8/stm/sync.c b/c8/stm/sync.c
--- a/c8/stm/sync.c
+++ b/c8/stm/sync.c
@@ -11,6 +11,7 @@
static union {
struct {
pthread_mutex_t global_mutex;
+ pthread_cond_t cond[_C_TOTAL];
/* some additional pieces of global state follow */
uint8_t in_use1[NB_SEGMENTS]; /* 1 if running a pthread */
};
@@ -22,6 +23,12 @@
{
if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0)
stm_fatalerror("mutex initialization: %m");
+
+ long i;
+ for (i = 0; i < _C_TOTAL; i++) {
+ if (pthread_cond_init(&sync_ctl.cond[i], NULL) != 0)
+ stm_fatalerror("cond initialization: %m");
+ }
}
static void teardown_sync(void)
@@ -29,6 +36,12 @@
if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0)
stm_fatalerror("mutex destroy: %m");
+ long i;
+ for (i = 0; i < _C_TOTAL; i++) {
+ if (pthread_cond_destroy(&sync_ctl.cond[i]) != 0)
+ stm_fatalerror("cond destroy: %m");
+ }
+
memset(&sync_ctl, 0, sizeof(sync_ctl));
}
@@ -62,6 +75,31 @@
assert((_has_mutex_here = false, 1));
}
+
+static inline void cond_wait(enum cond_type_e ctype)
+{
+#ifdef STM_NO_COND_WAIT
+ stm_fatalerror("*** cond_wait/%d called!", (int)ctype);
+#endif
+
+ assert(_has_mutex_here);
+ if (UNLIKELY(pthread_cond_wait(&sync_ctl.cond[ctype],
+ &sync_ctl.global_mutex) != 0))
+ stm_fatalerror("pthread_cond_wait/%d: %m", (int)ctype);
+}
+
+static inline void cond_signal(enum cond_type_e ctype)
+{
+ if (UNLIKELY(pthread_cond_signal(&sync_ctl.cond[ctype]) != 0))
+ stm_fatalerror("pthread_cond_signal/%d: %m", (int)ctype);
+}
+
+static inline void cond_broadcast(enum cond_type_e ctype)
+{
+ if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.cond[ctype]) != 0))
+ stm_fatalerror("pthread_cond_broadcast/%d: %m", (int)ctype);
+}
+
/************************************************************/
@@ -147,3 +185,169 @@
{
set_gs_register(get_segment_base(segnum));
}
+
+#if STM_TESTS
+void _stm_start_safe_point(void)
+{
+ assert(STM_PSEGMENT->safe_point == SP_RUNNING);
+ STM_PSEGMENT->safe_point = SP_WAIT_FOR_OTHER_THREAD;
+}
+
+void _stm_stop_safe_point(void)
+{
+ assert(STM_PSEGMENT->safe_point == SP_WAIT_FOR_OTHER_THREAD);
+ STM_PSEGMENT->safe_point = SP_RUNNING;
+
+ stm_safe_point();
+}
+#endif
+
+
+
+/************************************************************/
+
+
+#ifndef NDEBUG
+static bool _safe_points_requested = false;
+#endif
+
+static void signal_everybody_to_pause_running(void)
+{
+ assert(_safe_points_requested == false);
+ assert((_safe_points_requested = true, 1));
+ assert(_has_mutex());
+
+ long i;
+ for (i = 1; i <= NB_SEGMENTS; i++) {
+ if (get_segment(i)->nursery_end == NURSERY_END)
+ get_segment(i)->nursery_end = NSE_SIGPAUSE;
+ }
+ assert(!pause_signalled);
+ pause_signalled = true;
+}
+
+static inline long count_other_threads_sp_running(void)
+{
+ /* Return the number of other threads in SP_RUNNING.
+ Asserts that SP_RUNNING threads still have the NSE_SIGxxx. */
+ long i;
+ long result = 0;
+ int my_num = STM_SEGMENT->segment_num;
+
+ for (i = 1; i <= NB_SEGMENTS; i++) {
+ if (i != my_num && get_priv_segment(i)->safe_point == SP_RUNNING) {
+ assert(get_segment(i)->nursery_end <= _STM_NSE_SIGNAL_MAX);
+ result++;
+ }
+ }
+ return result;
+}
+
+static void remove_requests_for_safe_point(void)
+{
+ assert(pause_signalled);
+ pause_signalled = false;
+ assert(_safe_points_requested == true);
+ assert((_safe_points_requested = false, 1));
+
+ long i;
+ for (i = 1; i <= NB_SEGMENTS; i++) {
+ assert(get_segment(i)->nursery_end != NURSERY_END);
+ if (get_segment(i)->nursery_end == NSE_SIGPAUSE)
+ get_segment(i)->nursery_end = NURSERY_END;
+ }
+ cond_broadcast(C_REQUEST_REMOVED);
+}
+
+static void enter_safe_point_if_requested(void)
+{
+ if (STM_SEGMENT->nursery_end == NURSERY_END)
+ return; /* fast path: no safe point requested */
+
+ assert(_seems_to_be_running_transaction());
+ assert(_has_mutex());
+ while (1) {
+ if (must_abort())
+ abort_with_mutex();
+
+ if (STM_SEGMENT->nursery_end == NURSERY_END)
+ break; /* no safe point requested */
+
+ assert(STM_SEGMENT->nursery_end == NSE_SIGPAUSE);
+ assert(pause_signalled);
+
+ /* If we are requested to enter a safe-point, we cannot proceed now.
+ Wait until the safe-point request is removed for us. */
+#ifdef STM_TESTS
+ abort_with_mutex();
+#endif
+ cond_signal(C_AT_SAFE_POINT);
+ STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_REQUEST_REMOVED;
+ cond_wait(C_REQUEST_REMOVED);
+ STM_PSEGMENT->safe_point = SP_RUNNING;
+ }
+}
+
+static void synchronize_all_threads(enum sync_type_e sync_type)
+{
+ enter_safe_point_if_requested();
+
+ /* Only one thread should reach this point concurrently. This is
+ why: if several threads call this function, the first one that
+ goes past this point will set the "request safe point" on all
+ other threads; then none of the other threads will go past the
+ enter_safe_point_if_requested() above.
+ */
+ if (UNLIKELY(globally_unique_transaction)) {
+ assert(count_other_threads_sp_running() == 0);
+ return;
+ }
+
+ signal_everybody_to_pause_running();
+
+ /* If some other threads are SP_RUNNING, we cannot proceed now.
+ Wait until all other threads are suspended. */
+ while (count_other_threads_sp_running() > 0) {
+ STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_AT_SAFE_POINT;
+ cond_wait(C_AT_SAFE_POINT);
+ STM_PSEGMENT->safe_point = SP_RUNNING;
+
+ if (must_abort()) {
+ remove_requests_for_safe_point(); /* => C_REQUEST_REMOVED */
+ abort_with_mutex();
+ }
+ }
+
+ if (UNLIKELY(sync_type == STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE)) {
+ globally_unique_transaction = true;
+ assert(STM_SEGMENT->nursery_end == NSE_SIGPAUSE);
+ STM_SEGMENT->nursery_end = NURSERY_END;
+ return; /* don't remove the requests for safe-points in this case */
+ }
+
+ /* Remove the requests for safe-points now. In principle we should
+ remove it later, when the caller is done, but this is equivalent
+ as long as we hold the mutex.
+ */
+ remove_requests_for_safe_point(); /* => C_REQUEST_REMOVED */
+}
+
+static void committed_globally_unique_transaction(void)
+{
+ assert(globally_unique_transaction);
+ assert(STM_SEGMENT->nursery_end == NURSERY_END);
+ STM_SEGMENT->nursery_end = NSE_SIGPAUSE;
+ globally_unique_transaction = false;
+ remove_requests_for_safe_point();
+}
+
+void _stm_collectable_safe_point(void)
+{
+ /* If 'nursery_end' was set to NSE_SIGxxx by another thread,
+ we end up here as soon as we try to call stm_allocate() or do
+ a call to stm_safe_point().
+ */
+ s_mutex_lock();
+ enter_safe_point_if_requested();
+ s_mutex_unlock();
+}
diff --git a/c8/stm/sync.h b/c8/stm/sync.h
--- a/c8/stm/sync.h
+++ b/c8/stm/sync.h
@@ -1,9 +1,17 @@
static void setup_sync(void);
static void teardown_sync(void);
+enum cond_type_e {
+ C_AT_SAFE_POINT,
+ C_REQUEST_REMOVED,
+ _C_TOTAL
+};
static void s_mutex_lock(void);
static void s_mutex_unlock(void);
+static void cond_wait(enum cond_type_e);
+static void cond_signal(enum cond_type_e);
+static void cond_broadcast(enum cond_type_e);
#ifndef NDEBUG
static bool _has_mutex(void);
#endif
@@ -14,3 +22,13 @@
(must have the mutex acquired!) */
static bool acquire_thread_segment(stm_thread_local_t *tl);
static void release_thread_segment(stm_thread_local_t *tl);
+
+enum sync_type_e {
+ STOP_OTHERS_UNTIL_MUTEX_UNLOCK,
+ STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE,
+};
+static void synchronize_all_threads(enum sync_type_e sync_type);
+static void committed_globally_unique_transaction(void);
+
+static bool pause_signalled, globally_unique_transaction;
+static void enter_safe_point_if_requested(void);
diff --git a/c8/stmgc.h b/c8/stmgc.h
--- a/c8/stmgc.h
+++ b/c8/stmgc.h
@@ -59,11 +59,13 @@
#define _STM_GCFLAG_WRITE_BARRIER 0x01
#define _STM_FAST_ALLOC (66*1024)
+#define _STM_NSE_SIGNAL_MAX 1
void _stm_write_slowpath(object_t *);
object_t *_stm_allocate_slowpath(ssize_t);
object_t *_stm_allocate_external(ssize_t);
void _stm_become_inevitable(const char*);
+void _stm_collectable_safe_point();
object_t *_stm_allocate_old(ssize_t size_rounded_up);
char *_stm_real_address(object_t *o);
@@ -78,6 +80,9 @@
void _stm_test_switch_segment(int segnum);
void _push_obj_to_other_segments(object_t *obj);
+void _stm_start_safe_point(void);
+void _stm_stop_safe_point(void);
+
char *_stm_get_segment_base(long index);
bool _stm_in_transaction(stm_thread_local_t *tl);
void _stm_set_nursery_free_count(uint64_t free_count);
@@ -200,6 +205,10 @@
long stm_call_on_abort(stm_thread_local_t *, void *key, void callback(void *));
long stm_call_on_commit(stm_thread_local_t *, void *key, void callback(void *));
+static inline void stm_safe_point(void) {
+ if (STM_SEGMENT->nursery_end <= _STM_NSE_SIGNAL_MAX)
+ _stm_collectable_safe_point();
+}
#ifdef STM_NO_AUTOMATIC_SETJMP
@@ -216,6 +225,7 @@
_stm_become_inevitable(msg);
}
+void stm_become_globally_unique_transaction(stm_thread_local_t *tl, const char *msg);
/* ==================== END ==================== */
diff --git a/c8/test/support.py b/c8/test/support.py
--- a/c8/test/support.py
+++ b/c8/test/support.py
@@ -45,6 +45,8 @@
bool _check_stm_validate();
object_t *stm_setup_prebuilt(object_t *);
+void _stm_start_safe_point(void);
+bool _check_stop_safe_point(void);
bool _checked_stm_write(object_t *obj);
@@ -65,6 +67,7 @@
bool _check_commit_transaction(void);
bool _check_abort_transaction(void);
bool _check_become_inevitable(stm_thread_local_t *tl);
+bool _check_become_globally_unique_transaction(stm_thread_local_t *tl);
int stm_is_inevitable(void);
void _set_type_id(object_t *obj, uint32_t h);
@@ -152,6 +155,10 @@
CHECKED(stm_commit_transaction());
}
+bool _check_stop_safe_point(void) {
+ CHECKED(_stm_stop_safe_point());
+}
+
bool _check_abort_transaction(void) {
CHECKED(stm_abort_transaction());
}
@@ -160,6 +167,10 @@
CHECKED(stm_become_inevitable(tl, "TEST"));
}
+bool _check_become_globally_unique_transaction(stm_thread_local_t *tl) {
+ CHECKED(stm_become_globally_unique_transaction(tl, "TESTGUT"));
+}
+
bool _check_stm_validate(void) {
CHECKED(stm_validate(NULL));
}
@@ -491,13 +502,17 @@
def switch(self, thread_num):
assert thread_num != self.current_thread
+ tl = self.tls[self.current_thread]
+ if lib._stm_in_transaction(tl):
+ stm_start_safe_point()
#
self.current_thread = thread_num
tl2 = self.tls[thread_num]
#
if lib._stm_in_transaction(tl2):
lib._stm_test_switch(tl2)
- stm_validate() # can raise
+ stm_stop_safe_point() # can raise Conflict
+ stm_validate() # can raise Conflict
def switch_to_segment(self, seg_num):
lib._stm_test_switch_segment(seg_num)
@@ -542,3 +557,8 @@
tl = self.tls[self.current_thread]
if lib._check_become_inevitable(tl):
raise Conflict()
+
+ def become_globally_unique_transaction(self):
+ tl = self.tls[self.current_thread]
+ if lib._check_become_globally_unique_transaction(tl):
+ raise Conflict()
More information about the pypy-commit mailing list