[pypy-commit] pypy gil-improvement: Port the code. Trying with a random but larger value for sys.checkinterval; will experiment more.

arigo noreply at buildbot.pypy.org
Fri Sep 30 22:42:37 CEST 2011


Author: Armin Rigo <arigo at tunes.org>
Branch: gil-improvement
Changeset: r47732:c9ecb31b1680
Date: 2011-09-30 22:41 +0200
http://bitbucket.org/pypy/pypy/changeset/c9ecb31b1680/

Log:	Port the code. Trying with a random but larger value for
	sys.checkinterval; will experiment more.

diff --git a/pypy/interpreter/executioncontext.py b/pypy/interpreter/executioncontext.py
--- a/pypy/interpreter/executioncontext.py
+++ b/pypy/interpreter/executioncontext.py
@@ -307,7 +307,11 @@
         self._nonperiodic_actions = []
         self.has_bytecode_counter = False
         self.fired_actions = None
-        self.checkinterval_scaled = 100 * TICK_COUNTER_STEP
+        # the default value is not 100, unlike CPython 2.7, but a much
+        # larger value, because we use a technique that not only allows
+        # but actually *forces* another thread to run whenever the counter
+        # reaches zero.
+        self.checkinterval_scaled = 10000 * TICK_COUNTER_STEP
         self._rebuild_action_dispatcher()
 
     def fire(self, action):
diff --git a/pypy/module/thread/gil.py b/pypy/module/thread/gil.py
--- a/pypy/module/thread/gil.py
+++ b/pypy/module/thread/gil.py
@@ -16,7 +16,7 @@
 
 class GILThreadLocals(OSThreadLocals):
     """A version of OSThreadLocals that enforces a GIL."""
-    ll_GIL = thread.null_ll_lock
+    gil_ready = False
 
     def initialize(self, space):
         # add the GIL-releasing callback as an action on the space
@@ -25,12 +25,10 @@
 
     def setup_threads(self, space):
         """Enable threads in the object space, if they haven't already been."""
-        if not self.ll_GIL:
-            try:
-                self.ll_GIL = thread.allocate_ll_lock()
-            except thread.error:
+        if not self.gil_ready:
+            if not thread.gil_allocate():
                 raise wrap_thread_error(space, "can't allocate GIL")
-            thread.acquire_NOAUTO(self.ll_GIL, True)
+            self.gil_ready = True
             self.enter_thread(space)   # setup the main thread
             result = True
         else:
@@ -44,19 +42,16 @@
         # test_lock_again after the global state was cleared by
         # test_compile_lock.  As a workaround, we repatch these global
         # fields systematically.
-        spacestate.ll_GIL = self.ll_GIL
         invoke_around_extcall(before_external_call, after_external_call)
         return result
 
     def reinit_threads(self, space):
-        if self.ll_GIL:
-            self.ll_GIL = thread.allocate_ll_lock()
-            thread.acquire_NOAUTO(self.ll_GIL, True)
-            self.enter_thread(space)
+        if self.gil_ready:
+            self.gil_ready = False
+            self.setup_threads(space)
 
     def yield_thread(self):
-        thread.yield_thread()  # explicitly release the gil (used by test_gil)
-
+        do_yield_thread()
 
 class GILReleaseAction(PeriodicAsyncAction):
     """An action called every sys.checkinterval bytecodes.  It releases
@@ -64,16 +59,12 @@
     """
 
     def perform(self, executioncontext, frame):
-        # Other threads can run between the release() and the acquire()
-        # implicit in the following external function call (which has
-        # otherwise no effect).
-        thread.yield_thread()
+        do_yield_thread()
 
 
 class SpaceState:
 
     def _freeze_(self):
-        self.ll_GIL = thread.null_ll_lock
         self.action_after_thread_switch = None
         # ^^^ set by AsyncAction.fire_after_thread_switch()
         return False
@@ -95,14 +86,14 @@
     # this function must not raise, in such a way that the exception
     # transformer knows that it cannot raise!
     e = get_errno()
-    thread.release_NOAUTO(spacestate.ll_GIL)
+    thread.gil_release()
     set_errno(e)
 before_external_call._gctransformer_hint_cannot_collect_ = True
 before_external_call._dont_reach_me_in_del_ = True
 
 def after_external_call():
     e = get_errno()
-    thread.acquire_NOAUTO(spacestate.ll_GIL, True)
+    thread.gil_acquire()
     thread.gc_thread_run()
     spacestate.after_thread_switch()
     set_errno(e)
@@ -115,3 +106,17 @@
 # pointers in the shadow stack.  This is necessary because the GIL is
 # not held after the call to before_external_call() or before the call
 # to after_external_call().
+
+def do_yield_thread():
+    # Explicitly release the GIL, in a way that tries to give more
+    # priority to other threads (as opposed to continuing to run in
+    # the same thread).  Calls the C helper thread.gil_yield_thread().
+    if thread.gil_yield_thread():   # non-zero: same follow-up as after_external_call()
+        thread.gc_thread_run()
+        spacestate.after_thread_switch()
+do_yield_thread._gctransformer_hint_close_stack_ = True
+do_yield_thread._dont_reach_me_in_del_ = True
+
+# do_yield_thread() needs a different hint: _gctransformer_hint_close_stack_.
+# The *_external_call() functions are themselves called only by a helper
+# function in the rffi module, and that helper also has this hint.
diff --git a/pypy/module/thread/ll_thread.py b/pypy/module/thread/ll_thread.py
--- a/pypy/module/thread/ll_thread.py
+++ b/pypy/module/thread/ll_thread.py
@@ -17,7 +17,8 @@
     include_dirs = [str(py.path.local(autopath.pypydir).join('translator', 'c'))],
     export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit',
                       'RPyThreadAcquireLock', 'RPyThreadReleaseLock',
-                      'RPyThreadYield',
+                      'RPyGilAllocate', 'RPyGilYieldThread',
+                      'RPyGilRelease', 'RPyGilAcquire',
                       'RPyThreadGetStackSize', 'RPyThreadSetStackSize',
                       'RPyOpaqueDealloc_ThreadLock',
                       'RPyThreadAfterFork']
@@ -69,8 +70,16 @@
                                          [TLOCKP], lltype.Void,
                                          _nowrapper=True)
 
-# this function does nothing apart from releasing the GIL temporarily.
-yield_thread = llexternal('RPyThreadYield', [], lltype.Void, threadsafe=True)
+# these functions directly manipulate the GIL, whose definition does not
+# escape the C code itself
+gil_allocate     = llexternal('RPyGilAllocate', [], lltype.Signed,
+                              _nowrapper=True)
+gil_yield_thread = llexternal('RPyGilYieldThread', [], lltype.Signed,
+                              _nowrapper=True)
+gil_release      = llexternal('RPyGilRelease', [], lltype.Void,
+                              _nowrapper=True)
+gil_acquire      = llexternal('RPyGilAcquire', [], lltype.Void,
+                              _nowrapper=True)
 
 def allocate_lock():
     return Lock(allocate_ll_lock())
diff --git a/pypy/module/thread/test/test_gil.py b/pypy/module/thread/test/test_gil.py
--- a/pypy/module/thread/test/test_gil.py
+++ b/pypy/module/thread/test/test_gil.py
@@ -30,19 +30,34 @@
     use_threads = True
     bigtest = False
 
-    def test_one_thread(self):
+    def test_one_thread(self, skew=+1):
+        from pypy.rlib.debug import debug_print
         if self.bigtest:
-            N = 1000000
+            N = 100000
+            skew *= 25000
         else:
             N = 100
+            skew *= 25
         space = FakeSpace()
         class State:
             pass
         state = State()
-        def runme():
-            for i in range(N):
+        def runme(main=False):
+            j = 0
+            for i in range(N + [-skew, skew][main]):
+                state.datalen1 += 1   # try to crash if the GIL is not
+                state.datalen2 += 1   # correctly acquired
                 state.data.append((thread.get_ident(), i))
+                state.datalen3 += 1
+                state.datalen4 += 1
+                assert state.datalen1 == len(state.data)
+                assert state.datalen2 == len(state.data)
+                assert state.datalen3 == len(state.data)
+                assert state.datalen4 == len(state.data)
+                debug_print(main, i, state.datalen4)
                 state.threadlocals.yield_thread()
+                assert i == j
+                j += 1
         def bootstrap():
             try:
                 runme()
@@ -50,20 +65,26 @@
                 thread.gc_thread_die()
         def f():
             state.data = []
+            state.datalen1 = 0
+            state.datalen2 = 0
+            state.datalen3 = 0
+            state.datalen4 = 0
             state.threadlocals = gil.GILThreadLocals()
             state.threadlocals.setup_threads(space)
             thread.gc_thread_prepare()
             subident = thread.start_new_thread(bootstrap, ())
             mainident = thread.get_ident()
-            runme()
+            runme(True)
             still_waiting = 3000
             while len(state.data) < 2*N:
+                debug_print(len(state.data))
                 if not still_waiting:
                     raise ValueError("time out")
                 still_waiting -= 1
                 if not we_are_translated(): gil.before_external_call()
                 time.sleep(0.01)
                 if not we_are_translated(): gil.after_external_call()
+            debug_print("leaving!")
             i1 = i2 = 0
             for tid, i in state.data:
                 if tid == mainident:
@@ -72,14 +93,17 @@
                     assert i == i2; i2 += 1
                 else:
                     assert 0
-            assert i1 == N
-            assert i2 == N
+            assert i1 == N + skew
+            assert i2 == N - skew
             return len(state.data)
 
         fn = self.getcompiled(f, [])
         res = fn()
         assert res == 2*N
 
+    def test_one_thread_rev(self):   # test_one_thread() again, with the skew sign flipped
+        self.test_one_thread(skew=-1)   # main thread now runs N - |skew| iterations
+
 
 class TestRunDirectly(GILTests):
     def getcompiled(self, f, argtypes):
diff --git a/pypy/translator/c/src/thread.h b/pypy/translator/c/src/thread.h
--- a/pypy/translator/c/src/thread.h
+++ b/pypy/translator/c/src/thread.h
@@ -37,14 +37,9 @@
 
 #endif
 
-/* common helper: this does nothing, but is called with the GIL released.
-   This gives other threads a chance to grab the GIL and run. */
-void RPyThreadYield(void);
-
-#ifndef PYPY_NOT_MAIN_FILE
-void RPyThreadYield(void)
-{
-}
-#endif
+long RPyGilAllocate(void);
+long RPyGilYieldThread(void);
+void RPyGilRelease(void);
+void RPyGilAcquire(void);
 
 #endif
diff --git a/pypy/translator/c/src/thread_pthread.h b/pypy/translator/c/src/thread_pthread.h
--- a/pypy/translator/c/src/thread_pthread.h
+++ b/pypy/translator/c/src/thread_pthread.h
@@ -12,6 +12,7 @@
 #include <signal.h>
 #include <stdio.h>
 #include <errno.h>
+#include <assert.h>
 
 /* The following is hopefully equivalent to what CPython does
    (which is trying to compile a snippet of code using it) */
@@ -459,4 +460,111 @@
 #define RPyThreadTLS_Set(key, value)	pthread_setspecific(key, value)
 
 
+/************************************************************/
+/* GIL code                                                 */
+/************************************************************/
+
+#ifdef __llvm__                /* __llvm__ compilers: assume the __sync builtin exists */
+#  define HAS_ATOMIC_ADD
+#endif
+
+#ifdef __GNUC__                /* gcc: the builtin is used from version 4.1 on */
+#  if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+#    define HAS_ATOMIC_ADD
+#  endif
+#endif
+
+#ifdef HAS_ATOMIC_ADD          /* atomic_add(ptr, value): atomically add value to *ptr */
+#  define atomic_add __sync_fetch_and_add
+#else                          /* otherwise: inline asm for __amd64__ / __i386__ only */
+#  if defined(__amd64__)
+#    define atomic_add(ptr, value)  asm volatile ("lock addq %0, %1"        \
+                                 : : "ri"(value), "m"(*(ptr)) : "memory")
+#  elif defined(__i386__)
+#    define atomic_add(ptr, value)  asm volatile ("lock addl %0, %1"        \
+                                 : : "ri"(value), "m"(*(ptr)) : "memory")
+#  else
+#    error "Please use gcc >= 4.1 or write a custom 'asm' for your CPU."
+#  endif
+#endif
+
+#define ASSERT_STATUS(call)    /* run `call`; a non-zero result is fatal */ \
+    if (call != 0) {    /* NOTE(review): bare `if`; use as a full statement only */ \
+        fprintf(stderr, "Fatal error: " #call "\n");    \
+        abort();                                        \
+    }
+
+static void _debug_print(const char *msg)   /* write msg to stderr, coloured per thread */
+{
+    int col = (int)pthread_self();   /* thread id, narrowed to int */
+    col = 31 + ((col / 8) % 8);      /* 31..38; NOTE(review): may drop below 31 if col < 0 */
+    fprintf(stderr, "\033[%dm%s\033[0m", col, msg);
+}
+
+static volatile long pending_acquires = -1;   /* -1 until RPyGilAllocate() sets it to 0 */
+static pthread_mutex_t mutex_gil = PTHREAD_MUTEX_INITIALIZER;   /* locked while the GIL is held */
+static pthread_cond_t cond_gil = PTHREAD_COND_INITIALIZER;   /* waited on in RPyGilYieldThread() */
+
+static void assert_has_the_gil(void)   /* debug check; does nothing unless RPY_ASSERT */
+{
+#ifdef RPY_ASSERT
+    assert(pthread_mutex_trylock(&mutex_gil) != 0);   /* mutex_gil must already be locked */
+    assert(pending_acquires >= 0);                    /* no longer in the initial -1 state */
+#endif
+}
+
+long RPyGilAllocate(void)   /* set up the GIL state; always returns 1 */
+{
+    _debug_print("RPyGilAllocate\n");
+    pending_acquires = 0;                /* leaves the initial -1 state */
+    pthread_mutex_trylock(&mutex_gil);   /* NOTE(review): result is not checked */
+    assert_has_the_gil();                /* presumably trylock so a repeated call cannot block -- confirm */
+    return 1;
+}
+
+long RPyGilYieldThread(void)   /* returns 1 if it waited on cond_gil, else 0 */
+{
+    /* May be called even before RPyGilAllocate(); pending_acquires is
+       then still -1 and the function returns 0 right away. */
+#ifdef RPY_ASSERT
+    if (pending_acquires >= 0)
+        assert_has_the_gil();
+#endif
+    if (pending_acquires <= 0)           /* nobody pending, or GIL not allocated */
+        return 0;
+    atomic_add(&pending_acquires, 1L);   /* count ourselves while we wait */
+    _debug_print("{");
+    ASSERT_STATUS(pthread_cond_wait(&cond_gil, &mutex_gil));   /* mutex_gil is unlocked while blocked */
+    _debug_print("}");
+    atomic_add(&pending_acquires, -1L);
+    ASSERT_STATUS(pthread_cond_signal(&cond_gil));   /* wake one other waiter, if any */
+    assert_has_the_gil();
+    return 1;
+}
+
+void RPyGilRelease(void)   /* release the GIL by unlocking mutex_gil */
+{
+    _debug_print("RPyGilRelease\n");
+#ifdef RPY_ASSERT
+    assert(pending_acquires >= 0);   /* RPyGilAllocate() must have run */
+#endif
+    assert_has_the_gil();
+    ASSERT_STATUS(pthread_mutex_unlock(&mutex_gil));
+}
+
+void RPyGilAcquire(void)   /* block until mutex_gil is locked by this thread */
+{
+    _debug_print("about to RPyGilAcquire...\n");
+#ifdef RPY_ASSERT
+    assert(pending_acquires >= 0);   /* RPyGilAllocate() must have run */
+#endif
+    atomic_add(&pending_acquires, 1L);   /* makes RPyGilYieldThread() see a pending thread */
+    ASSERT_STATUS(pthread_mutex_lock(&mutex_gil));
+    atomic_add(&pending_acquires, -1L);  /* no longer pending: we hold the mutex */
+    assert_has_the_gil();
+    ASSERT_STATUS(pthread_cond_signal(&cond_gil));   /* wake one thread waiting on cond_gil, if any */
+    _debug_print("RPyGilAcquire\n");
+}
+
+
 #endif /* PYPY_NOT_MAIN_FILE */


More information about the pypy-commit mailing list