[pypy-svn] pypy default: Port the thread+fork() logic to asmgcc, painfully.

arigo commits-noreply at bitbucket.org
Wed Feb 2 16:33:48 CET 2011


Author: Armin Rigo <arigo at tunes.org>
Branch: 
Changeset: r41571:78912da6a09a
Date: 2011-02-02 15:31 +0100
http://bitbucket.org/pypy/pypy/changeset/78912da6a09a/

Log:	Port the thread+fork() logic to asmgcc, painfully.

diff --git a/pypy/rpython/llinterp.py b/pypy/rpython/llinterp.py
--- a/pypy/rpython/llinterp.py
+++ b/pypy/rpython/llinterp.py
@@ -824,6 +824,9 @@
     def op_gc_thread_run(self):
         self.heap.thread_run()
 
+    def op_gc_thread_start(self):
+        self.heap.thread_start()
+
     def op_gc_thread_die(self):
         self.heap.thread_die()
 

diff --git a/pypy/rpython/memory/gctransform/asmgcroot.py b/pypy/rpython/memory/gctransform/asmgcroot.py
--- a/pypy/rpython/memory/gctransform/asmgcroot.py
+++ b/pypy/rpython/memory/gctransform/asmgcroot.py
@@ -149,9 +149,110 @@
 
     def need_thread_support(self, gctransformer, getfn):
         # Threads supported "out of the box" by the rest of the code.
-        # In particular, we can ignore the gc_thread_prepare,
-        # gc_thread_run and gc_thread_die operations.
-        pass
+        # The whole code in this function is only there to support
+        # fork()ing in a multithreaded process :-(
+        # For this, we need to handle gc_thread_start and gc_thread_die
+        # to record the mapping {thread_id: stack_start}, and
+        # gc_thread_before_fork and gc_thread_after_fork to get rid of
+        # all ASM_FRAMEDATA structures that do not belong to the current
+        # thread after a fork().
+        from pypy.module.thread import ll_thread
+        from pypy.rpython.memory.support import AddressDict
+        from pypy.rpython.memory.support import copy_without_null_values
+        from pypy.annotation import model as annmodel
+        gcdata = self.gcdata
+
+        def get_aid():
+            """Return the thread identifier, cast to an (opaque) address."""
+            return llmemory.cast_int_to_adr(ll_thread.get_ident())
+
+        def thread_start():
+            value = llop.stack_current(llmemory.Address)
+            gcdata.aid2stack.setitem(get_aid(), value)
+        thread_start._always_inline_ = True
+
+        def thread_setup():
+            gcdata.aid2stack = AddressDict()
+            gcdata.dead_threads_count = 0
+            # to also register the main thread's stack
+            thread_start()
+        thread_setup._always_inline_ = True
+
+        def thread_die():
+            gcdata.aid2stack.setitem(get_aid(), llmemory.NULL)
+            # from time to time, rehash the dictionary to remove
+            # old NULL entries
+            gcdata.dead_threads_count += 1
+            if (gcdata.dead_threads_count & 511) == 0:
+                gcdata.aid2stack = copy_without_null_values(gcdata.aid2stack)
+
+        def belongs_to_current_thread(framedata):
+            # xxx obscure: the answer is Yes if, as a pointer, framedata
+            # lies between the start of the current stack and the top of it.
+            stack_start = gcdata.aid2stack.get(get_aid(), llmemory.NULL)
+            ll_assert(stack_start != llmemory.NULL,
+                      "current thread not found in gcdata.aid2stack!")
+            stack_stop  = llop.stack_current(llmemory.Address)
+            return (stack_start <= framedata <= stack_stop or
+                    stack_start >= framedata >= stack_stop)
+
+        def thread_before_fork():
+            # before fork(): collect all ASM_FRAMEDATA structures that do
+            # not belong to the current thread, and move them out of the
+            # way, i.e. out of the main circular doubly linked list.
+            detached_pieces = llmemory.NULL
+            anchor = llmemory.cast_ptr_to_adr(gcrootanchor)
+            initialframedata = anchor.address[1]
+            while initialframedata != anchor:   # while we have not looped back
+                if not belongs_to_current_thread(initialframedata):
+                    # Unlink it
+                    prev = initialframedata.address[0]
+                    next = initialframedata.address[1]
+                    prev.address[1] = next
+                    next.address[0] = prev
+                    # Link it to the singly linked list 'detached_pieces'
+                    initialframedata.address[0] = detached_pieces
+                    detached_pieces = initialframedata
+                    rffi.stackcounter.stacks_counter -= 1
+                # Then proceed to the next piece of stack
+                initialframedata = initialframedata.address[1]
+            return detached_pieces
+
+        def thread_after_fork(result_of_fork, detached_pieces):
+            if result_of_fork == 0:
+                # We are in the child process.  Assumes that only the
+                # current thread survived.  All the detached_pieces
+                # are pointers in other stacks, so have likely been
+                # freed already by the multithreaded library.
+                # Nothing more for us to do.
+                pass
+            else:
+                # We are still in the parent process.  The fork() may
+                # have succeeded or not, but that's irrelevant here.
+                # We need to reattach the detached_pieces now, to the
+                # circular doubly linked list at 'gcrootanchor'.  The
+                # order is not important.
+                anchor = llmemory.cast_ptr_to_adr(gcrootanchor)
+                while detached_pieces != llmemory.NULL:
+                    reattach = detached_pieces
+                    detached_pieces = detached_pieces.address[0]
+                    a_next = anchor.address[1]
+                    reattach.address[0] = anchor
+                    reattach.address[1] = a_next
+                    anchor.address[1] = reattach
+                    a_next.address[0] = reattach
+                    rffi.stackcounter.stacks_counter += 1
+
+        self.thread_setup = thread_setup
+        self.thread_start_ptr = getfn(thread_start, [], annmodel.s_None,
+                                      inline=True)
+        self.thread_die_ptr = getfn(thread_die, [], annmodel.s_None)
+        self.thread_before_fork_ptr = getfn(thread_before_fork, [],
+                                            annmodel.SomeAddress())
+        self.thread_after_fork_ptr = getfn(thread_after_fork,
+                                           [annmodel.SomeInteger(),
+                                            annmodel.SomeAddress()],
+                                           annmodel.s_None)
 
     def walk_stack_roots(self, collect_stack_root):
         gcdata = self.gcdata

diff --git a/pypy/module/thread/ll_thread.py b/pypy/module/thread/ll_thread.py
--- a/pypy/module/thread/ll_thread.py
+++ b/pypy/module/thread/ll_thread.py
@@ -152,7 +152,7 @@
 # ____________________________________________________________
 #
 # Thread integration.
-# These are five completely ad-hoc operations at the moment.
+# These are six completely ad-hoc operations at the moment.
 
 def gc_thread_prepare():
     """To call just before thread.start_new_thread().  This
@@ -171,6 +171,12 @@
         llop.gc_thread_run(lltype.Void)
 gc_thread_run._always_inline_ = True
 
+def gc_thread_start():
+    """To call at the beginning of a new thread.
+    """
+    if we_are_translated():
+        llop.gc_thread_start(lltype.Void)
+
 def gc_thread_die():
     """To call just before the final GIL release done by a dying
     thread.  After a thread_die(), no more gc operation should

diff --git a/pypy/translator/c/test/test_standalone.py b/pypy/translator/c/test/test_standalone.py
--- a/pypy/translator/c/test/test_standalone.py
+++ b/pypy/translator/c/test/test_standalone.py
@@ -841,6 +841,7 @@
                 self.tail = tail
 
         def bootstrap():
+            ll_thread.gc_thread_start()
             state.xlist.append(Cons(123, Cons(456, None)))
             gc.collect()
             ll_thread.gc_thread_die()
@@ -941,6 +942,7 @@
             return childpid
 
         def bootstrap():
+            ll_thread.gc_thread_start()
             childpid = run_in_thread()
             gc.collect()        # collect both in the child and in the parent
             gc.collect()

diff --git a/pypy/rpython/memory/gctransform/framework.py b/pypy/rpython/memory/gctransform/framework.py
--- a/pypy/rpython/memory/gctransform/framework.py
+++ b/pypy/rpython/memory/gctransform/framework.py
@@ -955,6 +955,11 @@
         if hasattr(self.root_walker, 'thread_run_ptr'):
             hop.genop("direct_call", [self.root_walker.thread_run_ptr])
 
+    def gct_gc_thread_start(self, hop):
+        assert self.translator.config.translation.thread
+        if hasattr(self.root_walker, 'thread_start_ptr'):
+            hop.genop("direct_call", [self.root_walker.thread_start_ptr])
+
     def gct_gc_thread_die(self, hop):
         assert self.translator.config.translation.thread
         if hasattr(self.root_walker, 'thread_die_ptr'):
@@ -1258,6 +1263,7 @@
 
 class BaseRootWalker(object):
     need_root_stack = False
+    thread_setup = None
 
     def __init__(self, gctransformer):
         self.gcdata = gctransformer.gcdata
@@ -1267,7 +1273,8 @@
         return True
 
     def setup_root_walker(self):
-        pass
+        if self.thread_setup is not None:
+            self.thread_setup()
 
     def walk_roots(self, collect_stack_root,
                    collect_static_in_prebuilt_nongc,
@@ -1300,7 +1307,6 @@
 
 class ShadowStackRootWalker(BaseRootWalker):
     need_root_stack = True
-    thread_setup = None
     collect_stacks_from_other_threads = None
 
     def __init__(self, gctransformer):
@@ -1340,8 +1346,7 @@
         ll_assert(bool(stackbase), "could not allocate root stack")
         self.gcdata.root_stack_top  = stackbase
         self.gcdata.root_stack_base = stackbase
-        if self.thread_setup is not None:
-            self.thread_setup()
+        BaseRootWalker.setup_root_walker(self)
 
     def walk_stack_roots(self, collect_stack_root):
         gcdata = self.gcdata
@@ -1500,7 +1505,9 @@
         self.thread_prepare_ptr = getfn(thread_prepare, [], annmodel.s_None)
         self.thread_run_ptr = getfn(thread_run, [], annmodel.s_None,
                                     inline=True)
+        # no thread_start_ptr here
         self.thread_die_ptr = getfn(thread_die, [], annmodel.s_None)
+        # no thread_before_fork_ptr here
         self.thread_after_fork_ptr = getfn(thread_after_fork,
                                            [annmodel.SomeInteger(),
                                             annmodel.SomeAddress()],

diff --git a/pypy/translator/c/src/mem.h b/pypy/translator/c/src/mem.h
--- a/pypy/translator/c/src/mem.h
+++ b/pypy/translator/c/src/mem.h
@@ -84,6 +84,9 @@
 #endif
 
 
+/* used by pypy.rlib.rstack, but also by asmgcc */
+#define OP_STACK_CURRENT(r)  r = (long)&r
+
 
 #define RAW_MALLOC_ZERO_FILLED 0
 

diff --git a/pypy/translator/c/src/stack.h b/pypy/translator/c/src/stack.h
--- a/pypy/translator/c/src/stack.h
+++ b/pypy/translator/c/src/stack.h
@@ -17,7 +17,6 @@
 char LL_stack_too_big_slowpath(long);    /* returns 0 (ok) or 1 (too big) */
 
 /* some macros referenced from pypy.rlib.rstack */
-#define OP_STACK_CURRENT(r)  r = (long)&r
 #define LL_stack_get_start() ((long)_LLstacktoobig_stack_start)
 #define LL_stack_get_length() MAX_STACK_SIZE
 #define LL_stack_get_start_adr() ((long)&_LLstacktoobig_stack_start)  /* JIT */

diff --git a/pypy/rpython/lltypesystem/lloperation.py b/pypy/rpython/lltypesystem/lloperation.py
--- a/pypy/rpython/lltypesystem/lloperation.py
+++ b/pypy/rpython/lltypesystem/lloperation.py
@@ -465,6 +465,7 @@
                                  # ^^^ but canunwindgc=False, as it is
                                  # allocating non-GC structures only
     'gc_thread_run'       : LLOp(),
+    'gc_thread_start'     : LLOp(),
     'gc_thread_die'       : LLOp(),
     'gc_thread_before_fork':LLOp(),   # returns an opaque address
     'gc_thread_after_fork': LLOp(),   # arguments: (result_of_fork, opaqueaddr)

diff --git a/pypy/rpython/lltypesystem/llheap.py b/pypy/rpython/lltypesystem/llheap.py
--- a/pypy/rpython/lltypesystem/llheap.py
+++ b/pypy/rpython/lltypesystem/llheap.py
@@ -31,5 +31,8 @@
 def thread_run():
     pass
 
+def thread_start():
+    pass
+
 def thread_die():
     pass

diff --git a/pypy/module/thread/os_thread.py b/pypy/module/thread/os_thread.py
--- a/pypy/module/thread/os_thread.py
+++ b/pypy/module/thread/os_thread.py
@@ -83,6 +83,7 @@
         # Note that when this runs, we already hold the GIL.  This is ensured
         # by rffi's callback mecanism: we are a callback for the
         # c_thread_start() external function.
+        thread.gc_thread_start()
         space = bootstrapper.space
         w_callable = bootstrapper.w_callable
         args = bootstrapper.args


More information about the Pypy-commit mailing list