[pypy-commit] pypy default: Merge gil-improvement: improve the balance of threads getting the GIL,

arigo noreply at buildbot.pypy.org
Mon Oct 3 11:28:31 CEST 2011


Author: Armin Rigo <arigo at tunes.org>
Branch: 
Changeset: r47789:99233931c2bf
Date: 2011-10-03 11:28 +0200
http://bitbucket.org/pypy/pypy/changeset/99233931c2bf/

Log:	Merge gil-improvement: improve the balance of threads getting the
	GIL, notably when there is a mixture of CPU-intensive and I/O
	threads.

	Previously, any CPU-intensive thread would release and immediately
	re-acquire the GIL every 100 bytecodes; but most of the time on
	multi-CPU machines this didn't actually give other threads a chance
	to run. And so when an I/O thread did any operation, chances were
	that it would lose control and not regain it for quite a long
	time.

	Now, the yield-after-N-bytecodes step is different from
	releasing and reacquiring the GIL around I/O: the former actually
	forces another thread to run (if there is one), while the latter does
	not (which, in the common case where the I/O is very short, tends to
	let the same thread continue to run). Also, using 10000 instead
	of 100 bytecodes as sys.checkinterval makes more sense in this
	version.

	This makes the example in https://bugs.pypy.org/issue884 work
	without any noticeable delay at all, at least on Linux.

	The Windows version was tested too, but didn't give such good
	improvements; see comment in thread_nt.h. Will try to tweak a bit
	more, so I'm not closing the gil-improvement branch right now...

diff --git a/pypy/interpreter/executioncontext.py b/pypy/interpreter/executioncontext.py
--- a/pypy/interpreter/executioncontext.py
+++ b/pypy/interpreter/executioncontext.py
@@ -307,7 +307,11 @@
         self._nonperiodic_actions = []
         self.has_bytecode_counter = False
         self.fired_actions = None
-        self.checkinterval_scaled = 100 * TICK_COUNTER_STEP
+        # the default value is not 100, unlike CPython 2.7, but a much
+        # larger value, because we use a technique that not only allows
+        # but actually *forces* another thread to run whenever the counter
+        # reaches zero.
+        self.checkinterval_scaled = 10000 * TICK_COUNTER_STEP
         self._rebuild_action_dispatcher()
 
     def fire(self, action):
diff --git a/pypy/module/thread/gil.py b/pypy/module/thread/gil.py
--- a/pypy/module/thread/gil.py
+++ b/pypy/module/thread/gil.py
@@ -16,7 +16,7 @@
 
 class GILThreadLocals(OSThreadLocals):
     """A version of OSThreadLocals that enforces a GIL."""
-    ll_GIL = thread.null_ll_lock
+    gil_ready = False
 
     def initialize(self, space):
         # add the GIL-releasing callback as an action on the space
@@ -25,12 +25,10 @@
 
     def setup_threads(self, space):
         """Enable threads in the object space, if they haven't already been."""
-        if not self.ll_GIL:
-            try:
-                self.ll_GIL = thread.allocate_ll_lock()
-            except thread.error:
+        if not self.gil_ready:
+            if not thread.gil_allocate():
                 raise wrap_thread_error(space, "can't allocate GIL")
-            thread.acquire_NOAUTO(self.ll_GIL, True)
+            self.gil_ready = True
             self.enter_thread(space)   # setup the main thread
             result = True
         else:
@@ -44,19 +42,16 @@
         # test_lock_again after the global state was cleared by
         # test_compile_lock.  As a workaround, we repatch these global
         # fields systematically.
-        spacestate.ll_GIL = self.ll_GIL
         invoke_around_extcall(before_external_call, after_external_call)
         return result
 
     def reinit_threads(self, space):
-        if self.ll_GIL:
-            self.ll_GIL = thread.allocate_ll_lock()
-            thread.acquire_NOAUTO(self.ll_GIL, True)
-            self.enter_thread(space)
+        if self.gil_ready:
+            self.gil_ready = False
+            self.setup_threads(space)
 
     def yield_thread(self):
-        thread.yield_thread()  # explicitly release the gil (used by test_gil)
-
+        do_yield_thread()
 
 class GILReleaseAction(PeriodicAsyncAction):
     """An action called every sys.checkinterval bytecodes.  It releases
@@ -64,16 +59,12 @@
     """
 
     def perform(self, executioncontext, frame):
-        # Other threads can run between the release() and the acquire()
-        # implicit in the following external function call (which has
-        # otherwise no effect).
-        thread.yield_thread()
+        do_yield_thread()
 
 
 class SpaceState:
 
     def _freeze_(self):
-        self.ll_GIL = thread.null_ll_lock
         self.action_after_thread_switch = None
         # ^^^ set by AsyncAction.fire_after_thread_switch()
         return False
@@ -95,14 +86,14 @@
     # this function must not raise, in such a way that the exception
     # transformer knows that it cannot raise!
     e = get_errno()
-    thread.release_NOAUTO(spacestate.ll_GIL)
+    thread.gil_release()
     set_errno(e)
 before_external_call._gctransformer_hint_cannot_collect_ = True
 before_external_call._dont_reach_me_in_del_ = True
 
 def after_external_call():
     e = get_errno()
-    thread.acquire_NOAUTO(spacestate.ll_GIL, True)
+    thread.gil_acquire()
     thread.gc_thread_run()
     spacestate.after_thread_switch()
     set_errno(e)
@@ -115,3 +106,18 @@
 # pointers in the shadow stack.  This is necessary because the GIL is
 # not held after the call to before_external_call() or before the call
 # to after_external_call().
+
+def do_yield_thread():
+    # explicitly release the gil, in a way that tries to give more
+    # priority to other threads (as opposed to continuing to run in
+    # the same thread).
+    if thread.gil_yield_thread():
+        thread.gc_thread_run()
+        spacestate.after_thread_switch()
+do_yield_thread._gctransformer_hint_close_stack_ = True
+do_yield_thread._dont_reach_me_in_del_ = True
+do_yield_thread._dont_inline_ = True
+
+# do_yield_thread() needs a different hint: _gctransformer_hint_close_stack_.
+# The *_external_call() functions are themselves called only from the rffi
+# module from a helper function that also has this hint.
diff --git a/pypy/module/thread/ll_thread.py b/pypy/module/thread/ll_thread.py
--- a/pypy/module/thread/ll_thread.py
+++ b/pypy/module/thread/ll_thread.py
@@ -17,7 +17,8 @@
     include_dirs = [str(py.path.local(autopath.pypydir).join('translator', 'c'))],
     export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit',
                       'RPyThreadAcquireLock', 'RPyThreadReleaseLock',
-                      'RPyThreadYield',
+                      'RPyGilAllocate', 'RPyGilYieldThread',
+                      'RPyGilRelease', 'RPyGilAcquire',
                       'RPyThreadGetStackSize', 'RPyThreadSetStackSize',
                       'RPyOpaqueDealloc_ThreadLock',
                       'RPyThreadAfterFork']
@@ -69,8 +70,16 @@
                                          [TLOCKP], lltype.Void,
                                          _nowrapper=True)
 
-# this function does nothing apart from releasing the GIL temporarily.
-yield_thread = llexternal('RPyThreadYield', [], lltype.Void, threadsafe=True)
+# these functions manipulate directly the GIL, whose definition does not
+# escape the C code itself
+gil_allocate     = llexternal('RPyGilAllocate', [], lltype.Signed,
+                              _nowrapper=True)
+gil_yield_thread = llexternal('RPyGilYieldThread', [], lltype.Signed,
+                              _nowrapper=True)
+gil_release      = llexternal('RPyGilRelease', [], lltype.Void,
+                              _nowrapper=True)
+gil_acquire      = llexternal('RPyGilAcquire', [], lltype.Void,
+                              _nowrapper=True)
 
 def allocate_lock():
     return Lock(allocate_ll_lock())
diff --git a/pypy/module/thread/test/test_gil.py b/pypy/module/thread/test/test_gil.py
--- a/pypy/module/thread/test/test_gil.py
+++ b/pypy/module/thread/test/test_gil.py
@@ -30,19 +30,34 @@
     use_threads = True
     bigtest = False
 
-    def test_one_thread(self):
+    def test_one_thread(self, skew=+1):
+        from pypy.rlib.debug import debug_print
         if self.bigtest:
-            N = 1000000
+            N = 100000
+            skew *= 25000
         else:
             N = 100
+            skew *= 25
         space = FakeSpace()
         class State:
             pass
         state = State()
-        def runme():
-            for i in range(N):
+        def runme(main=False):
+            j = 0
+            for i in range(N + [-skew, skew][main]):
+                state.datalen1 += 1   # try to crash if the GIL is not
+                state.datalen2 += 1   # correctly acquired
                 state.data.append((thread.get_ident(), i))
+                state.datalen3 += 1
+                state.datalen4 += 1
+                assert state.datalen1 == len(state.data)
+                assert state.datalen2 == len(state.data)
+                assert state.datalen3 == len(state.data)
+                assert state.datalen4 == len(state.data)
+                debug_print(main, i, state.datalen4)
                 state.threadlocals.yield_thread()
+                assert i == j
+                j += 1
         def bootstrap():
             try:
                 runme()
@@ -50,20 +65,26 @@
                 thread.gc_thread_die()
         def f():
             state.data = []
+            state.datalen1 = 0
+            state.datalen2 = 0
+            state.datalen3 = 0
+            state.datalen4 = 0
             state.threadlocals = gil.GILThreadLocals()
             state.threadlocals.setup_threads(space)
             thread.gc_thread_prepare()
             subident = thread.start_new_thread(bootstrap, ())
             mainident = thread.get_ident()
-            runme()
+            runme(True)
             still_waiting = 3000
             while len(state.data) < 2*N:
+                debug_print(len(state.data))
                 if not still_waiting:
                     raise ValueError("time out")
                 still_waiting -= 1
                 if not we_are_translated(): gil.before_external_call()
                 time.sleep(0.01)
                 if not we_are_translated(): gil.after_external_call()
+            debug_print("leaving!")
             i1 = i2 = 0
             for tid, i in state.data:
                 if tid == mainident:
@@ -72,14 +93,17 @@
                     assert i == i2; i2 += 1
                 else:
                     assert 0
-            assert i1 == N
-            assert i2 == N
+            assert i1 == N + skew
+            assert i2 == N - skew
             return len(state.data)
 
         fn = self.getcompiled(f, [])
         res = fn()
         assert res == 2*N
 
+    def test_one_thread_rev(self):
+        self.test_one_thread(skew=-1)
+
 
 class TestRunDirectly(GILTests):
     def getcompiled(self, f, argtypes):
diff --git a/pypy/module/thread/test/test_thread.py b/pypy/module/thread/test/test_thread.py
--- a/pypy/module/thread/test/test_thread.py
+++ b/pypy/module/thread/test/test_thread.py
@@ -225,7 +225,8 @@
 
         def busy_wait():
             for x in range(1000):
-                time.sleep(0.01)
+                print 'tick...', x  # <-force the GIL to be released, as
+                time.sleep(0.01)    #   time.sleep doesn't do non-translated
 
         # This is normally called by app_main.py
         signal.signal(signal.SIGINT, signal.default_int_handler)
diff --git a/pypy/translator/c/gcc/trackgcroot.py b/pypy/translator/c/gcc/trackgcroot.py
--- a/pypy/translator/c/gcc/trackgcroot.py
+++ b/pypy/translator/c/gcc/trackgcroot.py
@@ -486,6 +486,8 @@
         'paddq', 'pinsr',
         # zero-extending moves should not produce GC pointers
         'movz', 
+        # locked operations should not move GC pointers, at least so far
+        'lock',
         ])
 
     # a partial list is hopefully good enough for now; it's all to support
diff --git a/pypy/translator/c/src/thread.h b/pypy/translator/c/src/thread.h
--- a/pypy/translator/c/src/thread.h
+++ b/pypy/translator/c/src/thread.h
@@ -37,14 +37,9 @@
 
 #endif
 
-/* common helper: this does nothing, but is called with the GIL released.
-   This gives other threads a chance to grab the GIL and run. */
-void RPyThreadYield(void);
-
-#ifndef PYPY_NOT_MAIN_FILE
-void RPyThreadYield(void)
-{
-}
-#endif
+long RPyGilAllocate(void);
+long RPyGilYieldThread(void);
+void RPyGilRelease(void);
+void RPyGilAcquire(void);
 
 #endif
diff --git a/pypy/translator/c/src/thread_nt.h b/pypy/translator/c/src/thread_nt.h
--- a/pypy/translator/c/src/thread_nt.h
+++ b/pypy/translator/c/src/thread_nt.h
@@ -221,4 +221,57 @@
 #define RPyThreadTLS_Set(key, value)	TlsSetValue(key, value)
 
 
+/************************************************************/
+/* GIL code                                                 */
+/************************************************************/
+
+static volatile LONG pending_acquires = -1;
+static CRITICAL_SECTION mutex_gil;
+static HANDLE cond_gil;
+
+long RPyGilAllocate(void)
+{
+    pending_acquires = 0;
+    InitializeCriticalSection(&mutex_gil);
+    EnterCriticalSection(&mutex_gil);   /* the creating thread holds the GIL */
+    cond_gil = CreateEvent (NULL, FALSE, FALSE, NULL);
+    return cond_gil != NULL;   /* 0 tells the caller that allocation failed */
+}
+
+long RPyGilYieldThread(void)
+{
+    /* May be called even before RPyGilAllocate(); pending_acquires is
+       then -1.  Returns 0 without yielding if no thread is waiting. */
+    if (pending_acquires <= 0)
+        return 0;
+    InterlockedIncrement(&pending_acquires);
+    PulseEvent(cond_gil);   /* cond_gil is a HANDLE: pass it by value */
+
+    /* hack: the three following lines do a pthread_cond_wait(), and
+       normally specifying a timeout of INFINITE would be fine.  But the
+       first and second operations are not done atomically, so there is a
+       (small) risk that PulseEvent misses the WaitForSingleObject().
+       In this case the process will just sleep a few milliseconds. */
+    LeaveCriticalSection(&mutex_gil);
+    WaitForSingleObject(cond_gil, 15);
+    EnterCriticalSection(&mutex_gil);
+
+    InterlockedDecrement(&pending_acquires);
+    return 1;
+}
+
+void RPyGilRelease(void)
+{
+    LeaveCriticalSection(&mutex_gil);
+    PulseEvent(cond_gil);   /* cond_gil is a HANDLE: pass it by value */
+}
+
+void RPyGilAcquire(void)
+{
+    InterlockedIncrement(&pending_acquires);
+    EnterCriticalSection(&mutex_gil);
+    InterlockedDecrement(&pending_acquires);
+}
+
+
 #endif /* PYPY_NOT_MAIN_FILE */
diff --git a/pypy/translator/c/src/thread_pthread.h b/pypy/translator/c/src/thread_pthread.h
--- a/pypy/translator/c/src/thread_pthread.h
+++ b/pypy/translator/c/src/thread_pthread.h
@@ -12,6 +12,7 @@
 #include <signal.h>
 #include <stdio.h>
 #include <errno.h>
+#include <assert.h>
 
 /* The following is hopefully equivalent to what CPython does
    (which is trying to compile a snippet of code using it) */
@@ -459,4 +460,113 @@
 #define RPyThreadTLS_Set(key, value)	pthread_setspecific(key, value)
 
 
+/************************************************************/
+/* GIL code                                                 */
+/************************************************************/
+
+#ifdef __llvm__
+#  define HAS_ATOMIC_ADD
+#endif
+
+#ifdef __GNUC__
+#  if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+#    define HAS_ATOMIC_ADD
+#  endif
+#endif
+
+#ifdef HAS_ATOMIC_ADD
+#  define atomic_add __sync_fetch_and_add
+#else
+#  if defined(__amd64__)
+#    define atomic_add(ptr, value)  asm volatile ("lock addq %0, %1"        \
+                                 : : "ri"(value), "m"(*(ptr)) : "memory")
+#  elif defined(__i386__)
+#    define atomic_add(ptr, value)  asm volatile ("lock addl %0, %1"        \
+                                 : : "ri"(value), "m"(*(ptr)) : "memory")
+#  else
+#    error "Please use gcc >= 4.1 or write a custom 'asm' for your CPU."
+#  endif
+#endif
+
+#define ASSERT_STATUS(call)                             \
+    if (call != 0) {                                    \
+        fprintf(stderr, "Fatal error: " #call "\n");    \
+        abort();                                        \
+    }
+
+static void _debug_print(const char *msg)
+{
+#if 0
+    int col = (int)pthread_self();
+    col = 31 + ((col / 8) % 8);
+    fprintf(stderr, "\033[%dm%s\033[0m", col, msg);
+#endif
+}
+
+static volatile long pending_acquires = -1;
+static pthread_mutex_t mutex_gil = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond_gil = PTHREAD_COND_INITIALIZER;
+
+static void assert_has_the_gil(void)
+{
+#ifdef RPY_ASSERT
+    assert(pthread_mutex_trylock(&mutex_gil) != 0);
+    assert(pending_acquires >= 0);
+#endif
+}
+
+long RPyGilAllocate(void)
+{
+    _debug_print("RPyGilAllocate\n");
+    pending_acquires = 0;
+    pthread_mutex_trylock(&mutex_gil);
+    assert_has_the_gil();
+    return 1;
+}
+
+long RPyGilYieldThread(void)
+{
+    /* can be called even before RPyGilAllocate(), but in this case,
+       pending_acquires will be -1 */
+#ifdef RPY_ASSERT
+    if (pending_acquires >= 0)
+        assert_has_the_gil();
+#endif
+    if (pending_acquires <= 0)
+        return 0;
+    atomic_add(&pending_acquires, 1L);
+    _debug_print("{");
+    ASSERT_STATUS(pthread_cond_signal(&cond_gil));
+    ASSERT_STATUS(pthread_cond_wait(&cond_gil, &mutex_gil));
+    _debug_print("}");
+    atomic_add(&pending_acquires, -1L);
+    assert_has_the_gil();
+    return 1;
+}
+
+void RPyGilRelease(void)
+{
+    _debug_print("RPyGilRelease\n");
+#ifdef RPY_ASSERT
+    assert(pending_acquires >= 0);
+#endif
+    assert_has_the_gil();
+    ASSERT_STATUS(pthread_mutex_unlock(&mutex_gil));
+    ASSERT_STATUS(pthread_cond_signal(&cond_gil));
+}
+
+void RPyGilAcquire(void)
+{
+    _debug_print("about to RPyGilAcquire...\n");
+#ifdef RPY_ASSERT
+    assert(pending_acquires >= 0);
+#endif
+    atomic_add(&pending_acquires, 1L);
+    ASSERT_STATUS(pthread_mutex_lock(&mutex_gil));
+    atomic_add(&pending_acquires, -1L);
+    assert_has_the_gil();
+    _debug_print("RPyGilAcquire\n");
+}
+
+
 #endif /* PYPY_NOT_MAIN_FILE */


More information about the pypy-commit mailing list