[pypy-svn] pypy default: Port the thread+fork() logic to asmgcc, painfully.
arigo
commits-noreply at bitbucket.org
Wed Feb 2 16:33:48 CET 2011
Author: Armin Rigo <arigo at tunes.org>
Branch:
Changeset: r41571:78912da6a09a
Date: 2011-02-02 15:31 +0100
http://bitbucket.org/pypy/pypy/changeset/78912da6a09a/
Log: Port the thread+fork() logic to asmgcc, painfully.
diff --git a/pypy/rpython/llinterp.py b/pypy/rpython/llinterp.py
--- a/pypy/rpython/llinterp.py
+++ b/pypy/rpython/llinterp.py
@@ -824,6 +824,9 @@
def op_gc_thread_run(self):
self.heap.thread_run()
+ def op_gc_thread_start(self):
+ self.heap.thread_start()
+
def op_gc_thread_die(self):
self.heap.thread_die()
diff --git a/pypy/rpython/memory/gctransform/asmgcroot.py b/pypy/rpython/memory/gctransform/asmgcroot.py
--- a/pypy/rpython/memory/gctransform/asmgcroot.py
+++ b/pypy/rpython/memory/gctransform/asmgcroot.py
@@ -149,9 +149,110 @@
def need_thread_support(self, gctransformer, getfn):
# Threads supported "out of the box" by the rest of the code.
- # In particular, we can ignore the gc_thread_prepare,
- # gc_thread_run and gc_thread_die operations.
- pass
+ # The whole code in this function is only there to support
+ # fork()ing in a multithreaded process :-(
+ # For this, we need to handle gc_thread_start and gc_thread_die
+ # to record the mapping {thread_id: stack_start}, and
+ # gc_thread_before_fork and gc_thread_after_fork to get rid of
+ # all ASM_FRAMEDATA structures that do not belong to the current
+ # thread after a fork().
+ from pypy.module.thread import ll_thread
+ from pypy.rpython.memory.support import AddressDict
+ from pypy.rpython.memory.support import copy_without_null_values
+ from pypy.annotation import model as annmodel
+ gcdata = self.gcdata
+
+ def get_aid():
+ """Return the thread identifier, cast to an (opaque) address."""
+ return llmemory.cast_int_to_adr(ll_thread.get_ident())
+
+ def thread_start():
+ value = llop.stack_current(llmemory.Address)
+ gcdata.aid2stack.setitem(get_aid(), value)
+ thread_start._always_inline_ = True
+
+ def thread_setup():
+ gcdata.aid2stack = AddressDict()
+ gcdata.dead_threads_count = 0
+ # to also register the main thread's stack
+ thread_start()
+ thread_setup._always_inline_ = True
+
+ def thread_die():
+ gcdata.aid2stack.setitem(get_aid(), llmemory.NULL)
+ # from time to time, rehash the dictionary to remove
+ # old NULL entries
+ gcdata.dead_threads_count += 1
+ if (gcdata.dead_threads_count & 511) == 0:
+ gcdata.aid2stack = copy_without_null_values(gcdata.aid2stack)
+
+ def belongs_to_current_thread(framedata):
+ # xxx obscure: the answer is Yes if, as a pointer, framedata
+ # lies between the start of the current stack and the top of it.
+ stack_start = gcdata.aid2stack.get(get_aid(), llmemory.NULL)
+ ll_assert(stack_start != llmemory.NULL,
+ "current thread not found in gcdata.aid2stack!")
+ stack_stop = llop.stack_current(llmemory.Address)
+ return (stack_start <= framedata <= stack_stop or
+ stack_start >= framedata >= stack_stop)
+
+ def thread_before_fork():
+ # before fork(): collect all ASM_FRAMEDATA structures that do
+ # not belong to the current thread, and move them out of the
+ # way, i.e. out of the main circular doubly linked list.
+ detached_pieces = llmemory.NULL
+ anchor = llmemory.cast_ptr_to_adr(gcrootanchor)
+ initialframedata = anchor.address[1]
+ while initialframedata != anchor: # while we have not looped back
+ if not belongs_to_current_thread(initialframedata):
+ # Unlink it
+ prev = initialframedata.address[0]
+ next = initialframedata.address[1]
+ prev.address[1] = next
+ next.address[0] = prev
+ # Link it to the singly linked list 'detached_pieces'
+ initialframedata.address[0] = detached_pieces
+ detached_pieces = initialframedata
+ rffi.stackcounter.stacks_counter -= 1
+ # Then proceed to the next piece of stack
+ initialframedata = initialframedata.address[1]
+ return detached_pieces
+
+ def thread_after_fork(result_of_fork, detached_pieces):
+ if result_of_fork == 0:
+ # We are in the child process. Assumes that only the
+ # current thread survived. All the detached_pieces
+ # are pointers in other stacks, so have likely been
+ # freed already by the multithreaded library.
+ # Nothing more for us to do.
+ pass
+ else:
+ # We are still in the parent process. The fork() may
+ # have succeeded or not, but that's irrelevant here.
+ # We need to reattach the detached_pieces now, to the
+ # circular doubly linked list at 'gcrootanchor'. The
+ # order is not important.
+ anchor = llmemory.cast_ptr_to_adr(gcrootanchor)
+ while detached_pieces != llmemory.NULL:
+ reattach = detached_pieces
+ detached_pieces = detached_pieces.address[0]
+ a_next = anchor.address[1]
+ reattach.address[0] = anchor
+ reattach.address[1] = a_next
+ anchor.address[1] = reattach
+ a_next.address[0] = reattach
+ rffi.stackcounter.stacks_counter += 1
+
+ self.thread_setup = thread_setup
+ self.thread_start_ptr = getfn(thread_start, [], annmodel.s_None,
+ inline=True)
+ self.thread_die_ptr = getfn(thread_die, [], annmodel.s_None)
+ self.thread_before_fork_ptr = getfn(thread_before_fork, [],
+ annmodel.SomeAddress())
+ self.thread_after_fork_ptr = getfn(thread_after_fork,
+ [annmodel.SomeInteger(),
+ annmodel.SomeAddress()],
+ annmodel.s_None)
def walk_stack_roots(self, collect_stack_root):
gcdata = self.gcdata
diff --git a/pypy/module/thread/ll_thread.py b/pypy/module/thread/ll_thread.py
--- a/pypy/module/thread/ll_thread.py
+++ b/pypy/module/thread/ll_thread.py
@@ -152,7 +152,7 @@
# ____________________________________________________________
#
# Thread integration.
-# These are five completely ad-hoc operations at the moment.
+# These are six completely ad-hoc operations at the moment.
def gc_thread_prepare():
"""To call just before thread.start_new_thread(). This
@@ -171,6 +171,12 @@
llop.gc_thread_run(lltype.Void)
gc_thread_run._always_inline_ = True
+def gc_thread_start():
+ """To call at the beginning of a new thread.
+ """
+ if we_are_translated():
+ llop.gc_thread_start(lltype.Void)
+
def gc_thread_die():
"""To call just before the final GIL release done by a dying
thread. After a thread_die(), no more gc operation should
diff --git a/pypy/translator/c/test/test_standalone.py b/pypy/translator/c/test/test_standalone.py
--- a/pypy/translator/c/test/test_standalone.py
+++ b/pypy/translator/c/test/test_standalone.py
@@ -841,6 +841,7 @@
self.tail = tail
def bootstrap():
+ ll_thread.gc_thread_start()
state.xlist.append(Cons(123, Cons(456, None)))
gc.collect()
ll_thread.gc_thread_die()
@@ -941,6 +942,7 @@
return childpid
def bootstrap():
+ ll_thread.gc_thread_start()
childpid = run_in_thread()
gc.collect() # collect both in the child and in the parent
gc.collect()
diff --git a/pypy/rpython/memory/gctransform/framework.py b/pypy/rpython/memory/gctransform/framework.py
--- a/pypy/rpython/memory/gctransform/framework.py
+++ b/pypy/rpython/memory/gctransform/framework.py
@@ -955,6 +955,11 @@
if hasattr(self.root_walker, 'thread_run_ptr'):
hop.genop("direct_call", [self.root_walker.thread_run_ptr])
+ def gct_gc_thread_start(self, hop):
+ assert self.translator.config.translation.thread
+ if hasattr(self.root_walker, 'thread_start_ptr'):
+ hop.genop("direct_call", [self.root_walker.thread_start_ptr])
+
def gct_gc_thread_die(self, hop):
assert self.translator.config.translation.thread
if hasattr(self.root_walker, 'thread_die_ptr'):
@@ -1258,6 +1263,7 @@
class BaseRootWalker(object):
need_root_stack = False
+ thread_setup = None
def __init__(self, gctransformer):
self.gcdata = gctransformer.gcdata
@@ -1267,7 +1273,8 @@
return True
def setup_root_walker(self):
- pass
+ if self.thread_setup is not None:
+ self.thread_setup()
def walk_roots(self, collect_stack_root,
collect_static_in_prebuilt_nongc,
@@ -1300,7 +1307,6 @@
class ShadowStackRootWalker(BaseRootWalker):
need_root_stack = True
- thread_setup = None
collect_stacks_from_other_threads = None
def __init__(self, gctransformer):
@@ -1340,8 +1346,7 @@
ll_assert(bool(stackbase), "could not allocate root stack")
self.gcdata.root_stack_top = stackbase
self.gcdata.root_stack_base = stackbase
- if self.thread_setup is not None:
- self.thread_setup()
+ BaseRootWalker.setup_root_walker(self)
def walk_stack_roots(self, collect_stack_root):
gcdata = self.gcdata
@@ -1500,7 +1505,9 @@
self.thread_prepare_ptr = getfn(thread_prepare, [], annmodel.s_None)
self.thread_run_ptr = getfn(thread_run, [], annmodel.s_None,
inline=True)
+ # no thread_start_ptr here
self.thread_die_ptr = getfn(thread_die, [], annmodel.s_None)
+ # no thread_before_fork_ptr here
self.thread_after_fork_ptr = getfn(thread_after_fork,
[annmodel.SomeInteger(),
annmodel.SomeAddress()],
diff --git a/pypy/translator/c/src/mem.h b/pypy/translator/c/src/mem.h
--- a/pypy/translator/c/src/mem.h
+++ b/pypy/translator/c/src/mem.h
@@ -84,6 +84,9 @@
#endif
+/* used by pypy.rlib.rstack, but also by asmgcc */
+#define OP_STACK_CURRENT(r) r = (long)&r
+
#define RAW_MALLOC_ZERO_FILLED 0
diff --git a/pypy/translator/c/src/stack.h b/pypy/translator/c/src/stack.h
--- a/pypy/translator/c/src/stack.h
+++ b/pypy/translator/c/src/stack.h
@@ -17,7 +17,6 @@
char LL_stack_too_big_slowpath(long); /* returns 0 (ok) or 1 (too big) */
/* some macros referenced from pypy.rlib.rstack */
-#define OP_STACK_CURRENT(r) r = (long)&r
#define LL_stack_get_start() ((long)_LLstacktoobig_stack_start)
#define LL_stack_get_length() MAX_STACK_SIZE
#define LL_stack_get_start_adr() ((long)&_LLstacktoobig_stack_start) /* JIT */
diff --git a/pypy/rpython/lltypesystem/lloperation.py b/pypy/rpython/lltypesystem/lloperation.py
--- a/pypy/rpython/lltypesystem/lloperation.py
+++ b/pypy/rpython/lltypesystem/lloperation.py
@@ -465,6 +465,7 @@
# ^^^ but canunwindgc=False, as it is
# allocating non-GC structures only
'gc_thread_run' : LLOp(),
+ 'gc_thread_start' : LLOp(),
'gc_thread_die' : LLOp(),
'gc_thread_before_fork':LLOp(), # returns an opaque address
'gc_thread_after_fork': LLOp(), # arguments: (result_of_fork, opaqueaddr)
diff --git a/pypy/rpython/lltypesystem/llheap.py b/pypy/rpython/lltypesystem/llheap.py
--- a/pypy/rpython/lltypesystem/llheap.py
+++ b/pypy/rpython/lltypesystem/llheap.py
@@ -31,5 +31,8 @@
def thread_run():
pass
+def thread_start():
+ pass
+
def thread_die():
pass
diff --git a/pypy/module/thread/os_thread.py b/pypy/module/thread/os_thread.py
--- a/pypy/module/thread/os_thread.py
+++ b/pypy/module/thread/os_thread.py
@@ -83,6 +83,7 @@
# Note that when this runs, we already hold the GIL. This is ensured
# by rffi's callback mecanism: we are a callback for the
# c_thread_start() external function.
+ thread.gc_thread_start()
space = bootstrapper.space
w_callable = bootstrapper.w_callable
args = bootstrapper.args
More information about the Pypy-commit
mailing list