[pypy-svn] r24216 - pypy/dist/pypy/objspace

cfbolz at codespeak.net cfbolz at codespeak.net
Fri Mar 10 13:19:47 CET 2006


Author: cfbolz
Date: Fri Mar 10 13:19:46 2006
New Revision: 24216

Modified:
   pypy/dist/pypy/objspace/logic.py
Log:
(cfbolz, pedronis):

Added an implementation of uthreads that uses stackless and interpreter-level
coroutines. It seems to be translatable, and it is possible to run the examples.
Side note: it seems to be possible to use greenlets to implement
interpreter-level coroutines, at least to some extent.


Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py	(original)
+++ pypy/dist/pypy/objspace/logic.py	Fri Mar 10 13:19:46 2006
@@ -1,31 +1,130 @@
 from pypy.objspace.proxy import patch_space_in_place
 from pypy.interpreter import gateway, baseobjspace, argument
 from pypy.interpreter.error import OperationError
+from pypy.rpython.objectmodel import we_are_translated
 
-USE_GREENLETS = False
+USE_COROUTINES = True
+HAVE_GREENLETS = True
 try:
     from py.magic import greenlet
 except ImportError:
-    USE_GREENLETS = False
+    HAVE_GREENLETS = False
 
-if USE_GREENLETS:
-    runnable_uthreads = {}
-    uthreads_blocked_on = {}
-    main_greenlet = greenlet.getcurrent()
+def have_uthreads():
+    if USE_COROUTINES:
+        if we_are_translated():
+            return True
+        else:
+            return HAVE_GREENLETS
+    return False
+
+if USE_COROUTINES:
+    from pypy.module.stackless.interp_coroutine import Coroutine, AbstractThunk
+
+    class ScheduleState(object):
+        def __init__(self):
+            self.runnable_uthreads = {}
+            self.uthreads_blocked_on = {}
+
+        def pop_runnable_thread(self):
+            # umpf, no popitem in RPython
+            key = None
+            for key, item in self.runnable_uthreads.iteritems():
+                break
+            del self.runnable_uthreads[key]
+            return key 
+
+        def add_to_runnable(self, uthread):
+            self.runnable_uthreads[uthread] = True
+
+        def remove_from_runnable(self, uthread):
+            del self.runnable_uthreads[uthread]
+
+        def have_runnable_threads(self):
+            return bool(self.runnable_uthreads)
+
+        def have_blocked_threads(self):
+            return bool(self.uthreads_blocked_on)
+
+        def add_to_blocked(self, w_var, uthread):
+            if w_var in self.uthreads_blocked_on:
+                blocked = self.uthreads_blocked_on[w_var]
+            else:
+                blocked = []
+                self.uthreads_blocked_on[w_var] = blocked
+            blocked.append(uthread)
+
+        def pop_blocked_on(self, w_var):
+            if w_var not in self.uthreads_blocked_on:
+                return []
+            blocked = self.uthreads_blocked_on[w_var]
+            del self.uthreads_blocked_on[w_var]
+            return blocked
+
+    schedule_state = ScheduleState()
+
+    class Thunk(AbstractThunk):
+        def __init__(self, space, w_callable, args, w_Result):
+            self.space = space
+            self.w_callable = w_callable
+            self.args = args
+            self.w_Result = w_Result # the upper-case R is because it is a logic variable
+
+        def call(self):
+            bind(self.space, self.w_Result,
+                 self.space.call_args(self.w_callable, self.args))
+
+    class GreenletCoroutine(object):
+        def bind(self, thunk):
+            self.greenlet = greenlet(thunk.call)
+
+        def switch(self):
+            self.greenlet.switch()
+
+        def is_alive(self):
+            return bool(self.greenlet)
+
+        def getcurrent():
+            result = GreenletCoroutine()
+            result.greenlet = greenlet.getcurrent()
+            return result
+        getcurrent = staticmethod(getcurrent)
+
+        def __hash__(self):
+            return hash(self.greenlet)
+
+        def __eq__(self, other):
+            return self.greenlet == other.greenlet
+
+        def __ne__(self, other):
+            return not (self == other)
+
+    def construct_coroutine():
+        if we_are_translated():
+            return Coroutine()
+        else:
+            return GreenletCoroutine()
+
+    def get_current_coroutine():
+        if we_are_translated():
+            return Coroutine.getcurrent()
+        else:
+            return GreenletCoroutine.getcurrent()
 
     def uthread(space, w_callable, __args__):
+        args = __args__.normalize()
         w_Result = W_Var()
-        def run():
-            space.eq(w_Result, space.call_args(w_callable, __args__))
-        gr = greenlet(run)
-        current = greenlet.getcurrent()
-        runnable_uthreads[current] = True
-        gr.switch()
-        while runnable_uthreads:
-            next_greenlet, _ = runnable_uthreads.popitem()
-            if next_greenlet and next_greenlet is not current:
-                runnable_uthreads[current] = True
-                next_greenlet.switch()
+        thunk = Thunk(space, w_callable, args, w_Result)
+        coro = construct_coroutine()
+        coro.bind(thunk)
+        current = get_current_coroutine()
+        schedule_state.add_to_runnable(current)
+        coro.switch()
+        while schedule_state.have_runnable_threads():
+            next_coro = schedule_state.pop_runnable_thread()
+            if next_coro.is_alive() and next_coro != current:
+                schedule_state.add_to_runnable(current)
+                next_coro.switch()
         return w_Result
     app_uthread = gateway.interp2app(uthread, unwrap_spec=[baseobjspace.ObjSpace,
                                                            baseobjspace.W_Root,
@@ -54,16 +153,16 @@
         w_obj = w_last.w_bound_to
         if w_obj is None:
             # XXX here we would have to suspend the current thread
-            if not USE_GREENLETS:
+            if not have_uthreads():
                 raise OperationError(space.w_RuntimeError,
                                      space.wrap("trying to perform an operation on an unbound variable"))
             else:
-                current = greenlet.getcurrent()
-                uthreads_blocked_on.setdefault(w_last, []).append(current)
-                while runnable_uthreads:
-                    next_greenlet, _ = runnable_uthreads.popitem()
-                    if next_greenlet:
-                        next_greenlet.switch()
+                current = get_current_coroutine()
+                schedule_state.add_to_blocked(w_last, current)
+                while schedule_state.have_runnable_threads():
+                    next_coro = schedule_state.pop_runnable_thread()
+                    if next_coro.is_alive():
+                        next_coro.switch()
                         # there is a value here now
                         break
                 else:
@@ -117,10 +216,10 @@
         w_next = w_curr.w_bound_to
         w_curr.w_bound_to = w_obj
         w_curr = w_next
-    if USE_GREENLETS:
-        now_unblocked_uthreads = uthreads_blocked_on.pop(w_last, [])
+    if have_uthreads():
+        now_unblocked_uthreads = schedule_state.pop_blocked_on(w_last)
         for uthread in now_unblocked_uthreads:
-            runnable_uthreads[uthread] = True
+            schedule_state.add_to_runnable(uthread)
     return space.w_None
 app_bind = gateway.interp2app(bind)
 
@@ -250,20 +349,18 @@
                   space.wrap(app_is_unbound))
     space.setitem(space.builtin.w_dict, space.wrap('bind'),
                  space.wrap(app_bind))
-    if USE_GREENLETS:
+    if USE_COROUTINES:
+        import os
         def exitfunc():
-            current = greenlet.getcurrent()
-            while runnable_uthreads:
-                next_greenlet, _ = runnable_uthreads.popitem()
-                if next_greenlet and next_greenlet is not current:
-                    runnable_uthreads[current] = True
-                    next_greenlet.switch()
-                    del runnable_uthreads[current]
-            if uthreads_blocked_on:
-                print "there are still blocked uthreads!"
-                for var, blocked in uthreads_blocked_on.iteritems():
-                    print var, blocked
-                assert 0
+            current = get_current_coroutine()
+            while schedule_state.have_runnable_threads():
+                next_coro = schedule_state.pop_runnable_thread()
+                if next_coro.is_alive and next_coro != current:
+                    schedule_state.add_to_runnable(current)
+                    next_coro.switch()
+                    schedule_state.remove_from_runnable(current)
+            if schedule_state.have_blocked_threads():
+                os.write(2, "there are still blocked uthreads!")
         app_exitfunc = gateway.interp2app(exitfunc, unwrap_spec=[])
         space.setitem(space.sys.w_dict, space.wrap("exitfunc"), space.wrap(app_exitfunc))
         space.setitem(space.builtin.w_dict, space.wrap('uthread'),



More information about the Pypy-commit mailing list