[pypy-svn] r31112 - in pypy/dist/pypy/objspace: . cclp test

auc at codespeak.net auc at codespeak.net
Mon Aug 7 16:49:14 CEST 2006


Author: auc
Date: Mon Aug  7 16:49:09 2006
New Revision: 31112

Added:
   pypy/dist/pypy/objspace/cclp/global_state.py
Modified:
   pypy/dist/pypy/objspace/cclp/misc.py
   pypy/dist/pypy/objspace/cclp/scheduler.py
   pypy/dist/pypy/objspace/cclp/space.py
   pypy/dist/pypy/objspace/cclp/thread.py
   pypy/dist/pypy/objspace/cclp/thunk.py
   pypy/dist/pypy/objspace/cclp/variable.py
   pypy/dist/pypy/objspace/logic.py
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
comp. space reintroduced, ask()


Added: pypy/dist/pypy/objspace/cclp/global_state.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/global_state.py	Mon Aug  7 16:49:09 2006
@@ -0,0 +1,3 @@
+
+scheduler = []
+

Modified: pypy/dist/pypy/objspace/cclp/misc.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/misc.py	(original)
+++ pypy/dist/pypy/objspace/cclp/misc.py	Mon Aug  7 16:49:09 2006
@@ -5,7 +5,7 @@
 
 import os
 
-NO_DEBUG_INFO = [False]
+NO_DEBUG_INFO = [True]
 def w(*msgs):
     """writeln"""
     if NO_DEBUG_INFO[0]: return

Modified: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/scheduler.py	(original)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py	Mon Aug  7 16:49:09 2006
@@ -4,26 +4,25 @@
 
 from pypy.objspace.cclp.types import W_Var, W_FailedValue, aliases
 from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-#from pypy.objspace.cclp.space import CSpace
-
-scheduler = []
+from pypy.objspace.cclp.space import W_CSpace
+from pypy.objspace.cclp.global_state import scheduler
 
 #-- Singleton scheduler ------------------------------------------------
 
+class FunnyBoat: pass
+
 class Scheduler(object):
 
     def __init__(self, space):
         self.space = space
         self._main = ClonableCoroutine.w_getcurrent(space)
-        # link top_level space to main coroutine
-        #self.top_space = CSpace(space, self._main)
-        #self._main.cspace = self.top_space
-        # ...
         self._init_head(self._main)
         self._init_blocked()
         self._switch_count = 0
-        self._traced = {}
-        w ("MAIN THREAD = ", str(id(self._main)))
+        # more accounting
+        self._per_space_live_threads = {} # space -> nb runnable threads
+        self._traced = {} # thread -> vars
+        w("MAIN THREAD = ", str(id(self._main)))
 
     def get_threads(self):
         threads = [self._head]
@@ -35,8 +34,10 @@
 
     def _init_blocked(self):
         self._blocked = {} # thread set
+        # variables suspension lists
         self._blocked_on = {} # var -> threads
         self._blocked_byneed = {} # var -> threads
+        self._asking = {} # thread -> cspace
 
     def _init_head(self, thread):
         assert isinstance(thread, ClonableCoroutine)
@@ -81,13 +82,37 @@
         r = thread._next
         l._next = r
         r._prev = l
+        self._head = r
         if r == thread: #XXX write a test for me !
             if not we_are_translated(): 
                 import traceback
                 traceback.print_exc()
             self.display_head()
-        thread._next = thread._next = None
-        return thread
+        thread._next = thread._prev = FunnyBoat
+        # cspace/threads account mgmt
+        if thread._cspace is not None:
+            count = self._per_space_live_threads[thread._cspace]
+            assert count > 0
+            self._per_space_live_threads[thread._cspace] = count - 1
+            if count == 1:
+                del self._per_space_live_threads[thread._cspace]
+            
+#        return thread
+
+    #-- cspace helper
+
+    def is_stable(self, cspace):
+        if not self._per_space_live_threads.has_key(cspace):
+            return True
+        return self._per_space_live_threads[cspace] == 0
+
+    def wait_stable(self, cspace):
+        if self.is_stable(cspace):
+            return
+        curr = ClonableCoroutine.w_getcurrent(self.space)
+        self._asking[curr] = cspace
+        self._blocked[curr] = True
+        self.schedule()
 
     #-- to be used by logic objspace
 
@@ -117,6 +142,7 @@
                   or to_be_run.is_dead() \
                   or (to_be_run == current):
             to_be_run = to_be_run._next
+            assert isinstance(to_be_run, ClonableCoroutine)
             if to_be_run == sentinel:
                 if not dont_pass:
                     return ClonableCoroutine.w_getcurrent(self.space)
@@ -127,6 +153,11 @@
                 w(".. SCHEDULER reinitialized")
                 raise OperationError(self.space.w_AllBlockedError,
                                      self.space.wrap("can't schedule, possible deadlock in sight"))
+            if to_be_run in self._asking:
+                if self.is_stable(self._asking[to_be_run]):
+                    del self._asking[to_be_run]
+                    del self._blocked[to_be_run]
+        self._head = to_be_run
         return to_be_run
 
     #XXX call me directly for this to work translated
@@ -154,6 +185,11 @@
     def add_new_thread(self, thread):
         "insert 'thread' at end of running queue"
         assert isinstance(thread, ClonableCoroutine)
+        if thread._cspace != None:
+            print "Yeeeep"
+            count = self._per_space_live_threads.get(thread._cspace, 0)
+            self._per_space_live_threads[thread._cspace] = count + 1
+            assert len(self._per_space_live_threads)
         self._chain_insert(thread)
 
     def add_to_blocked_on(self, w_var, thread):
@@ -168,6 +204,9 @@
             self._blocked_on[w_var] = blocked
         blocked.append(thread)
         self._blocked[thread] = True
+        # cspace accounting
+        if thread._cspace is not None:
+            self._per_space_live_threads[thread._cspace] -= 1
 
     def unblock_on(self, w_var):
         v(".. we UNBLOCK threads dependants of var", str(w_var))
@@ -177,7 +216,11 @@
             blocked = self._blocked_on[w_var]
             del self._blocked_on[w_var]
         w(str([id(thr) for thr in blocked]))
-        for thr in blocked: del self._blocked[thr]
+        for thr in blocked:
+            del self._blocked[thr]
+            # cspace accounting
+            if thr._cspace is not None:
+                self._per_space_live_threads[thr._cspace] += 1
 
     def add_to_blocked_byneed(self, w_var, thread):
         w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
@@ -190,6 +233,9 @@
             self._blocked_byneed[w_var] = blocked
         blocked.append(thread)
         self._blocked[thread] = True
+        # cspace accounting
+        if thread._cspace is not None:
+            self._per_space_live_threads[thread._cspace] += 1
 
     def unblock_byneed_on(self, w_var):
         v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
@@ -201,7 +247,12 @@
                 del self._blocked_byneed[w_alias]
             w_alias.needed = True
         w(str([id(thr) for thr in blocked]))
-        for thr in blocked: del self._blocked[thr]
+        for thr in blocked:
+            del self._blocked[thr]
+            # cspace accounting
+            if thr._cspace is not None:
+                self._per_space_live_threads[thr._cspace] -= 1
+            
 
     # Logic Variables tracing, helps exception propagation
     # amongst threads
@@ -244,6 +295,31 @@
     return w_ret
 app_sched_info = gateway.interp2app(sched_info)
 
+def sched_all(space):
+    s = scheduler[0]
+    si = space.setitem
+    sw = space.wrap
+    w_ret = space.newdict([])
+    if not we_are_translated():
+        si(w_ret, sw('threads'),
+           sw([id(th) for th in s.get_threads()]))
+        si(w_ret, sw('blocked_on'),
+           sw([(id(th),  [id(th) for th in thl])
+               for var, thl in s._blocked_on.items()]))
+        si(w_ret, sw('blocked_byneed'),
+           sw([(id(var), [id(th) for th in thl])
+               for var, thl in s._blocked_byneed.items()]))
+        si(w_ret, sw('traced'),
+           sw([(id(th), [id(var) for var in lvar])
+               for th, lvar in s._traced.items()]))
+        si(w_ret, sw('space_accounting'),
+           sw([(id(spc), count)
+               for spc, count in s._per_space_live_threads.items()]))
+        si(w_ret, sw('asking'),
+           sw([(id(th), id(spc))
+               for th, spc in s._asking.items()]))
+    return w_ret
+app_sched_all = gateway.interp2app(sched_all)        
 
 def schedule(space):
     "useful til we get preemtive scheduling deep into the vm"

Modified: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/space.py	(original)
+++ pypy/dist/pypy/objspace/cclp/space.py	Mon Aug  7 16:49:09 2006
@@ -1,25 +1,52 @@
-from pypy.interpreter import baseobjspace
+from pypy.rpython.objectmodel import we_are_translated
+from pypy.interpreter import baseobjspace, gateway, argument, typedef
 from pypy.interpreter.error import OperationError
 
-from pypy.objspace.cclp.misc import ClonableCoroutine
+from pypy.objspace.cclp.misc import ClonableCoroutine, w
+from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
+from pypy.objspace.cclp.global_state import scheduler
+
+def newspace(space, w_callable, __args__):
+    try:
+        args = __args__.normalize()
+        # coro init
+        w_coro = ClonableCoroutine(space)
+        thunk = ProcedureThunk(space, w_callable, args, w_coro)
+        w_coro.bind(thunk)
+        if not we_are_translated():
+            w("NEWSPACE, thread", str(id(w_coro)), "for", str(w_callable.name))
+
+            w_space = W_CSpace(space, w_coro, parent=w_coro._cspace)
+            w_coro._cspace = w_space
+
+            scheduler[0].add_new_thread(w_coro)
+            scheduler[0].schedule()
+    except:
+        print "oh, uh"
+
+    return w_space
+app_newspace = gateway.interp2app(newspace, unwrap_spec=[baseobjspace.ObjSpace,
+                                                         baseobjspace.W_Root,
+                                                         argument.Arguments])
+
+class W_CSpace(baseobjspace.Wrappable):
+
+    def __init__(self, space, thread, parent=None):
+        assert isinstance(thread, ClonableCoroutine)
+        assert (parent is None) or isinstance(parent, CSpace)
+        self.space = space # the object space ;-)
+        self.parent = parent
+        self.main_thread = thread
+
+    def w_ask(self):
+        scheduler[0].wait_stable(self)
+        return self.space.newint(0)
+
+W_CSpace.typedef = typedef.TypeDef("W_CSpace",
+    ask = gateway.interp2app(W_CSpace.w_ask))
+
+
 
-## def newspace(w_callable, __args__):
-##     w_coro = stacklet(w_callable, __args__)
-##     w_space = CSpace(coro, parent=coro.space)
-##     w_coro.space = space
-##     return w_space
-
-class CSpace(baseobjspace.Wrappable):
-
-    def __init__(self, space, distributor, parent=None):
-        pass
-##         assert isinstance(distributor, ClonableCoroutine)
-##         assert (parent is None) or isinstance(parent, CSpace)
-##         self.space = space # the object space ;-)
-##         self.parent = parent
-##         self.distributor = distributor
-##         self.threads = {} # the eventual other threads
-        
 
 ##     def is_top_level(self):
 ##         return self.parent is None
@@ -28,9 +55,6 @@
 ##         #XXX return w_getcurrent().cspace
 ##         pass
 
-##     def newspace():
-##         #XXX fork ?
-##         pass
 
 ##     def clone(self):
 ##         if self.is_top_level():

Modified: pypy/dist/pypy/objspace/cclp/thread.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thread.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thread.py	Mon Aug  7 16:49:09 2006
@@ -4,7 +4,7 @@
 from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
 from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
 from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
-from pypy.objspace.cclp.scheduler import scheduler
+from pypy.objspace.cclp.global_state import scheduler
 
 
 #-- Future --------------------------------------------------
@@ -21,7 +21,8 @@
     w_Future = W_Future(space)
     thunk = FutureThunk(space, w_callable, args, w_Future, coro)
     coro.bind(thunk)
-    w("FUTURE", str(id(coro)))
+    if not we_are_translated():
+        w("FUTURE", str(id(coro)), "for", str(w_callable.name))
     scheduler[0].add_new_thread(coro)
     return w_Future
 app_future = gateway.interp2app(future, unwrap_spec=[baseobjspace.ObjSpace,
@@ -39,7 +40,9 @@
     #coro.cspace = ClonableCoroutine.w_getcurrent(space).cspace
     thunk = ProcedureThunk(space, w_callable, args, coro)
     coro.bind(thunk)
-    w("STACKLET", str(id(coro)))
+    print we_are_translated()
+    if not we_are_translated():
+        w("STACKLET", str(id(coro)), "for", str(w_callable.name))
     scheduler[0].add_new_thread(coro)
     scheduler[0].schedule()
     return coro

Modified: pypy/dist/pypy/objspace/cclp/thunk.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thunk.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thunk.py	Mon Aug  7 16:49:09 2006
@@ -1,6 +1,6 @@
 from pypy.module._stackless.coroutine import _AppThunk
 from pypy.objspace.cclp.misc import w 
-from pypy.objspace.cclp.scheduler import scheduler
+from pypy.objspace.cclp.global_state import scheduler
 from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
 
 #-- Thunk -----------------------------------------

Modified: pypy/dist/pypy/objspace/cclp/variable.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/variable.py	(original)
+++ pypy/dist/pypy/objspace/cclp/variable.py	Mon Aug  7 16:49:09 2006
@@ -5,7 +5,7 @@
 from pypy.objspace.std.dictobject import W_DictObject
 
 from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.objspace.cclp.scheduler import scheduler
+from pypy.objspace.cclp.global_state import scheduler
 from pypy.objspace.cclp.types import deref, W_Var, W_Future, W_FailedValue
 
 

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Mon Aug  7 16:49:09 2006
@@ -16,8 +16,12 @@
 
 from pypy.objspace.cclp.thread import app_future, app_stacklet, app_this_thread
 
-from pypy.objspace.cclp.scheduler import Scheduler, scheduler, app_sched_info, \
-     app_schedule, app_reset_scheduler
+from pypy.objspace.cclp.scheduler import Scheduler,  app_sched_info, \
+     app_schedule, app_reset_scheduler, app_sched_all
+
+from pypy.objspace.cclp.global_state import scheduler
+
+from pypy.objspace.cclp.space import app_newspace, W_CSpace
 
 #-- VARIABLE ------------------------------------------------
 
@@ -188,6 +192,7 @@
     # multimethods hack
     space.model.typeorder[W_Var] = [(W_Var, None), (W_Root, None)] # None means no conversion
     space.model.typeorder[W_Future] = [(W_Future, None), (W_Var, None)]
+    space.model.typeorder[W_CSpace] = [(W_CSpace, None), (baseobjspace.Wrappable, None)]
 ##     space.model.typeorder[W_FiniteDomain] = [(W_FiniteDomain, None), (W_Root, None)] 
 
 
@@ -256,12 +261,16 @@
                   space.wrap(app_wait_needed))
     space.setitem(space.builtin.w_dict, space.wrap('sched_info'),
                   space.wrap(app_sched_info))
+    space.setitem(space.builtin.w_dict, space.wrap('sched_all'),
+                  space.wrap(app_sched_all))
     space.setitem(space.builtin.w_dict, space.wrap('schedule'),
                   space.wrap(app_schedule))
     space.setitem(space.builtin.w_dict, space.wrap('this_thread'),
                   space.wrap(app_this_thread))
     space.setitem(space.builtin.w_dict, space.wrap('reset_scheduler'),
                   space.wrap(app_reset_scheduler))
+    space.setitem(space.builtin.w_dict, space.wrap('newspace'),
+                  space.wrap(app_newspace))
 
     #-- misc -----
     space.setitem(space.builtin.w_dict, space.wrap('interp_id'),

Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Mon Aug  7 16:49:09 2006
@@ -600,3 +600,31 @@
         schedule()
 
         reset_scheduler() # free all the hanging threads
+
+    def test_newspace_ask_noop(self):
+
+        def bar(X): return X + 42
+
+        X = newvar()
+        s = newspace(bar, X)
+        s.ask()
+
+    def test_newspace_ask_wait(self):
+
+        def quux(X):
+            while 1:
+                if is_bound(X):
+                    break
+                schedule()
+
+        def asker(cspace):
+            cspace.ask()
+
+        X = newvar()
+        s = newspace(quux, X)
+        stacklet(asker, s)
+        unify(X, 42)
+        assert len(sched_all()['asking']) == 1
+        schedule() # allow quux exit
+        schedule() # allow asker exit
+        assert len(sched_all()['asking']) == 0



More information about the Pypy-commit mailing list