[pypy-svn] r33462 - pypy/dist/pypy/objspace/cclp

auc at codespeak.net auc at codespeak.net
Thu Oct 19 16:50:32 CEST 2006


Author: auc
Date: Thu Oct 19 16:50:27 2006
New Revision: 33462

Modified:
   pypy/dist/pypy/objspace/cclp/misc.py
   pypy/dist/pypy/objspace/cclp/scheduler.py
   pypy/dist/pypy/objspace/cclp/space.py
   pypy/dist/pypy/objspace/cclp/thread.py
   pypy/dist/pypy/objspace/cclp/types.py
   pypy/dist/pypy/objspace/cclp/variable.py
Log:
now space cloning works (but not thoroughly tested) -- got rid of clonable coroutines


Modified: pypy/dist/pypy/objspace/cclp/misc.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/misc.py	(original)
+++ pypy/dist/pypy/objspace/cclp/misc.py	Thu Oct 19 16:50:27 2006
@@ -3,6 +3,7 @@
 
 # commonly imported there, used from types, variable, thread
 from pypy.module._stackless.clonable import ClonableCoroutine
+from pypy.module._stackless.coroutine import AppCoroutine
 
 import os
 
@@ -25,8 +26,8 @@
         os.write(1, ' ')
 
 def get_current_cspace(space):
-    curr = ClonableCoroutine.w_getcurrent(space)
-    assert isinstance(curr, ClonableCoroutine)
+    curr = AppCoroutine.w_getcurrent(space)
+    assert isinstance(curr, AppCoroutine)
     if curr._cspace is None:
         if not we_are_translated():
             import pdb

Modified: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/scheduler.py	(original)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py	Thu Oct 19 16:50:27 2006
@@ -4,7 +4,7 @@
 from pypy.objspace.std.listobject import W_ListObject
 
 from pypy.objspace.cclp.types import W_Var, W_FailedValue, aliases
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine, get_current_cspace
+from pypy.objspace.cclp.misc import w, v, AppCoroutine, get_current_cspace
 from pypy.objspace.cclp.global_state import sched
 
 #-- Singleton scheduler ------------------------------------------------
@@ -22,18 +22,18 @@
         self._head._next = self._head._prev = self._head
         # asking for stability
         self._asking = {} # cspace -> thread set
-        self._asking[top_level_space] = {}
+        self._asking[top_level_space] = {} # XXX
         # variables suspension lists
         self._blocked = {}
         self._blocked_on = {} # var -> threads
         self._blocked_byneed = {} # var -> threads
         
     def _chain_insert(self, group):
-        assert group._next is group
-        assert group._prev is group
-        assert isinstance(group, W_ThreadGroupScheduler)
-        assert isinstance(group._next, W_ThreadGroupScheduler)
-        assert isinstance(group._prev, W_ThreadGroupScheduler)
+        assert group._next is group, "group._next not correctly linked"
+        assert group._prev is group, "group._prev not correctly linked"
+        assert isinstance(group, W_ThreadGroupScheduler), "type error"
+        assert isinstance(group._next, W_ThreadGroupScheduler), "type error"
+        assert isinstance(group._prev, W_ThreadGroupScheduler), "type error"
         r = self._head
         l = r._prev
         l._next = group
@@ -43,7 +43,7 @@
 
     def schedule(self):
         to_be_run = self._select_next()
-        assert isinstance(to_be_run, W_ThreadGroupScheduler)
+        assert isinstance(to_be_run, W_ThreadGroupScheduler), "type error"
         #w(".. SWITCHING (spaces)", str(id(get_current_cspace(self.space))), "=>", str(id(to_be_run)))
         self._switch_count += 1
         to_be_run.schedule() 
@@ -56,7 +56,7 @@
             if to_be_run.is_runnable():
                 break
             to_be_run = to_be_run._next
-            assert isinstance(to_be_run, W_ThreadGroupScheduler)
+            assert isinstance(to_be_run, W_ThreadGroupScheduler), "type error"
             if to_be_run == sentinel:
                 reset_scheduler(self.space)
                 w(".. SCHEDULER reinitialized")
@@ -68,13 +68,13 @@
 
     def add_new_group(self, group):
         "insert 'group' at end of running queue"
-        assert isinstance(group, W_ThreadGroupScheduler)
+        assert isinstance(group, W_ThreadGroupScheduler), "type error"
         w(".. ADDING group", str(id(group)))
         self._asking[group] = {}
         self._chain_insert(group)
 
     def remove_group(self, group):
-        assert isinstance(group, W_ThreadGroupScheduler)
+        assert isinstance(group, W_ThreadGroupScheduler), "type error"
         w(".. REMOVING group", str(id(group)))
         l = group._prev
         r = group._next
@@ -96,8 +96,8 @@
 
     def add_to_blocked_on(self, w_var, thread):
         w(".. we BLOCK thread", str(id(thread)), "on var", str(w_var))
-        assert isinstance(w_var, W_Var)
-        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(w_var, W_Var), "type error"
+        assert isinstance(thread, AppCoroutine), "type error"
         assert thread not in self._blocked
         if w_var in self._blocked_on:
             blocked = self._blocked_on[w_var]
@@ -112,7 +112,7 @@
             
     def unblock_on(self, w_var):
         v(".. we UNBLOCK threads dependants of var", str(w_var))
-        assert isinstance(w_var, W_Var)
+        assert isinstance(w_var, W_Var), "type error"
         blocked = []
         if w_var in self._blocked_on:
             blocked = self._blocked_on[w_var]
@@ -125,8 +125,8 @@
     #XXX sync the un/block byneed stuff with above, later
     def add_to_blocked_byneed(self, w_var, thread):
         w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
-        assert isinstance(w_var, W_Var)
-        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(w_var, W_Var), "type error"
+        assert isinstance(thread, AppCoroutine), "type error"
         if w_var in self._blocked_byneed:
             blocked = self._blocked_byneed[w_var]
         else:
@@ -138,7 +138,7 @@
 
     def unblock_byneed_on(self, w_var):
         v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
-        assert isinstance(w_var, W_Var)
+        assert isinstance(w_var, W_Var), "type error"
         blocked = []
         for w_alias in aliases(self.space, w_var):
             if w_alias in self._blocked_byneed:
@@ -190,10 +190,10 @@
         si = self.space.setitem
         w_all = s.newdict()
         si(w_all, s.newint(id(self._head)), self._head.group_info())
-        assert isinstance(self._head, W_ThreadGroupScheduler)
+        assert isinstance(self._head, W_ThreadGroupScheduler), "type error"
         curr = self._head._next
         while curr != self._head:
-            assert isinstance(curr, W_ThreadGroupScheduler)
+            assert isinstance(curr, W_ThreadGroupScheduler), "type error"
             si(w_all, s.newint(id(curr)), curr.group_info())
             curr = curr._next
         si(w_all, s.wrap('blocked'), self.w_blocked())
@@ -240,18 +240,18 @@
         self.blocked_count = 0
 
     def _init_head(self, thread):
-        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(thread, AppCoroutine), "type error"
         self._head = thread
         thread._next = thread._prev = thread
         assert self._head._next == self._head
         w("HEAD (main) THREAD = ", str(id(self._head)))
             
     def _chain_insert(self, thread):
-        assert thread._next is thread
-        assert thread._prev is thread
-        assert isinstance(thread, ClonableCoroutine)
-        assert isinstance(thread._next, ClonableCoroutine)
-        assert isinstance(thread._prev, ClonableCoroutine)
+        assert thread._next is thread, "thread._next not correctly linked"
+        assert thread._prev is thread, "thread._prev not correctly linked"
+        assert isinstance(thread, AppCoroutine), "type error"
+        assert isinstance(thread._next, AppCoroutine), "type error"
+        assert isinstance(thread._prev, AppCoroutine), "type error"
         r = self._head
         l = r._prev
         l._next = thread
@@ -281,8 +281,8 @@
           str(id(get_current_cspace(self.space))))
         if self.is_stable():
             return
-        curr = ClonableCoroutine.w_getcurrent(self.space)
-        assert isinstance(curr, ClonableCoroutine)
+        curr = AppCoroutine.w_getcurrent(self.space)
+        assert isinstance(curr, AppCoroutine), "type error"
         asking = sched.uler._asking
         if self in asking:
             asking[self][curr] = True
@@ -297,10 +297,10 @@
             raise OperationError(self.space.w_AllBlockedError,
                                  self.space.wrap("ouch, that's a BUG"))
         to_be_run = self._select_next()
-        if to_be_run == ClonableCoroutine.w_getcurrent(self.space):
+        if to_be_run == AppCoroutine.w_getcurrent(self.space):
             return
-        assert isinstance(to_be_run, ClonableCoroutine)
-        #w(".. SWITCHING (treads)", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+        assert isinstance(to_be_run, AppCoroutine), "type error"
+        #w(".. SWITCHING (treads)", str(id(AppCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
         self._switch_count += 1
         to_be_run.w_switch() 
         
@@ -314,7 +314,7 @@
                     th._cspace.blocked_count -= 1
                 sched.uler._asking[self] = {}
                 break
-            assert isinstance(to_be_run, ClonableCoroutine)
+            assert isinstance(to_be_run, AppCoroutine), "type error"
             to_be_run = to_be_run._next
             if to_be_run == sentinel:
                 if not we_are_translated():
@@ -325,13 +325,13 @@
 
     def add_new_thread(self, thread):
         "insert 'thread' at end of running queue"
-        w(".. ADDING thread", str(id(thread)), "to group", str(id(self)), "count ==", str(self.thread_count))
-        assert isinstance(thread, ClonableCoroutine)
+        w(".. ADDING thread", str(id(thread)), "to group", str(id(self)))
+        assert isinstance(thread, AppCoroutine), "type error"
         self._chain_insert(thread)
         self.thread_count += 1
 
     def remove_thread(self, thread):
-        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(thread, AppCoroutine)
         w(".. REMOVING thread", str(id(thread)))
         assert thread not in sched.uler._blocked
         try:
@@ -355,14 +355,14 @@
     # Logic Variables tracing, "accelerates" exception propagation
     # amongst threads
     def trace_vars(self, thread, lvars):
-        assert isinstance(thread, ClonableCoroutine)
-        assert isinstance(lvars, list)
+        assert isinstance(thread, AppCoroutine), "type error"
+        assert isinstance(lvars, list), "type error"
         #w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
         #assert not self._traced.has_key(thread) doesn't translate 
         self._traced[thread] = lvars
 
     def dirty_traced_vars(self, thread, failed_value):
-        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(thread, AppCoroutine)
         assert isinstance(failed_value, W_FailedValue)
         #w(".. DIRTYING traced vars")
         for w_var in self._traced[thread]:
@@ -372,10 +372,10 @@
     def w_threads(self):
         s = self.space
         thl = [s.newint(id(self._head))]
-        assert isinstance(self._head, ClonableCoroutine)
+        assert isinstance(self._head, AppCoroutine)
         curr = self._head._next
         while curr != self._head:
-            assert isinstance(curr, ClonableCoroutine)
+            assert isinstance(curr, AppCoroutine)
             thl.append(s.newint(id(curr)))
             curr = curr._next
         w_t = W_ListObject(thl)

Modified: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/space.py	(original)
+++ pypy/dist/pypy/objspace/cclp/space.py	Thu Oct 19 16:50:27 2006
@@ -10,7 +10,7 @@
 
 from pypy.module._stackless.interp_coroutine import AbstractThunk
 
-from pypy.objspace.cclp.misc import ClonableCoroutine, get_current_cspace, w
+from pypy.objspace.cclp.misc import AppCoroutine, get_current_cspace, w
 from pypy.objspace.cclp.thunk import CSpaceThunk, PropagatorThunk
 from pypy.objspace.cclp.global_state import sched
 from pypy.objspace.cclp.variable import newvar
@@ -20,47 +20,19 @@
 from pypy.objspace.cclp.constraint.distributor import distribute
 from pypy.objspace.cclp.scheduler import W_ThreadGroupScheduler
 
+from pypy.rpython.rgc import gc_swap_pool, gc_clone
 
-class NewSpaceThunk(AbstractThunk):
-    "container thread for one comp. space"
-    def __init__(self, space, w_callable, __args__, thread):
-        self.space = space
-        self.thread = thread
-        self.cspace = None
-        self.callable = w_callable
-        self.args = __args__
-
-    def call(self):
-        try:
-            self.cspace = _newspace(self.space,
-                                    self.callable,
-                                    self.args)
-            self.space.wait(self.cspace._finished)
-        finally:
-            sched.uler.remove_thread(self.thread)
-            sched.uler.schedule()
-
-def _newspace(space, w_callable, __args__):
+def newspace(space, w_callable, __args__):
+    "application level creation of a new computation space"
     args = __args__.normalize()
-    dist_thread = ClonableCoroutine(space)
+    dist_thread = AppCoroutine(space)
+    dist_thread._next = dist_thread._prev = dist_thread
     thunk = CSpaceThunk(space, w_callable, args, dist_thread)
     dist_thread.bind(thunk)
     if not we_are_translated():
         w("NEWSPACE, (distributor) thread", str(id(dist_thread)), "for", str(w_callable.name))
     w_space = W_CSpace(space, dist_thread)
     return w_space
-
-def newspace(space, w_callable, __args__):
-    "application level creation of a new computation space"
-    outer_thread = ClonableCoroutine(space)
-    outer_thread._cspace = get_current_cspace(space)
-    thunk = NewSpaceThunk(space, w_callable, __args__, outer_thread)
-    outer_thread.bind(thunk)
-    sched.uler.add_new_thread(outer_thread)
-    sched.uler.schedule() # XXX assumption: thread will be executed before we get back there
-    cspace = thunk.cspace
-    cspace._container = outer_thread
-    return cspace
 app_newspace = gateway.interp2app(newspace, unwrap_spec=[baseobjspace.ObjSpace,
                                                          baseobjspace.W_Root,
                                                          argument.Arguments])
@@ -99,6 +71,7 @@
 
 
 class W_CSpace(W_ThreadGroupScheduler):
+    local_pool = None
 
     def __init__(self, space, dist_thread):
         W_ThreadGroupScheduler.__init__(self, space)
@@ -125,63 +98,20 @@
         self._store[cvar.name] = cvar
 
     def w_clone(self):
-        if not we_are_translated():
-            raise NotImplementedError
-            # build fresh cspace & distributor thread
-            #dist_thread = ClonableCoroutine(self.space)
-            #new_cspace = W_CSpace(self.space, dist_thread)
-            #dist_thread._cspace = new_cspace
-            # new distributor instance
-            #old_dist = self.distributor
-            #new_dist = old_dist.__class__(self.space, old_dist._fanout)
-            #new_dist._cspace = new_cspace
-            #new_cspace.distributor = new_dist
-            # copy the store
-            #for var in self._store.values():
-            #    new_cspace.register_var(var.copy(self.space))
-            # new distributor thunk, binding
-            #f = Function(self.space,
-            #             app_fresh_distributor._code,
-            #             self.space.newdict())
-            #thunk = CSpaceThunk(self.space, f,
-            #                    argument.Arguments(self.space, [new_dist]),
-            #                    dist_thread)
-            #dist_thread.bind(thunk)
-            # relinking to scheduler
-            #sched.uler.add_new_group(new_cspace)
-            #self.add_new_thread(dist_thread)
-            # rebuild propagators
-            #for const in self._constraints:
-            #    ccopy = const.copy(self.space)
-            #    ccopy._cspace = new_cspace
-            #    new_cspace.tell(const)
-            # duh 
-            #new_cspace._last_choice = self._last_choice
-            # copy solution variables
-            #self.space.unify(new_cspace._solution,
-            #                 self.space.newlist([var for var in new_cspace._store.values()]))
-            #new_cspace.wait_stable()
-            #return new_cspace
-        else:
-            # the theory is that
-            # a) we create a (clonable) container thread for any 'newspace'
-            # b) at clone-time, we clone that container, hoping that
-            # indeed everything will come with it
-            w("cloning the container thread")
-            everything = self._container.w_clone()
-            w("getting the fresh cspace from it")
-            new_cspace = everything._cspace
-            w("registering the container clone to top level cspace scheduler")
-            sched.main_thread._cspace.add_new_thread(everything)
-            w("add container clone to blocked on cspace clone variable _finished")
-            # we crash below
-            sched.uler.add_to_blocked_on(new_cspace._finished, everything)
-            # however, we need to keep track of all threads created
-            # from 'within' the space (propagators, or even app-level threads)
-            # -> cspaces as thread groups
-            w("add cloned cspace to new group")
+        if we_are_translated():
+            w("<> cloning the space")
+            if self.local_pool is None:
+                self.local_pool = gc_swap_pool(gc_swap_pool(None))
+            new_cspace, new_cspace.local_pool = gc_clone(self, self.local_pool)
+            w("<> add cloned cspace to new group")
+            assert isinstance(new_cspace, W_CSpace)
+            new_cspace._next = new_cspace._prev = new_cspace
             sched.uler.add_new_group(new_cspace)
+            w("<> returning clone ")
             return new_cspace
+        else:
+            raise NotImplementedError
+
 
     def w_ask(self):
         self.wait_stable()
@@ -217,7 +147,8 @@
 
     def tell(self, w_constraint):
         space = self.space
-        w_coro = ClonableCoroutine(space)
+        w_coro = AppCoroutine(space)
+        w_coro._next = w_coro._prev = w_coro
         w_coro._cspace = self
         thunk = PropagatorThunk(space, w_constraint, w_coro)
         w_coro.bind(thunk)

Modified: pypy/dist/pypy/objspace/cclp/thread.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thread.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thread.py	Thu Oct 19 16:50:27 2006
@@ -2,7 +2,7 @@
 from pypy.rpython.objectmodel import we_are_translated
 
 from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine, get_current_cspace
+from pypy.objspace.cclp.misc import w, v, AppCoroutine, get_current_cspace
 from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
 from pypy.objspace.cclp.global_state import sched
 
@@ -13,7 +13,8 @@
     """returns a future result"""
     #XXX we could be much more lazy wrt coro creation
     args = __args__.normalize()
-    coro = ClonableCoroutine(space)
+    coro = AppCoroutine(space)
+    coro._next = coro._prev = coro
     w_Future = W_Future(space)
     thunk = FutureThunk(space, w_callable, args, w_Future, coro)
     coro.bind(thunk)
@@ -31,7 +32,8 @@
 def stacklet(space, w_callable, __args__):
     """returns a coroutine object"""
     args = __args__.normalize()
-    coro = ClonableCoroutine(space)
+    coro = AppCoroutine(space)
+    coro._next = coro._prev = coro
     thunk = ProcedureThunk(space, w_callable, args, coro)
     coro.bind(thunk)
     coro._cspace = get_current_cspace(space)
@@ -46,5 +48,5 @@
 
 
 def this_thread(space):
-    return ClonableCoroutine.w_getcurrent(space)
+    return AppCoroutine.w_getcurrent(space)
 app_this_thread = gateway.interp2app(this_thread)

Modified: pypy/dist/pypy/objspace/cclp/types.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/types.py	(original)
+++ pypy/dist/pypy/objspace/cclp/types.py	Thu Oct 19 16:50:27 2006
@@ -1,7 +1,7 @@
 from pypy.interpreter import baseobjspace, gateway, typedef
 from pypy.interpreter.error import OperationError
 
-from pypy.objspace.cclp.misc import w, ClonableCoroutine, get_current_cspace
+from pypy.objspace.cclp.misc import w, AppCoroutine, get_current_cspace
 
 W_Root = baseobjspace.W_Root
 
@@ -31,7 +31,7 @@
     "a read-only-by-its-consummer variant of logic. var"
     def __init__(w_self, space):
         W_Var.__init__(w_self, space)
-        w_self._client = ClonableCoroutine.w_getcurrent(space)
+        w_self._client = AppCoroutine.w_getcurrent(space)
         w("FUT", str(w_self))
 
 

Modified: pypy/dist/pypy/objspace/cclp/variable.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/variable.py	(original)
+++ pypy/dist/pypy/objspace/cclp/variable.py	Thu Oct 19 16:50:27 2006
@@ -5,7 +5,7 @@
 from pypy.objspace.std.dictobject import W_DictObject
 from pypy.objspace.std.stringobject import W_StringObject
 
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
+from pypy.objspace.cclp.misc import w, v, AppCoroutine
 from pypy.objspace.cclp.global_state import sched
 from pypy.objspace.cclp.types import deref, W_Var, W_CVar, W_Future, W_FailedValue
 
@@ -27,10 +27,10 @@
     return w_obj
 
 def wait__Var(space, w_var):
-    #w("###:wait", str(id(ClonableCoroutine.w_getcurrent(space))))
+    #w("###:wait", str(id(AppCoroutine.w_getcurrent(space))))
     if space.is_true(space.is_free(w_var)):
         sched.uler.unblock_byneed_on(w_var)
-        sched.uler.add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
+        sched.uler.add_to_blocked_on(w_var, AppCoroutine.w_getcurrent(space))
         sched.uler.schedule()
         assert space.is_true(space.is_bound(w_var))
     w_ret = w_var.w_bound_to
@@ -53,11 +53,11 @@
 #-- Wait_needed --------------------------------------------
 
 def wait_needed__Var(space, w_var):
-    #w(":wait_needed", str(id(ClonableCoroutine.w_getcurrent(space))))
+    #w(":wait_needed", str(id(AppCoroutine.w_getcurrent(space))))
     if space.is_true(space.is_free(w_var)):
         if w_var.needed:
             return
-        sched.uler.add_to_blocked_byneed(w_var, ClonableCoroutine.w_getcurrent(space))
+        sched.uler.add_to_blocked_byneed(w_var, AppCoroutine.w_getcurrent(space))
         sched.uler.schedule()
     else:
         raise OperationError(space.w_TypeError,
@@ -194,7 +194,7 @@
 
 def bind__Future_Root(space, w_fut, w_obj):
     #v("future val", str(id(w_fut)))
-    if w_fut._client == ClonableCoroutine.w_getcurrent(space):
+    if w_fut._client == AppCoroutine.w_getcurrent(space):
         raise_future_binding(space)
     return bind__Var_Root(space, w_fut, w_obj) # call-next-method ?
 
@@ -216,7 +216,7 @@
 
 def bind__Future_Var(space, w_fut, w_var):
     #v("future var")
-    if w_fut._client == ClonableCoroutine.w_getcurrent(space):
+    if w_fut._client == AppCoroutine.w_getcurrent(space):
         raise_future_binding(space)
     return bind__Var_Var(space, w_fut, w_var)
 
@@ -224,7 +224,7 @@
 def bind__Var_Future(space, w_var, w_fut): 
     if space.is_true(space.is_bound(w_fut)): #XXX write a test for me !
         return bind__Var_Root(space, w_var, deref(space, w_fut))
-    if w_fut._client == ClonableCoroutine.w_getcurrent(space):
+    if w_fut._client == AppCoroutine.w_getcurrent(space):
         raise_future_binding(space)
     return bind__Var_Var(space, w_var, w_fut) #and for me ...
 



More information about the Pypy-commit mailing list