[pypy-svn] r30784 - in pypy/dist/pypy/objspace: . cclp cclp/test test

auc at codespeak.net auc at codespeak.net
Mon Jul 31 15:25:12 CEST 2006


Author: auc
Date: Mon Jul 31 15:25:07 2006
New Revision: 30784

Added:
   pypy/dist/pypy/objspace/cclp/scheduler.py
   pypy/dist/pypy/objspace/cclp/space.py
   pypy/dist/pypy/objspace/cclp/test/
   pypy/dist/pypy/objspace/cclp/thunk.py
Modified:
   pypy/dist/pypy/objspace/cclp/thread.py
   pypy/dist/pypy/objspace/cclp/variable.py
   pypy/dist/pypy/objspace/logic.py
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
more file split, this_thread(), stacklet(), reset_scheduler() builtins, some worries ahead


Added: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py	Mon Jul 31 15:25:07 2006
@@ -0,0 +1,231 @@
+from pypy.interpreter.error import OperationError
+from pypy.interpreter import gateway
+
+from pypy.objspace.cclp.types import W_Var, aliases
+from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
+from pypy.objspace.cclp.space import CSpace
+
+scheduler = []
+
+#-- Singleton scheduler ------------------------------------------------
+
+class Scheduler(object):
+
+    def __init__(self, space):
+        self.space = space
+        self._main = ClonableCoroutine.w_getcurrent(space)
+        # link top_level space to main coroutine
+        self.top_space = CSpace(space, self._main)
+        self._main.cspace = self.top_space
+        # ...
+        self._init_head(self._main)
+        self._init_blocked()
+        self._switch_count = 0
+        self._traced = {}
+        w (".. MAIN THREAD = ", str(id(self._main)))
+
+    def _init_blocked(self):
+        self._blocked = {} # thread set
+        self._blocked_on = {} # var -> threads
+        self._blocked_byneed = {} # var -> threads
+
+    def _init_head(self, coro):
+        self._head = coro
+        self._head.next = self._head.prev = self._head
+
+    def _set_head(self, thread):
+        self._head = thread
+
+    def _check_initial_conditions(self):
+        try:
+            assert self._head.next == self._head.prev == self._head
+            assert self._head not in self._blocked
+            assert self._head not in self._blocked_on
+            assert self._head not in self._blocked_byneed
+        except:
+            self.display_head()
+            w("BLOCKED", str(self._blocked))
+            all = {}
+            all.update(self._blocked_on)
+            all.update(self._blocked_byneed)
+            w(str(all))
+            raise
+            
+    def _chain_insert(self, thread):
+        assert thread.next is None
+        assert thread.prev is None
+        if self._head is None:
+            thread.next = thread
+            thread.prev = thread
+            self._set_head(thread)
+        else:
+            r = self._head
+            l = r.prev
+            l.next = thread
+            r.prev = thread
+            thread.prev = l
+            thread.next = r
+
+    def remove_thread(self, thread):
+        w(".. REMOVING", str(id(thread)))
+        assert thread not in self._blocked
+        del self._traced[thread]
+        l = thread.prev
+        r = thread.next
+        l.next = r
+        r.prev = l
+        if r == thread:
+            if not we_are_translated():
+                import traceback
+                traceback.print_exc()
+            self.display_head()
+        thread.next = thread.next = None
+        return thread
+
+    #-- to be used by logic objspace
+
+    def schedule(self):
+        to_be_run = self._select_next()
+        w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+        self._switch_count += 1
+        to_be_run.w_switch() 
+
+    def schedule_or_pass(self):
+        to_be_run = self._select_next(dont_pass=False)
+        curr = ClonableCoroutine.w_getcurrent(self.space)
+        if to_be_run == curr:
+            w(".. PASS")
+            return
+        w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
+        self._switch_count += 1
+        to_be_run.w_switch() 
+        
+    def _select_next(self, dont_pass=True):
+        to_be_run = self._head
+        sentinel = to_be_run
+        current = ClonableCoroutine.w_getcurrent(self.space)
+        while (to_be_run in self._blocked) \
+                  or to_be_run.is_dead() \
+                  or (to_be_run == current):
+            to_be_run = to_be_run.next
+            if to_be_run == sentinel:
+                if not dont_pass:
+                    return ClonableCoroutine.w_getcurrent(self.space)
+                self.display_head()
+                ## we RESET sched state so as to keep being usable beyond that
+                self._init_head(self._main)
+                self._init_blocked()
+                w(".. SCHEDULER reinitialized")
+                raise OperationError(self.space.w_RuntimeError,
+                                     self.space.wrap("can't schedule, possible deadlock in sight"))
+        return to_be_run
+
+    #XXX call me directly for this to work translated
+    def __len__(self):
+        "count of known threads (including dead ones)"
+        curr = self._head
+        sentinel = curr
+        count = 1 # there is always a main thread
+        while curr.next != sentinel:
+            curr = curr.next
+            count += 1
+        return count
+
+    def display_head(self):
+        curr = self._head
+        v('Threads : [', '-'.join([str(id(curr)), str(curr in self._blocked)]))
+        while curr.next != self._head:
+            curr = curr.next
+            v('-'.join([str(id(curr)), str(curr in self._blocked)]))
+        w(']')
+
+    def add_new_thread(self, thread):
+        "insert 'thread' at end of running queue"
+        assert isinstance(thread, ClonableCoroutine)
+        self._chain_insert(thread)
+
+    def add_to_blocked_on(self, w_var, uthread):
+        w(".. we BLOCK thread", str(id(uthread)), "on var", str(w_var))
+        assert isinstance(w_var, W_Var)
+        assert isinstance(uthread, ClonableCoroutine)
+        assert uthread not in self._blocked
+        if w_var in self._blocked_on:
+            blocked = self._blocked_on[w_var]
+        else:
+            blocked = []
+            self._blocked_on[w_var] = blocked
+        blocked.append(uthread)
+        self._blocked[uthread] = True
+
+    def unblock_on(self, w_var):
+        v(".. we UNBLOCK threads dependants of var", str(w_var))
+        assert isinstance(w_var, W_Var)
+        blocked = []
+        if w_var in self._blocked_on:
+            blocked = self._blocked_on[w_var]
+            del self._blocked_on[w_var]
+        w(str([id(thr) for thr in blocked]))
+        for thr in blocked: del self._blocked[thr]
+
+    def add_to_blocked_byneed(self, w_var, uthread):
+        w(".. we BLOCK BYNEED thread", str(id(uthread)), "on var", str(w_var))
+        assert isinstance(w_var, W_Var)
+        assert isinstance(uthread, ClonableCoroutine)
+        if w_var in self._blocked_byneed:
+            blocked = self._blocked_byneed[w_var]
+        else:
+            blocked = []
+            self._blocked_byneed[w_var] = blocked
+        blocked.append(uthread)
+        self._blocked[uthread] = True
+
+    def unblock_byneed_on(self, w_var):
+        v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
+        assert isinstance(w_var, W_Var)
+        blocked = []
+        for w_alias in aliases(self.space, w_var):
+            if w_alias in self._blocked_byneed:
+                blocked += self._blocked_byneed[w_alias]
+                del self._blocked_byneed[w_alias]
+            w_alias.needed = True
+        w(str([id(thr) for thr in blocked]))
+        for thr in blocked: del self._blocked[thr]
+
+    # Logic Variables tracing, helps exception propagation
+    # amongst threads
+    def trace_vars(self, thread, lvars):
+        assert isinstance(thread, ClonableCoroutine)
+        assert isinstance(lvars, list)
+        w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
+        #assert not self._traced.has_key(thread) doesn't translate 
+        self._traced[thread] = lvars
+
+    def dirty_traced_vars(self, thread, failed_value):
+        w(".. DIRTYING traced vars")
+        for w_var in self._traced[thread]:
+            if self.space.is_true(self.space.is_free(w_var)):
+                self.space.bind(w_var, failed_value)
+
+
+#-- Misc --------------------------------------------------
+def reset_scheduler(space):
+    "garbage collection of threads might pose some problems"
+    scheduler[0] = Scheduler(space)
+app_reset_scheduler = gateway.interp2app(reset_scheduler)
+
+def sched_stats(space):
+    sched = scheduler[0]
+    w_ret = space.newdict([])
+    space.setitem(w_ret, space.wrap('switches'), space.wrap(sched._switch_count))
+    space.setitem(w_ret, space.wrap('threads'), space.wrap(sched.__len__()))
+    space.setitem(w_ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
+    space.setitem(w_ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
+    space.setitem(w_ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
+    return w_ret
+app_sched_stats = gateway.interp2app(sched_stats)
+
+
+def schedule(space):
+    "useful til we get preemtive scheduling deep into the vm"
+    scheduler[0].schedule_or_pass()
+app_schedule = gateway.interp2app(schedule)

Added: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/space.py	Mon Jul 31 15:25:07 2006
@@ -0,0 +1,52 @@
+from pypy.interpreter.error import OperationError
+from pypy.objspace.cclp.misc import ClonableCoroutine
+
+class CSpace:
+
+    def __init__(self, space, distributor, parent=None):
+        assert isinstance(distributor, ClonableCoroutine)
+        assert (parent is None) or isinstance(parent, CSpace)
+        self.space = space # the object space ;-)
+        self.parent = parent
+        self.distributor = distributor
+        self.threads = {} # the eventual other threads
+
+    def is_top_level(self):
+        return self.parent is None
+
+##     def current_space():
+##         #XXX return w_getcurrent().cspace
+##         pass
+
+##     def newspace():
+##         #XXX fork ?
+##         pass
+
+    def clone(self):
+        if self.is_top_level():
+            raise OperationError(self.space.w_RuntimeError,
+                                 self.space.wrap("Clone"+forbidden_boilerplate))
+        new = CSpace(self.distributor.clone(), parent=self)
+        new.distributor.cspace = new
+        for thread in self.threads:
+            tclone = thread.clone()
+            tclone.cspace = new
+            new.threads[tclone] = True
+
+##     def choose(self, n):
+##         if self.is_top_level():
+##             raise OperationError(self.space.w_RuntimeError,
+##                                  self.space.wrap("Choose"+forbidden_boilerplate))
+
+##     def ask(self):
+##         if self.is_top_level():
+##             raise OperationError(self.space.w_RuntimeError,
+##                                  self.space.wrap("Ask"+forbidden_boilerplate))
+##         #XXX basically hang until a call to choose, then return n
+
+##     def commit(self, n):
+##         if self.is_top_level():
+##             raise OperationError(self.space.w_RuntimeError,
+##                                  self.space.wrap("Commit"+forbidden_boilerplate))
+##         # ensure 0 < n < chosen n
+##         # ...

Modified: pypy/dist/pypy/objspace/cclp/thread.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thread.py	(original)
+++ pypy/dist/pypy/objspace/cclp/thread.py	Mon Jul 31 15:25:07 2006
@@ -1,287 +1,58 @@
 from pypy.interpreter import gateway, baseobjspace, argument
 from pypy.interpreter.error import OperationError
+from pypy.rpython.objectmodel import we_are_translated
 
-from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue, aliases
+from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
 from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.module._stackless.coroutine import _AppThunk
-
-#-- Singleton scheduler ------------------------------------------------
-
-scheduler = []
-
-class Scheduler(object):
-
-    def __init__(self, space):
-        self.space = space
-        self._main = ClonableCoroutine.w_getcurrent(space)
-        self._init_head(self._main)
-        self._init_blocked()
-        self._switch_count = 0
-        self._traced = {}
-        w (".. MAIN THREAD = ", str(id(self._main)))
-
-    def _init_blocked(self):
-        self._blocked = {} # thread set
-        self._blocked_on = {} # var -> threads
-        self._blocked_byneed = {} # var -> threads
-
-    def _init_head(self, coro):
-        self._head = coro
-        self._head.next = self._head.prev = self._head
-
-    def _set_head(self, thread):
-        self._head = thread
-
-    def _check_initial_conditions(self):
-        try:
-            assert self._head.next == self._head.prev == self._head
-            assert self._head not in self._blocked
-            assert self._head not in self._blocked_on
-            assert self._head not in self._blocked_byneed
-        except:
-            self.display_head()
-            w("BLOCKED", str(self._blocked))
-            all = {}
-            all.update(self._blocked_on)
-            all.update(self._blocked_byneed)
-            w(str(all))
-            raise
-            
-    def _chain_insert(self, thread):
-        assert thread.next is None
-        assert thread.prev is None
-        if self._head is None:
-            thread.next = thread
-            thread.prev = thread
-            self._set_head(thread)
-        else:
-            r = self._head
-            l = r.prev
-            l.next = thread
-            r.prev = thread
-            thread.prev = l
-            thread.next = r
-
-    def remove_thread(self, thread):
-        w(".. REMOVING", str(id(thread)))
-        assert thread not in self._blocked
-        del self._traced[thread]
-        l = thread.prev
-        r = thread.next
-        l.next = r
-        r.prev = l
-        if r == thread:
-            import traceback
-            traceback.print_exc()
-            self.display_head()
-        thread.next = thread.next = None
-        return thread
-
-    #-- to be used by logic objspace
-
-    def schedule(self):
-        to_be_run = self._select_next()
-        w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
-        self._switch_count += 1
-        to_be_run.w_switch() 
-
-    def schedule_or_pass(self):
-        to_be_run = self._select_next(dont_pass=False)
-        curr = ClonableCoroutine.w_getcurrent(self.space)
-        if to_be_run == curr:
-            w(".. PASS")
-            return
-        w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
-        self._switch_count += 1
-        to_be_run.w_switch() 
-        
-    def _select_next(self, dont_pass=True):
-        to_be_run = self._head
-        sentinel = to_be_run
-        current = ClonableCoroutine.w_getcurrent(self.space)
-        while (to_be_run in self._blocked) \
-                  or to_be_run.is_dead() \
-                  or (dont_pass and (to_be_run == current)):
-            to_be_run = to_be_run.next
-            if to_be_run == sentinel:
-                self.display_head()
-                ## we RESET sched state so as to keep being usable beyond that
-                self._init_head(self._main)
-                self._init_blocked()
-                w(".. SCHEDULER reinitialized")
-                raise OperationError(self.space.w_RuntimeError,
-                                     self.space.wrap("can't schedule, possible deadlock in sight"))
-        return to_be_run
-
-    #XXX call me directly for this to work translated
-    def __len__(self):
-        "count of known threads (including dead ones)"
-        curr = self._head
-        sentinel = curr
-        count = 1 # there is always a main thread
-        while curr.next != sentinel:
-            curr = curr.next
-            count += 1
-        return count
-
-    def display_head(self):
-        curr = self._head
-        v('Threads : [', '-'.join([str(id(curr)), str(curr in self._blocked)]))
-        while curr.next != self._head:
-            curr = curr.next
-            v('-'.join([str(id(curr)), str(curr in self._blocked)]))
-        w(']')
-
-    def add_new_thread(self, thread):
-        "insert 'thread' at end of running queue"
-        assert isinstance(thread, ClonableCoroutine)
-        self._chain_insert(thread)
-
-    def add_to_blocked_on(self, w_var, uthread):
-        w(".. we BLOCK thread", str(id(uthread)), "on var", str(w_var))
-        assert isinstance(w_var, W_Var)
-        assert isinstance(uthread, ClonableCoroutine)
-        assert uthread not in self._blocked
-        if w_var in self._blocked_on:
-            blocked = self._blocked_on[w_var]
-        else:
-            blocked = []
-            self._blocked_on[w_var] = blocked
-        blocked.append(uthread)
-        self._blocked[uthread] = True
-
-    def unblock_on(self, w_var):
-        v(".. we UNBLOCK threads dependants of var", str(w_var))
-        assert isinstance(w_var, W_Var)
-        blocked = []
-        if w_var in self._blocked_on:
-            blocked = self._blocked_on[w_var]
-            del self._blocked_on[w_var]
-        w(str([id(thr) for thr in blocked]))
-        for thr in blocked: del self._blocked[thr]
-
-    def add_to_blocked_byneed(self, w_var, uthread):
-        w(".. we BLOCK BYNEED thread", str(id(uthread)), "on var", str(w_var))
-        assert isinstance(w_var, W_Var)
-        assert isinstance(uthread, ClonableCoroutine)
-        if w_var in self._blocked_byneed:
-            blocked = self._blocked_byneed[w_var]
-        else:
-            blocked = []
-            self._blocked_byneed[w_var] = blocked
-        blocked.append(uthread)
-        self._blocked[uthread] = True
-
-    def unblock_byneed_on(self, w_var):
-        v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
-        assert isinstance(w_var, W_Var)
-        blocked = []
-        for w_alias in aliases(self.space, w_var):
-            if w_alias in self._blocked_byneed:
-                blocked += self._blocked_byneed[w_alias]
-                del self._blocked_byneed[w_alias]
-            w_alias.needed = True
-        w(str([id(thr) for thr in blocked]))
-        for thr in blocked: del self._blocked[thr]
-
-    # Logic Variables tracing, helps exception propagation
-    # amongst threads
-    def trace_vars(self, thread, lvars):
-        assert isinstance(thread, ClonableCoroutine)
-        assert isinstance(lvars, list)
-        w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
-        #assert not self._traced.has_key(thread) doesn't translate 
-        self._traced[thread] = lvars
-
-    def dirty_traced_vars(self, thread, failed_value):
-        w(".. DIRTYING traced vars")
-        for w_var in self._traced[thread]:
-            if self.space.is_true(self.space.is_free(w_var)):
-                self.space.bind(w_var, failed_value)
-
-
-#-- Thunk -----------------------------------------
-
-class FutureThunk(_AppThunk):
-    def __init__(self, space, w_callable, args, w_Result, coro):
-        _AppThunk.__init__(self, space, coro.costate, w_callable, args)
-        self.w_Result = w_Result 
-        self._coro = coro
-
-    def call(self):
-        w(".. initial thunk CALL in", str(id(self._coro)))
-        scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
-        try:
-            try:
-                _AppThunk.call(self)
-            except Exception, exc:
-                w(".. exceptional EXIT of", str(id(self._coro)), "with", str(exc))
-                failed_val = W_FailedValue(exc)
-                self.space.bind(self.w_Result, failed_val)
-                scheduler[0].dirty_traced_vars(self._coro, failed_val)
-                self._coro._dead = True
-            else:
-                w(".. clean EXIT of", str(id(self._coro)),
-                  "-- setting future result", str(self.w_Result), "to",
-                  str(self.costate.w_tempval))
-                self.space.unify(self.w_Result, self.costate.w_tempval)
-        finally:
-            scheduler[0].remove_thread(self._coro)
-            scheduler[0].schedule()
-
-
-def logic_args(args):
-    "returns logic vars found in unpacked normalized args"
-    assert isinstance(args, tuple)
-    pos = args[0]
-    kwa = args[1]
-    pos_l = [arg for arg in pos
-             if isinstance(arg, W_Var)]
-    kwa_l = [arg for arg in kwa.keys()
-             if isinstance(arg, W_Var)]
-    return pos_l + kwa_l
+from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
+from pypy.objspace.cclp.scheduler import scheduler
 
 
 #-- Future --------------------------------------------------
 
 def future(space, w_callable, __args__):
     """returns a future result"""
+    #XXX we could be much more lazy wrt coro creation
     args = __args__.normalize()
     # coro init
     coro = ClonableCoroutine(space)
     # prepare thread chaining, create missing slots
     coro.next = coro.prev = None
+    # computation space is the same as in the parent
+    coro.cspace = ClonableCoroutine.w_getcurrent(space).cspace
     # feed the coro
     w_Future = W_Future(space)
     thunk = FutureThunk(space, w_callable, args, w_Future, coro)
     coro.bind(thunk)
     w("THREAD", str(id(coro)))
     scheduler[0].add_new_thread(coro)
-    # XXX we should think about a way to make it read-only for the client
-    #     (i.e the originator), aka true futures
     return w_Future
 app_future = gateway.interp2app(future, unwrap_spec=[baseobjspace.ObjSpace,
                                                      baseobjspace.W_Root,
                                                      argument.Arguments])
-    
-# need (applevel) : getcurrent(), getmain(), 
 
-#-- Misc --------------------------------------------------
-
-def sched_stats(space):
-    sched = scheduler[0]
-    w_ret = space.newdict([])
-    space.setitem(w_ret, space.wrap('switches'), space.wrap(sched._switch_count))
-    space.setitem(w_ret, space.wrap('threads'), space.wrap(sched.__len__()))
-    space.setitem(w_ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
-    space.setitem(w_ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
-    space.setitem(w_ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
-    return w_ret
-app_sched_stats = gateway.interp2app(sched_stats)
+#-- plain Coroutine -----------------------------------------
 
+def stacklet(space, w_callable, __args__):
+    """returns a coroutine object"""
+    args = __args__.normalize()
+    # coro init
+    coro = ClonableCoroutine(space)
+    # prepare thread chaining, create missing slots
+    coro.next = coro.prev = None
+    # computation space is the same as in the parent
+    coro.cspace = ClonableCoroutine.w_getcurrent(space).cspace
+    thunk = ProcedureThunk(space, w_callable, args, coro)
+    coro.bind(thunk)
+    w("THREAD", str(id(coro)))
+    scheduler[0].add_new_thread(coro)
+    scheduler[0].schedule()
+    return coro
+app_stacklet = gateway.interp2app(stacklet, unwrap_spec=[baseobjspace.ObjSpace,
+                                                         baseobjspace.W_Root,
+                                                         argument.Arguments])
 
-def schedule(space):
-    "useful til we get preemtive scheduling deep into the vm"
-    scheduler[0].schedule_or_pass()
-app_schedule = gateway.interp2app(schedule)
 
+def this_thread(space):
+    return ClonableCoroutine.w_getcurrent(space)
+app_this_thread = gateway.interp2app(this_thread)

Added: pypy/dist/pypy/objspace/cclp/thunk.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/thunk.py	Mon Jul 31 15:25:07 2006
@@ -0,0 +1,70 @@
+from pypy.module._stackless.coroutine import _AppThunk
+from pypy.objspace.cclp.misc import w 
+from pypy.objspace.cclp.scheduler import scheduler
+from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
+
+#-- Thunk -----------------------------------------
+
+class ProcedureThunk(_AppThunk):
+    def __init__(self, space, w_callable, args, coro):
+        _AppThunk.__init__(self, space, coro.costate, w_callable, args)
+        self._coro = coro
+
+    def call(self):
+        w(".! initial (returnless) thunk CALL in", str(id(self._coro)))
+        scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
+        try:
+            try:
+                _AppThunk.call(self)
+            except Exception, exc:
+                w(".! exceptional EXIT of", str(id(self._coro)), "with", str(exc))
+                scheduler[0].dirty_traced_vars(self._coro, W_FailedValue(exc))
+                self._coro._dead = True
+            else:
+                w(".! clean (valueless) EXIT of", str(id(self._coro)),
+                  "-- setting future result", str(self.w_Result), "to",
+                  str(self.costate.w_tempval))
+        finally:
+            scheduler[0].remove_thread(self._coro)
+            scheduler[0].schedule()
+
+
+class FutureThunk(_AppThunk):
+    def __init__(self, space, w_callable, args, w_Result, coro):
+        _AppThunk.__init__(self, space, coro.costate, w_callable, args)
+        self.w_Result = w_Result 
+        self._coro = coro
+
+    def call(self):
+        w(".! initial thunk CALL in", str(id(self._coro)))
+        scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
+        try:
+            try:
+                _AppThunk.call(self)
+            except Exception, exc:
+                w(".! exceptional EXIT of", str(id(self._coro)), "with", str(exc))
+                failed_val = W_FailedValue(exc)
+                self.space.bind(self.w_Result, failed_val)
+                scheduler[0].dirty_traced_vars(self._coro, failed_val)
+                self._coro._dead = True
+            else:
+                w(".! clean EXIT of", str(id(self._coro)),
+                  "-- setting future result", str(self.w_Result), "to",
+                  str(self.costate.w_tempval))
+                self.space.unify(self.w_Result, self.costate.w_tempval)
+        finally:
+            scheduler[0].remove_thread(self._coro)
+            scheduler[0].schedule()
+
+
+def logic_args(args):
+    "returns logic vars found in unpacked normalized args"
+    assert isinstance(args, tuple)
+    pos = args[0]
+    kwa = args[1]
+    pos_l = [arg for arg in pos
+             if isinstance(arg, W_Var)]
+    kwa_l = [arg for arg in kwa.keys()
+             if isinstance(arg, W_Var)]
+    return pos_l + kwa_l
+

Modified: pypy/dist/pypy/objspace/cclp/variable.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/variable.py	(original)
+++ pypy/dist/pypy/objspace/cclp/variable.py	Mon Jul 31 15:25:07 2006
@@ -5,7 +5,7 @@
 from pypy.objspace.std.dictobject import W_DictObject
 
 from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.objspace.cclp.thread import scheduler
+from pypy.objspace.cclp.scheduler import scheduler
 from pypy.objspace.cclp.types import deref, W_Var, W_Future, W_FailedValue
 
 
@@ -29,7 +29,6 @@
         scheduler[0].unblock_byneed_on(w_var)
         scheduler[0].add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
         scheduler[0].schedule()
-        print "pooooopp ?", id(ClonableCoroutine.w_getcurrent(space))
         assert space.is_true(space.is_bound(w_var))
     w_ret = w_var.w_bound_to
     if isinstance(w_ret, W_FailedValue):

Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Mon Jul 31 15:25:07 2006
@@ -14,8 +14,10 @@
 
 #-- THREADING/COROUTINING -----------------------------------
 
-from pypy.objspace.cclp.thread import Scheduler, app_future, app_sched_stats, \
-     app_schedule, scheduler
+from pypy.objspace.cclp.thread import app_future, app_stacklet, app_this_thread
+
+from pypy.objspace.cclp.scheduler import Scheduler, scheduler, app_sched_stats, \
+     app_schedule, app_reset_scheduler
 
 #-- VARIABLE ------------------------------------------------
 
@@ -246,6 +248,8 @@
     #-- threading --
     space.setitem(space.builtin.w_dict, space.wrap('future'),
                  space.wrap(app_future))
+    space.setitem(space.builtin.w_dict, space.wrap('stacklet'),
+                 space.wrap(app_stacklet))
     space.setitem(space.builtin.w_dict, space.wrap('wait'),
                  space.wrap(app_wait))
     space.setitem(space.builtin.w_dict, space.wrap('wait_needed'),
@@ -254,6 +258,10 @@
                   space.wrap(app_sched_stats))
     space.setitem(space.builtin.w_dict, space.wrap('schedule'),
                   space.wrap(app_schedule))
+    space.setitem(space.builtin.w_dict, space.wrap('this_thread'),
+                  space.wrap(app_this_thread))
+    space.setitem(space.builtin.w_dict, space.wrap('reset_scheduler'),
+                  space.wrap(app_reset_scheduler))
 
     #-- misc -----
     space.setitem(space.builtin.w_dict, space.wrap('interp_id'),

Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Mon Jul 31 15:25:07 2006
@@ -1,6 +1,6 @@
 from pypy.conftest import gettestobjspace
 from py.test import skip
- 
+
 class AppTest_Logic(object):
 
     def setup_class(cls):
@@ -196,13 +196,14 @@
         raises(Exception, unify, f1.b, 24)
 
 
-class AppTest_LogicThreads(object):
+class AppTest_LogicFutures(object):
 
     def setup_class(cls):
         cls.space = gettestobjspace('logic', usemodules=("_stackless",))
 
     def test_future_value(self):
-        
+        print "future value", sched_stats()
+
         def poop(X):
             return X + 1
 
@@ -210,7 +211,6 @@
         Y = future(poop, X)
         unify(X, 42)
         assert Y == 43
-        print "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAa"
         assert sched_stats()['threads'] == 1
 
         X = newvar()
@@ -233,6 +233,7 @@
         assert sched_stats()['threads'] == 3
 
     def test_one_future_exception(self):
+        print "one future exception", sched_stats()
         class FooException(Exception): pass
         
         def poop(X):
@@ -249,6 +250,7 @@
         assert False
 
     def test_exception_in_chain(self):
+        print "exception in chain", sched_stats()
         class FooException(Exception): pass
 
         def raise_foo():
@@ -272,6 +274,7 @@
         assert False
 
     def test_exception_in_group(self):
+        print "exception in groups", sched_stats()
         class FooException(Exception): pass
 
         def loop_or_raise(Canary, crit, Bomb_signal):
@@ -303,6 +306,7 @@
         """check that a wait nested in a tree of
            threads works correctly
         """
+        print "nested threads", sched_stats()
         def sleep(X):
             wait(X)
             return X
@@ -318,6 +322,7 @@
         assert v == 42
 
     def test_wait_needed(self):
+        print "wait_needed", sched_stats()
         X = newvar()
 
         def binder(V):
@@ -334,6 +339,7 @@
         assert X == 42
 
     def test_eager_producer_consummer(self):
+        print "eager_producer_consummer", sched_stats()
 
         def generate(n, limit):
             if n < limit:
@@ -355,6 +361,7 @@
 
 
     def test_lazy_producer_consummer(self):
+        print "lazy_producer_consummer", sched_stats()
 
         def lgenerate(n, L):
             """wait-needed version of generate"""
@@ -383,6 +390,8 @@
         assert T == 45
 
     def test_wait_two(self):
+        print "wait_two", sched_stats()
+
         def sleep(X, Barrier):
             wait(X)
             bind(Barrier, True)
@@ -424,3 +433,36 @@
             print F
         except Exception, e:
             print e
+
+    def test_stacklet(self):
+
+        print "stacklet", sched_stats()
+        reset_scheduler()
+        #XXX each of the previous test decorates
+        #    the scheduler with unreclaimed stuff
+        #    In this case, side-effect happen. Nasty.
+
+        count = [0]
+
+        def inc_and_greet(count, max_, Finished, Failed):
+            if count[0] >= max_:
+                count[0] += 1
+                bind(Finished, count[0])
+                return
+            count[0] += 1
+
+        Finished, Failed = newvar(), newvar()
+        max_spawn = 2
+        erring = 3
+        for i in range(max_spawn + erring):
+            stacklet(inc_and_greet, count, max_spawn, Finished, Failed)
+
+        wait(Finished)
+        assert count[0] == max_spawn + erring
+        try:
+            wait(Failed)
+        except Exception, e: # Unification Failure
+            assert sched_stats()['threads'] == 1
+            return
+        assert False
+                



More information about the Pypy-commit mailing list