[pypy-svn] r24216 - pypy/dist/pypy/objspace
cfbolz at codespeak.net
cfbolz at codespeak.net
Fri Mar 10 13:19:47 CET 2006
Author: cfbolz
Date: Fri Mar 10 13:19:46 2006
New Revision: 24216
Modified:
pypy/dist/pypy/objspace/logic.py
Log:
(cfbolz, pedronis):
Added an implementation of uthreads that uses stackless and interpreter-level
coroutines. It seems to be translatable, and it is possible to run the examples.
Side note: it seems to be possible to use greenlets to implement
interpreter-level coroutines, at least to some extent.
Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py (original)
+++ pypy/dist/pypy/objspace/logic.py Fri Mar 10 13:19:46 2006
@@ -1,31 +1,130 @@
from pypy.objspace.proxy import patch_space_in_place
from pypy.interpreter import gateway, baseobjspace, argument
from pypy.interpreter.error import OperationError
+from pypy.rpython.objectmodel import we_are_translated
-USE_GREENLETS = False
+USE_COROUTINES = True
+HAVE_GREENLETS = True
try:
from py.magic import greenlet
except ImportError:
- USE_GREENLETS = False
+ HAVE_GREENLETS = False
-if USE_GREENLETS:
- runnable_uthreads = {}
- uthreads_blocked_on = {}
- main_greenlet = greenlet.getcurrent()
+def have_uthreads():
+ # Tell whether uthread scheduling is usable. It needs USE_COROUTINES;
+ # then a translated build always qualifies, while an untranslated run
+ # depends on whether the greenlet import succeeded (HAVE_GREENLETS).
+ # NOTE(review): the nesting of the if/else is inferred -- this archived
+ # copy lost its indentation; confirm against the repository.
+ if USE_COROUTINES:
+ if we_are_translated():
+ return True
+ else:
+ return HAVE_GREENLETS
+ return False
+
+if USE_COROUTINES:
+ from pypy.module.stackless.interp_coroutine import Coroutine, AbstractThunk
+
+ class ScheduleState(object):
+ # Scheduler bookkeeping for uthreads. runnable_uthreads is a dict used
+ # as a set (every value stored is True); uthreads_blocked_on maps a
+ # logic variable to the list of uthreads waiting for it to be bound.
+ def __init__(self):
+ self.runnable_uthreads = {}
+ self.uthreads_blocked_on = {}
+
+ def pop_runnable_thread(self):
+ # Remove and return one runnable uthread (whichever the dict yields
+ # first). The for/break loop only exists to grab a single key.
+ # With an empty dict, key stays None and the del below would fail in
+ # plain Python; the visible callers test have_runnable_threads() first.
+ # umpf, no popitem in RPython
+ key = None
+ for key, item in self.runnable_uthreads.iteritems():
+ break
+ del self.runnable_uthreads[key]
+ return key
+
+ def add_to_runnable(self, uthread):
+ # Mark uthread as runnable; adding it twice has no further effect.
+ self.runnable_uthreads[uthread] = True
+
+ def remove_from_runnable(self, uthread):
+ # Drop uthread from the runnable set; it must currently be in it.
+ del self.runnable_uthreads[uthread]
+
+ def have_runnable_threads(self):
+ # True when at least one uthread is marked runnable.
+ return bool(self.runnable_uthreads)
+
+ def have_blocked_threads(self):
+ # True when at least one variable still has uthreads blocked on it.
+ return bool(self.uthreads_blocked_on)
+
+ def add_to_blocked(self, w_var, uthread):
+ # Record that uthread waits on w_var, creating the per-variable list
+ # the first time the variable is seen.
+ if w_var in self.uthreads_blocked_on:
+ blocked = self.uthreads_blocked_on[w_var]
+ else:
+ blocked = []
+ self.uthreads_blocked_on[w_var] = blocked
+ blocked.append(uthread)
+
+ def pop_blocked_on(self, w_var):
+ # Remove and return the list of uthreads blocked on w_var; returns an
+ # empty list when nothing is blocked on it.
+ if w_var not in self.uthreads_blocked_on:
+ return []
+ blocked = self.uthreads_blocked_on[w_var]
+ del self.uthreads_blocked_on[w_var]
+ return blocked
+
+ # The single scheduler state instance used by the functions in this module.
+ schedule_state = ScheduleState()
+
+ class Thunk(AbstractThunk):
+ # Work item handed to a coroutine via coro.bind(): call() invokes
+ # w_callable with args in the given space and binds the outcome to the
+ # logic variable w_Result.
+ def __init__(self, space, w_callable, args, w_Result):
+ self.space = space
+ self.w_callable = w_callable
+ self.args = args
+ self.w_Result = w_Result # the upper-case R is because it is a logic variable
+
+ def call(self):
+ # Perform the call and bind its result to w_Result.
+ bind(self.space, self.w_Result,
+ self.space.call_args(self.w_callable, self.args))
+
+ class GreenletCoroutine(object):
+ # Wraps a py.magic greenlet behind the bind/switch/is_alive/getcurrent
+ # methods that the scheduling code calls on coroutine objects.
+ def bind(self, thunk):
+ # Create the wrapped greenlet with thunk.call as its run function.
+ self.greenlet = greenlet(thunk.call)
+
+ def switch(self):
+ # Transfer control to the wrapped greenlet.
+ self.greenlet.switch()
+
+ def is_alive(self):
+ # NOTE(review): relies on the truth value of the greenlet object --
+ # presumably false once it has finished; confirm in the greenlet docs.
+ return bool(self.greenlet)
+
+ def getcurrent():
+ # Builds a new wrapper around the current greenlet on every call,
+ # which is why hashing and equality below are delegated to the
+ # wrapped greenlet: wrappers serve as dict keys and are compared
+ # with != by the scheduling loops.
+ result = GreenletCoroutine()
+ result.greenlet = greenlet.getcurrent()
+ return result
+ getcurrent = staticmethod(getcurrent)
+
+ def __hash__(self):
+ return hash(self.greenlet)
+
+ def __eq__(self, other):
+ # Assumes other is also a wrapper carrying a .greenlet attribute.
+ return self.greenlet == other.greenlet
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def construct_coroutine():
+ # Return a new coroutine object that has not been bound to a thunk yet:
+ # the stackless Coroutine when translated, the greenlet-based wrapper
+ # otherwise.
+ if we_are_translated():
+ return Coroutine()
+ else:
+ return GreenletCoroutine()
+
+ def get_current_coroutine():
+ # Return the coroutine object for the code that is running right now,
+ # picking the same implementation as construct_coroutine().
+ if we_are_translated():
+ return Coroutine.getcurrent()
+ else:
+ return GreenletCoroutine.getcurrent()
def uthread(space, w_callable, __args__):
+ # Start a call of w_callable with the given arguments in a new coroutine
+ # and return the logic variable that Thunk.call binds to the result.
+ args = __args__.normalize()
w_Result = W_Var()
- def run():
- space.eq(w_Result, space.call_args(w_callable, __args__))
- gr = greenlet(run)
- current = greenlet.getcurrent()
- runnable_uthreads[current] = True
- gr.switch()
- while runnable_uthreads:
- next_greenlet, _ = runnable_uthreads.popitem()
- if next_greenlet and next_greenlet is not current:
- runnable_uthreads[current] = True
- next_greenlet.switch()
+ thunk = Thunk(space, w_callable, args, w_Result)
+ coro = construct_coroutine()
+ coro.bind(thunk)
+ current = get_current_coroutine()
+ # Mark the caller runnable before handing control to the new coroutine.
+ schedule_state.add_to_runnable(current)
+ coro.switch()
+ # Back in the caller: keep switching to other runnable uthreads until none
+ # are left. Entries that are no longer alive, and the caller itself, are
+ # popped without being switched to.
+ while schedule_state.have_runnable_threads():
+ next_coro = schedule_state.pop_runnable_thread()
+ if next_coro.is_alive() and next_coro != current:
+ schedule_state.add_to_runnable(current)
+ next_coro.switch()
return w_Result
app_uthread = gateway.interp2app(uthread, unwrap_spec=[baseobjspace.ObjSpace,
baseobjspace.W_Root,
@@ -54,16 +153,16 @@
w_obj = w_last.w_bound_to
if w_obj is None:
# XXX here we would have to suspend the current thread
- if not USE_GREENLETS:
+ if not have_uthreads():
raise OperationError(space.w_RuntimeError,
space.wrap("trying to perform an operation on an unbound variable"))
else:
- current = greenlet.getcurrent()
- uthreads_blocked_on.setdefault(w_last, []).append(current)
- while runnable_uthreads:
- next_greenlet, _ = runnable_uthreads.popitem()
- if next_greenlet:
- next_greenlet.switch()
+ current = get_current_coroutine()
+ schedule_state.add_to_blocked(w_last, current)
+ while schedule_state.have_runnable_threads():
+ next_coro = schedule_state.pop_runnable_thread()
+ if next_coro.is_alive():
+ next_coro.switch()
# there is a value here now
break
else:
@@ -117,10 +216,10 @@
w_next = w_curr.w_bound_to
w_curr.w_bound_to = w_obj
w_curr = w_next
- if USE_GREENLETS:
- now_unblocked_uthreads = uthreads_blocked_on.pop(w_last, [])
+ if have_uthreads():
+ now_unblocked_uthreads = schedule_state.pop_blocked_on(w_last)
for uthread in now_unblocked_uthreads:
- runnable_uthreads[uthread] = True
+ schedule_state.add_to_runnable(uthread)
return space.w_None
app_bind = gateway.interp2app(bind)
@@ -250,20 +349,18 @@
space.wrap(app_is_unbound))
space.setitem(space.builtin.w_dict, space.wrap('bind'),
space.wrap(app_bind))
- if USE_GREENLETS:
+ if USE_COROUTINES:
+ import os
def exitfunc():
- current = greenlet.getcurrent()
- while runnable_uthreads:
- next_greenlet, _ = runnable_uthreads.popitem()
- if next_greenlet and next_greenlet is not current:
- runnable_uthreads[current] = True
- next_greenlet.switch()
- del runnable_uthreads[current]
- if uthreads_blocked_on:
- print "there are still blocked uthreads!"
- for var, blocked in uthreads_blocked_on.iteritems():
- print var, blocked
- assert 0
+ # Exit hook: give every uthread that is still runnable a chance to run,
+ # then write a warning to file descriptor 2 if uthreads remain blocked.
+ current = get_current_coroutine()
+ while schedule_state.have_runnable_threads():
+ next_coro = schedule_state.pop_runnable_thread()
+ # is_alive has to be *called*: the bare bound method is always true,
+ # so without the parentheses finished coroutines were never skipped
+ # (the other scheduling loops in this file already call it).
+ if next_coro.is_alive() and next_coro != current:
+ schedule_state.add_to_runnable(current)
+ next_coro.switch()
+ schedule_state.remove_from_runnable(current)
+ if schedule_state.have_blocked_threads():
+ os.write(2, "there are still blocked uthreads!")
app_exitfunc = gateway.interp2app(exitfunc, unwrap_spec=[])
space.setitem(space.sys.w_dict, space.wrap("exitfunc"), space.wrap(app_exitfunc))
space.setitem(space.builtin.w_dict, space.wrap('uthread'),
More information about the Pypy-commit
mailing list