[pypy-svn] r31154 - in pypy/dist/pypy: lib objspace objspace/cclp objspace/test

auc at codespeak.net auc at codespeak.net
Tue Aug 8 10:57:32 CEST 2006


Author: auc
Date: Tue Aug  8 10:57:24 2006
New Revision: 31154

Modified:
   pypy/dist/pypy/lib/_exceptions.py
   pypy/dist/pypy/objspace/cclp/scheduler.py
   pypy/dist/pypy/objspace/cclp/space.py
   pypy/dist/pypy/objspace/cclp/thread.py
   pypy/dist/pypy/objspace/cclp/thunk.py
   pypy/dist/pypy/objspace/logic.py
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
computation spaces: ask, waitstable, commit, choose ...


Modified: pypy/dist/pypy/lib/_exceptions.py
==============================================================================
--- pypy/dist/pypy/lib/_exceptions.py	(original)
+++ pypy/dist/pypy/lib/_exceptions.py	Tue Aug  8 10:57:24 2006
@@ -445,7 +445,7 @@
 class LOError(Exception): pass
 
 class UnificationError(LOError): pass
-class RebindingError(LOError): pass
+class RebindingError(UnificationError): pass
 class FutureBindingError(LOError): pass
 
 class AllBlockedError(LOError): pass

Modified: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/scheduler.py	(original)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py	Tue Aug  8 10:57:24 2006
@@ -55,6 +55,7 @@
             assert self._head not in self._blocked
             assert self._head not in self._blocked_on
             assert self._head not in self._blocked_byneed
+            assert self._head not in self._asking
         except:
             #XXX give sched_info maybe
             raise OperationError(self.space.w_RuntimeError,
@@ -91,18 +92,15 @@
         thread._next = thread._prev = FunnyBoat
         # cspace/threads account mgmt
         if thread._cspace is not None:
-            count = self._per_space_live_threads[thread._cspace]
-            assert count > 0
-            self._per_space_live_threads[thread._cspace] = count - 1
-            if count == 1:
+            count = self.dec_live_thread_count(thread._cspace)
+            if count == 0:
                 del self._per_space_live_threads[thread._cspace]
-            
-#        return thread
 
     #-- cspace helper
 
     def is_stable(self, cspace):
         if not self._per_space_live_threads.has_key(cspace):
+            #XXX meaning ?
             return True
         return self._per_space_live_threads[cspace] == 0
 
@@ -112,7 +110,26 @@
         curr = ClonableCoroutine.w_getcurrent(self.space)
         self._asking[curr] = cspace
         self._blocked[curr] = True
-        self.schedule()
+        # either we choose() from inside
+        if curr._cspace == cspace:
+            self.dec_live_thread_count(cspace)
+            self.schedule()
+            self.inc_live_thread_count(cspace)
+        else: # or we ask() from outside
+            self.schedule()
+
+    #-- cspace -> thread_count helpers
+    def inc_live_thread_count(self, cspace):
+        count = self._per_space_live_threads.get(cspace, 0) + 1
+        self._per_space_live_threads[cspace]  = count
+        return count
+
+    def dec_live_thread_count(self, cspace):
+        count = self._per_space_live_threads[cspace] -1
+        assert count >= 0
+        self._per_space_live_threads[cspace] = count 
+        return count 
+    #-- /
 
     #-- to be used by logic objspace
 
@@ -141,6 +158,7 @@
         while (to_be_run in self._blocked) \
                   or to_be_run.is_dead() \
                   or (to_be_run == current):
+            
             to_be_run = to_be_run._next
             assert isinstance(to_be_run, ClonableCoroutine)
             if to_be_run == sentinel:
@@ -153,10 +171,12 @@
                 w(".. SCHEDULER reinitialized")
                 raise OperationError(self.space.w_AllBlockedError,
                                      self.space.wrap("can't schedule, possible deadlock in sight"))
-            if to_be_run in self._asking:
+            # asking threads
+            if to_be_run in self._asking.keys():
                 if self.is_stable(self._asking[to_be_run]):
                     del self._asking[to_be_run]
                     del self._blocked[to_be_run]
+                    break
         self._head = to_be_run
         return to_be_run
 
@@ -185,11 +205,10 @@
     def add_new_thread(self, thread):
         "insert 'thread' at end of running queue"
         assert isinstance(thread, ClonableCoroutine)
+        # cspace account mgmt
         if thread._cspace != None:
-            print "Yeeeep"
-            count = self._per_space_live_threads.get(thread._cspace, 0)
-            self._per_space_live_threads[thread._cspace] = count + 1
-            assert len(self._per_space_live_threads)
+            self._per_space_live_threads.get(thread._cspace, 0)
+            self.inc_live_thread_count(thread._cspace)
         self._chain_insert(thread)
 
     def add_to_blocked_on(self, w_var, thread):
@@ -206,7 +225,7 @@
         self._blocked[thread] = True
         # cspace accounting
         if thread._cspace is not None:
-            self._per_space_live_threads[thread._cspace] -= 1
+            self.dec_live_thread_count(thread._cspace)
 
     def unblock_on(self, w_var):
         v(".. we UNBLOCK threads dependants of var", str(w_var))
@@ -220,7 +239,7 @@
             del self._blocked[thr]
             # cspace accounting
             if thr._cspace is not None:
-                self._per_space_live_threads[thr._cspace] += 1
+                self.inc_live_thread_count(thr._cspace)
 
     def add_to_blocked_byneed(self, w_var, thread):
         w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
@@ -235,7 +254,7 @@
         self._blocked[thread] = True
         # cspace accounting
         if thread._cspace is not None:
-            self._per_space_live_threads[thread._cspace] += 1
+            self.dec_live_thread_count(thread._cspace)
 
     def unblock_byneed_on(self, w_var):
         v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
@@ -251,7 +270,7 @@
             del self._blocked[thr]
             # cspace accounting
             if thr._cspace is not None:
-                self._per_space_live_threads[thr._cspace] -= 1
+                self.inc_live_thread_count(thr._cspace)
             
 
     # Logic Variables tracing, helps exception propagation
@@ -304,7 +323,7 @@
         si(w_ret, sw('threads'),
            sw([id(th) for th in s.get_threads()]))
         si(w_ret, sw('blocked_on'),
-           sw([(id(th),  [id(th) for th in thl])
+           sw([(id(var),  [id(th) for th in thl])
                for var, thl in s._blocked_on.items()]))
         si(w_ret, sw('blocked_byneed'),
            sw([(id(var), [id(th) for th in thl])

Modified: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/space.py	(original)
+++ pypy/dist/pypy/objspace/cclp/space.py	Tue Aug  8 10:57:24 2006
@@ -2,33 +2,47 @@
 from pypy.interpreter import baseobjspace, gateway, argument, typedef
 from pypy.interpreter.error import OperationError
 
+from pypy.objspace.std.intobject import W_IntObject
+
 from pypy.objspace.cclp.misc import ClonableCoroutine, w
-from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
+from pypy.objspace.cclp.thunk import CSpaceThunk
 from pypy.objspace.cclp.global_state import scheduler
+from pypy.objspace.cclp.variable import newvar
+
 
 def newspace(space, w_callable, __args__):
-    try:
-        args = __args__.normalize()
-        # coro init
-        w_coro = ClonableCoroutine(space)
-        thunk = ProcedureThunk(space, w_callable, args, w_coro)
-        w_coro.bind(thunk)
-        if not we_are_translated():
-            w("NEWSPACE, thread", str(id(w_coro)), "for", str(w_callable.name))
-
-            w_space = W_CSpace(space, w_coro, parent=w_coro._cspace)
-            w_coro._cspace = w_space
-
-            scheduler[0].add_new_thread(w_coro)
-            scheduler[0].schedule()
-    except:
-        print "oh, uh"
+    args = __args__.normalize()
+    # coro init
+    w_coro = ClonableCoroutine(space)
+    thunk = CSpaceThunk(space, w_callable, args, w_coro)
+    w_coro.bind(thunk)
+    if not we_are_translated():
+        w("NEWSPACE, thread", str(id(w_coro)), "for", str(w_callable.name))
+    w_space = W_CSpace(space, w_coro, parent=w_coro._cspace)
+    w_coro._cspace = w_space
+
+    scheduler[0].add_new_thread(w_coro)
+    scheduler[0].schedule()
 
     return w_space
 app_newspace = gateway.interp2app(newspace, unwrap_spec=[baseobjspace.ObjSpace,
                                                          baseobjspace.W_Root,
                                                          argument.Arguments])
 
+
+def choose(space, w_n):
+    assert isinstance(w_n, W_IntObject)
+    n = space.int_w(w_n)
+    cspace = ClonableCoroutine.w_getcurrent(space)._cspace
+    if cspace != None:
+        assert isinstance(cspace, W_CSpace)
+        return cspace.choose(n)
+    raise OperationError(space.w_RuntimeError,
+                         space.wrap("choose is forbidden from the top-level space"))
+app_choose = gateway.interp2app(choose)
+
+
+
 class W_CSpace(baseobjspace.Wrappable):
 
     def __init__(self, space, thread, parent=None):
@@ -37,13 +51,37 @@
         self.space = space # the object space ;-)
         self.parent = parent
         self.main_thread = thread
+        # choice mgmt
+        self._choice = newvar(space)
+        self._committed = newvar(space)
 
     def w_ask(self):
         scheduler[0].wait_stable(self)
-        return self.space.newint(0)
+        self.space.wait(self._choice)
+        return self.space.newint(self._choice)
+
+    def choose(self, n):
+        assert n > 1
+        scheduler[0].wait_stable(self)
+        assert self.space.is_true(self.space.is_free(self._choice))
+        assert self.space.is_true(self.space.is_free(self._committed))
+        self.space.bind(self._choice, self.space.wrap(n))
+        self.space.wait(self._committed)
+        committed = self._committed
+        self._committed = newvar(self.space)
+        return committed
+
+    def w_commit(self, w_n):
+        assert self.space.is_true(self.space.is_bound(self._choice))
+        assert 0 < self.space.int_w(w_n)
+        assert self.space.int_w(w_n) <= self._choice
+        self.space.bind(self._committed, w_n)
+        self._choice = newvar(self.space)
+        
 
 W_CSpace.typedef = typedef.TypeDef("W_CSpace",
-    ask = gateway.interp2app(W_CSpace.w_ask))
+    ask = gateway.interp2app(W_CSpace.w_ask),
+    commit = gateway.interp2app(W_CSpace.w_commit))
 
 
 

Modified: pypy/dist/pypy/objspace/cclp/thread.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thread.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thread.py	Tue Aug  8 10:57:24 2006
@@ -40,7 +40,6 @@
     #coro.cspace = ClonableCoroutine.w_getcurrent(space).cspace
     thunk = ProcedureThunk(space, w_callable, args, coro)
     coro.bind(thunk)
-    print we_are_translated()
     if not we_are_translated():
         w("STACKLET", str(id(coro)), "for", str(w_callable.name))
     scheduler[0].add_new_thread(coro)

Modified: pypy/dist/pypy/objspace/cclp/thunk.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thunk.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thunk.py	Tue Aug  8 10:57:24 2006
@@ -21,9 +21,7 @@
                 scheduler[0].dirty_traced_vars(self._coro, W_FailedValue(exc))
                 self._coro._dead = True
             else:
-                w(".! clean (valueless) EXIT of", str(id(self._coro)),
-                  "-- setting future result", str(self.w_Result), "to",
-                  str(self.costate.w_tempval))
+                w(".! clean (valueless) EXIT of", str(id(self._coro)))
         finally:
             scheduler[0].remove_thread(self._coro)
             scheduler[0].schedule()
@@ -56,6 +54,35 @@
             scheduler[0].remove_thread(self._coro)
             scheduler[0].schedule()
 
+SPACE_FAILURE = 0
+SPACE_SOLUTION = 1
+
+class CSpaceThunk(_AppThunk):
+    def __init__(self, space, w_callable, args, coro):
+        _AppThunk.__init__(self, space, coro.costate, w_callable, args)
+        self._coro = coro
+
+    def call(self):
+        w(". initial (returnless) thunk CALL in", str(id(self._coro)))
+        scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
+        cspace = self._coro._cspace
+        try:
+            try:
+                _AppThunk.call(self)
+            except Exception, exc:
+                w(".% exceptional EXIT of", str(id(self._coro)), "with", str(exc))
+                scheduler[0].dirty_traced_vars(self._coro, W_FailedValue(exc))
+                self._coro._dead = True
+                self.space.bind(cspace._choice, self.space.wrap(SPACE_FAILURE))
+            else:
+                w(".% clean (valueless) EXIT of", str(id(self._coro)))
+                self.space.bind(cspace._choice, self.space.wrap(SPACE_SOLUTION))
+        finally:
+            scheduler[0].remove_thread(self._coro)
+            scheduler[0].schedule()
+
+
+
 
 def logic_args(args):
     "returns logic vars found in unpacked normalized args"

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Tue Aug  8 10:57:24 2006
@@ -21,7 +21,7 @@
 
 from pypy.objspace.cclp.global_state import scheduler
 
-from pypy.objspace.cclp.space import app_newspace, W_CSpace
+from pypy.objspace.cclp.space import app_newspace, app_choose, W_CSpace
 
 #-- VARIABLE ------------------------------------------------
 
@@ -269,8 +269,11 @@
                   space.wrap(app_this_thread))
     space.setitem(space.builtin.w_dict, space.wrap('reset_scheduler'),
                   space.wrap(app_reset_scheduler))
+    #-- comp. spaces --
     space.setitem(space.builtin.w_dict, space.wrap('newspace'),
                   space.wrap(app_newspace))
+    space.setitem(space.builtin.w_dict, space.wrap('choose'),
+                  space.wrap(app_choose))
 
     #-- misc -----
     space.setitem(space.builtin.w_dict, space.wrap('interp_id'),

Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Tue Aug  8 10:57:24 2006
@@ -603,11 +603,24 @@
 
     def test_newspace_ask_noop(self):
 
-        def bar(X): return X + 42
+        def in_space(X): return X + 42
+
+        def asker():
+            ask()
 
         X = newvar()
-        s = newspace(bar, X)
-        s.ask()
+        s = newspace(in_space, X)
+
+        assert sched_all()['space_accounting'][0][1] == 0 # live threads
+        assert len(sched_all()['blocked_on']) == 1
+
+        stacklet(asker)
+
+        unify(X, 42)
+        schedule()
+        assert len(sched_all()['threads']) == 1
+        
+
 
     def test_newspace_ask_wait(self):
 
@@ -628,3 +641,38 @@
         schedule() # allow quux exit
         schedule() # allow asker exit
         assert len(sched_all()['asking']) == 0
+
+    def test_ask_choose(self):
+
+        def chooser(X):
+            choice = choose(3)
+            unify(X, choice)
+
+        def asker(cspace):
+            choices = cspace.ask()
+            cspace.commit(2)
+
+        X = newvar()
+
+        s = newspace(chooser, X)
+        stacklet(asker, s)
+        schedule()
+        assert X == 2
+
+
+    def test_ask_choose(self):
+
+        def chooser(X):
+            choice = choose(3)
+            unify(X, choice)
+
+        def asker(cspace):
+            choices = cspace.ask()
+            cspace.commit(2)
+
+        X = newvar()
+
+        s = newspace(chooser, X)
+        stacklet(asker, s)
+        schedule()
+        assert X == 2



More information about the Pypy-commit mailing list