[pypy-commit] pypy stm-thread: Dynamically shorten transactions if they start failing.

arigo noreply at buildbot.pypy.org
Thu Jun 7 18:05:43 CEST 2012


Author: Armin Rigo <arigo at tunes.org>
Branch: stm-thread
Changeset: r55473:4f11e8850c57
Date: 2012-06-07 18:05 +0200
http://bitbucket.org/pypy/pypy/changeset/4f11e8850c57/

Log:	Dynamically shorten transactions if they start failing.

diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -12,6 +12,8 @@
   /*unsigned long last_known_global_timestamp;*/
   owner_version_t my_lock_word;
   struct OrecList reads;
+  long atomic;   /* 0 = not atomic, > 0 atomic */
+  long reads_size_limit, reads_size_limit_nonatomic; /* see should_break_tr. */
   int active;    /* 0 = inactive, 1 = regular, 2 = inevitable */
   unsigned num_commits;
   unsigned num_aborts[ABORT_REASONS];
@@ -523,6 +525,13 @@
   free(d);
 }
 
+static void update_reads_size_limit(struct tx_descriptor *d)
+{
+  /* 'reads_size_limit' is set to LONG_MAX if we are atomic; else
+     we copy the value from reads_size_limit_nonatomic. */
+  d->reads_size_limit = d->atomic ? LONG_MAX : d->reads_size_limit_nonatomic;
+}
+
 static void begin_transaction(jmp_buf* buf)
 {
   struct tx_descriptor *d = thread_descriptor;
@@ -530,6 +539,15 @@
   d->active = 1;
   d->setjmp_buf = buf;
   d->start_time = (/*d->last_known_global_timestamp*/ global_timestamp) & ~1;
+  update_reads_size_limit(d);
+}
+
+static void make_inevitable(struct tx_descriptor *d)
+{
+  d->setjmp_buf = NULL;
+  d->active = 2;
+  d->reads_size_limit_nonatomic = 0;
+  update_reads_size_limit(d);
 }
 
 void stm_begin_inevitable_transaction(void)
@@ -538,8 +556,7 @@
      except more efficient */
   struct tx_descriptor *d = thread_descriptor;
   assert(d->active == 0);
-  d->active = 2;
-  d->setjmp_buf = NULL;
+  make_inevitable(d);
 
   mutex_lock();
   while (1)
@@ -662,8 +679,8 @@
       mutex_unlock();
       tx_spinloop(6);
     }
-  d->setjmp_buf = NULL;   /* inevitable from now on */
-  d->active = 2;
+  make_inevitable(d);   /* inevitable from now on */
+
 #ifdef RPY_STM_DEBUG_PRINT
   PYPY_DEBUG_STOP("stm-inevitable");
 #endif
@@ -735,24 +752,49 @@
   return is_inevitable(d);
 }
 
-static __thread long stm_atomic = 0;
 static long stm_regular_length_limit = LONG_MAX;
 
 void stm_add_atomic(long delta)
 {
-  stm_atomic += delta;
+  struct tx_descriptor *d = thread_descriptor;
+  d->atomic += delta;
+  update_reads_size_limit(d);
 }
 
 long stm_get_atomic(void)
 {
-  return stm_atomic;
+  struct tx_descriptor *d = thread_descriptor;
+  return d->atomic;
 }
 
 long stm_should_break_transaction(void)
 {
   struct tx_descriptor *d = thread_descriptor;
-  return !stm_atomic && (is_inevitable(d) ||
-                         d->reads.size >= stm_regular_length_limit);
+
+  /* a single comparison to handle all cases:
+
+     - if d->atomic, then we should return False.  This is done by
+       forcing reads_size_limit to LONG_MAX as soon as atomic > 0.
+
+     - otherwise, if is_inevitable(), then we should return True.
+       This is done by forcing both reads_size_limit and
+       reads_size_limit_nonatomic to 0 in that case.
+
+     - finally, the default case: return True if d->reads.size is
+       greater than reads_size_limit == reads_size_limit_nonatomic.
+  */
+#ifdef RPY_STM_ASSERT
+  /* reads_size_limit is LONG_MAX if d->atomic, or else it is equal to
+     reads_size_limit_nonatomic. */
+  assert(d->reads_size_limit == (d->atomic ? LONG_MAX :
+                                     d->reads_size_limit_nonatomic));
+  /* if is_inevitable(), reads_size_limit_nonatomic should be 0
+     (and thus reads_size_limit too, if !d->atomic.) */
+  if (is_inevitable(d))
+    assert(d->reads_size_limit_nonatomic == 0);
+#endif
+
+  return d->reads.size >= d->reads_size_limit;
 }
 
 void stm_set_transaction_length(long length_max)
@@ -770,19 +812,20 @@
   jmp_buf _jmpbuf;
   long volatile v_counter = 0;
   void **volatile v_saved_value;
-  long volatile v_atomic = stm_atomic;
-  assert((!thread_descriptor->active) == (!stm_atomic));
+  long volatile v_atomic = thread_descriptor->atomic;
+  assert((!thread_descriptor->active) == (!v_atomic));
   v_saved_value = *(void***)save_and_restore;
   /***/
   setjmp(_jmpbuf);
   /* After setjmp(), the local variables v_* are preserved because they
    * are volatile.  The other variables are only declared here. */
+  struct tx_descriptor *d = thread_descriptor;
   long counter, result;
   void **restore_value;
   counter = v_counter;
-  stm_atomic = v_atomic;
+  d->atomic = v_atomic;
   restore_value = v_saved_value;
-  if (!stm_atomic)
+  if (!d->atomic)
     {
       /* In non-atomic mode, we are now between two transactions.
          It means that in the next transaction's collections we know
@@ -795,16 +838,22 @@
   do
     {
       v_counter = counter + 1;
-      if (!stm_atomic)
+      /* initialize 'reads_size_limit_nonatomic' from the configured
+         length limit, scaled down by a factor of 2 for each time we
+         retry an aborted transaction.  Note that as soon as such a
+         shortened transaction succeeds, the next one will again have
+         full length, for now. */
+      d->reads_size_limit_nonatomic = stm_regular_length_limit >> counter;
+      if (!d->atomic)
         begin_transaction(&_jmpbuf);
       result = callback(arg, counter);
-      if (!stm_atomic)
+      if (!d->atomic)
         stm_commit_transaction();
       counter = 0;
     }
   while (result == 1);  /* also stops if we got an RPython exception */
 
-  if (stm_atomic && thread_descriptor->setjmp_buf == &_jmpbuf)
+  if (d->atomic && thread_descriptor->setjmp_buf == &_jmpbuf)
     stm_try_inevitable(STM_EXPLAIN1("perform_transaction left with atomic"));
 
   *(void***)save_and_restore = v_saved_value;


More information about the pypy-commit mailing list