[pypy-svn] r30784 - in pypy/dist/pypy/objspace: . cclp cclp/test test
auc at codespeak.net
auc at codespeak.net
Mon Jul 31 15:25:12 CEST 2006
Author: auc
Date: Mon Jul 31 15:25:07 2006
New Revision: 30784
Added:
pypy/dist/pypy/objspace/cclp/scheduler.py
pypy/dist/pypy/objspace/cclp/space.py
pypy/dist/pypy/objspace/cclp/test/
pypy/dist/pypy/objspace/cclp/thunk.py
Modified:
pypy/dist/pypy/objspace/cclp/thread.py
pypy/dist/pypy/objspace/cclp/variable.py
pypy/dist/pypy/objspace/logic.py
pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
more file split, this_thread(), stacklet(), reset_scheduler() builtins, some worries ahead
Added: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py Mon Jul 31 15:25:07 2006
@@ -0,0 +1,231 @@
+from pypy.interpreter.error import OperationError
+from pypy.interpreter import gateway
+
+from pypy.objspace.cclp.types import W_Var, aliases
+from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
+from pypy.objspace.cclp.space import CSpace
+
+scheduler = []
+
+#-- Singleton scheduler ------------------------------------------------
+
+class Scheduler(object):
+
+ def __init__(self, space):
+ self.space = space
+ self._main = ClonableCoroutine.w_getcurrent(space)
+ # link top_level space to main coroutine
+ self.top_space = CSpace(space, self._main)
+ self._main.cspace = self.top_space
+ # ...
+ self._init_head(self._main)
+ self._init_blocked()
+ self._switch_count = 0
+ self._traced = {}
+ w (".. MAIN THREAD = ", str(id(self._main)))
+
+ def _init_blocked(self):
+ self._blocked = {} # thread set
+ self._blocked_on = {} # var -> threads
+ self._blocked_byneed = {} # var -> threads
+
+ def _init_head(self, coro):
+ self._head = coro
+ self._head.next = self._head.prev = self._head
+
+ def _set_head(self, thread):
+ self._head = thread
+
+ def _check_initial_conditions(self):
+ try:
+ assert self._head.next == self._head.prev == self._head
+ assert self._head not in self._blocked
+ assert self._head not in self._blocked_on
+ assert self._head not in self._blocked_byneed
+ except:
+ self.display_head()
+ w("BLOCKED", str(self._blocked))
+ all = {}
+ all.update(self._blocked_on)
+ all.update(self._blocked_byneed)
+ w(str(all))
+ raise
+
+ def _chain_insert(self, thread):
+ assert thread.next is None
+ assert thread.prev is None
+ if self._head is None:
+ thread.next = thread
+ thread.prev = thread
+ self._set_head(thread)
+ else:
+ r = self._head
+ l = r.prev
+ l.next = thread
+ r.prev = thread
+ thread.prev = l
+ thread.next = r
+
+ def remove_thread(self, thread):
+ w(".. REMOVING", str(id(thread)))
+ assert thread not in self._blocked
+ del self._traced[thread]
+ l = thread.prev
+ r = thread.next
+ l.next = r
+ r.prev = l
+ if r == thread:
+ if not we_are_translated():
+ import traceback
+ traceback.print_exc()
+ self.display_head()
+ thread.next = thread.next = None
+ return thread
+
+ #-- to be used by logic objspace
+
+ def schedule(self):
+ to_be_run = self._select_next()
+ w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+ self._switch_count += 1
+ to_be_run.w_switch()
+
+ def schedule_or_pass(self):
+ to_be_run = self._select_next(dont_pass=False)
+ curr = ClonableCoroutine.w_getcurrent(self.space)
+ if to_be_run == curr:
+ w(".. PASS")
+ return
+ w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
+ self._switch_count += 1
+ to_be_run.w_switch()
+
+ def _select_next(self, dont_pass=True):
+ to_be_run = self._head
+ sentinel = to_be_run
+ current = ClonableCoroutine.w_getcurrent(self.space)
+ while (to_be_run in self._blocked) \
+ or to_be_run.is_dead() \
+ or (to_be_run == current):
+ to_be_run = to_be_run.next
+ if to_be_run == sentinel:
+ if not dont_pass:
+ return ClonableCoroutine.w_getcurrent(self.space)
+ self.display_head()
+ ## we RESET sched state so as to keep being usable beyond that
+ self._init_head(self._main)
+ self._init_blocked()
+ w(".. SCHEDULER reinitialized")
+ raise OperationError(self.space.w_RuntimeError,
+ self.space.wrap("can't schedule, possible deadlock in sight"))
+ return to_be_run
+
+ #XXX call me directly for this to work translated
+ def __len__(self):
+ "count of known threads (including dead ones)"
+ curr = self._head
+ sentinel = curr
+ count = 1 # there is always a main thread
+ while curr.next != sentinel:
+ curr = curr.next
+ count += 1
+ return count
+
+ def display_head(self):
+ curr = self._head
+ v('Threads : [', '-'.join([str(id(curr)), str(curr in self._blocked)]))
+ while curr.next != self._head:
+ curr = curr.next
+ v('-'.join([str(id(curr)), str(curr in self._blocked)]))
+ w(']')
+
+ def add_new_thread(self, thread):
+ "insert 'thread' at end of running queue"
+ assert isinstance(thread, ClonableCoroutine)
+ self._chain_insert(thread)
+
+ def add_to_blocked_on(self, w_var, uthread):
+ w(".. we BLOCK thread", str(id(uthread)), "on var", str(w_var))
+ assert isinstance(w_var, W_Var)
+ assert isinstance(uthread, ClonableCoroutine)
+ assert uthread not in self._blocked
+ if w_var in self._blocked_on:
+ blocked = self._blocked_on[w_var]
+ else:
+ blocked = []
+ self._blocked_on[w_var] = blocked
+ blocked.append(uthread)
+ self._blocked[uthread] = True
+
+ def unblock_on(self, w_var):
+ v(".. we UNBLOCK threads dependants of var", str(w_var))
+ assert isinstance(w_var, W_Var)
+ blocked = []
+ if w_var in self._blocked_on:
+ blocked = self._blocked_on[w_var]
+ del self._blocked_on[w_var]
+ w(str([id(thr) for thr in blocked]))
+ for thr in blocked: del self._blocked[thr]
+
+ def add_to_blocked_byneed(self, w_var, uthread):
+ w(".. we BLOCK BYNEED thread", str(id(uthread)), "on var", str(w_var))
+ assert isinstance(w_var, W_Var)
+ assert isinstance(uthread, ClonableCoroutine)
+ if w_var in self._blocked_byneed:
+ blocked = self._blocked_byneed[w_var]
+ else:
+ blocked = []
+ self._blocked_byneed[w_var] = blocked
+ blocked.append(uthread)
+ self._blocked[uthread] = True
+
+ def unblock_byneed_on(self, w_var):
+ v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
+ assert isinstance(w_var, W_Var)
+ blocked = []
+ for w_alias in aliases(self.space, w_var):
+ if w_alias in self._blocked_byneed:
+ blocked += self._blocked_byneed[w_alias]
+ del self._blocked_byneed[w_alias]
+ w_alias.needed = True
+ w(str([id(thr) for thr in blocked]))
+ for thr in blocked: del self._blocked[thr]
+
+ # Logic Variables tracing, helps exception propagation
+ # amongst threads
+ def trace_vars(self, thread, lvars):
+ assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(lvars, list)
+ w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
+ #assert not self._traced.has_key(thread) doesn't translate
+ self._traced[thread] = lvars
+
+ def dirty_traced_vars(self, thread, failed_value):
+ w(".. DIRTYING traced vars")
+ for w_var in self._traced[thread]:
+ if self.space.is_true(self.space.is_free(w_var)):
+ self.space.bind(w_var, failed_value)
+
+
+#-- Misc --------------------------------------------------
+def reset_scheduler(space):
+ "garbage collection of threads might pose some problems"
+ scheduler[0] = Scheduler(space)
+app_reset_scheduler = gateway.interp2app(reset_scheduler)
+
+def sched_stats(space):
+ sched = scheduler[0]
+ w_ret = space.newdict([])
+ space.setitem(w_ret, space.wrap('switches'), space.wrap(sched._switch_count))
+ space.setitem(w_ret, space.wrap('threads'), space.wrap(sched.__len__()))
+ space.setitem(w_ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
+ space.setitem(w_ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
+ space.setitem(w_ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
+ return w_ret
+app_sched_stats = gateway.interp2app(sched_stats)
+
+
+def schedule(space):
+ "useful til we get preemtive scheduling deep into the vm"
+ scheduler[0].schedule_or_pass()
+app_schedule = gateway.interp2app(schedule)
Added: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/space.py Mon Jul 31 15:25:07 2006
@@ -0,0 +1,52 @@
+from pypy.interpreter.error import OperationError
+from pypy.objspace.cclp.misc import ClonableCoroutine
+
+class CSpace:
+
+ def __init__(self, space, distributor, parent=None):
+ assert isinstance(distributor, ClonableCoroutine)
+ assert (parent is None) or isinstance(parent, CSpace)
+ self.space = space # the object space ;-)
+ self.parent = parent
+ self.distributor = distributor
+ self.threads = {} # the eventual other threads
+
+ def is_top_level(self):
+ return self.parent is None
+
+## def current_space():
+## #XXX return w_getcurrent().cspace
+## pass
+
+## def newspace():
+## #XXX fork ?
+## pass
+
+ def clone(self):
+ if self.is_top_level():
+ raise OperationError(self.space.w_RuntimeError,
+ self.space.wrap("Clone"+forbidden_boilerplate))
+ new = CSpace(self.distributor.clone(), parent=self)
+ new.distributor.cspace = new
+ for thread in self.threads:
+ tclone = thread.clone()
+ tclone.cspace = new
+ new.threads[tclone] = True
+
+## def choose(self, n):
+## if self.is_top_level():
+## raise OperationError(self.space.w_RuntimeError,
+## self.space.wrap("Choose"+forbidden_boilerplate))
+
+## def ask(self):
+## if self.is_top_level():
+## raise OperationError(self.space.w_RuntimeError,
+## self.space.wrap("Ask"+forbidden_boilerplate))
+## #XXX basically hang until a call to choose, then return n
+
+## def commit(self, n):
+## if self.is_top_level():
+## raise OperationError(self.space.w_RuntimeError,
+## self.space.wrap("Commit"+forbidden_boilerplate))
+## # ensure 0 < n < chosen n
+## # ...
Modified: pypy/dist/pypy/objspace/cclp/thread.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thread.py (original)
+++ pypy/dist/pypy/objspace/cclp/thread.py Mon Jul 31 15:25:07 2006
@@ -1,287 +1,58 @@
from pypy.interpreter import gateway, baseobjspace, argument
from pypy.interpreter.error import OperationError
+from pypy.rpython.objectmodel import we_are_translated
-from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue, aliases
+from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.module._stackless.coroutine import _AppThunk
-
-#-- Singleton scheduler ------------------------------------------------
-
-scheduler = []
-
-class Scheduler(object):
-
- def __init__(self, space):
- self.space = space
- self._main = ClonableCoroutine.w_getcurrent(space)
- self._init_head(self._main)
- self._init_blocked()
- self._switch_count = 0
- self._traced = {}
- w (".. MAIN THREAD = ", str(id(self._main)))
-
- def _init_blocked(self):
- self._blocked = {} # thread set
- self._blocked_on = {} # var -> threads
- self._blocked_byneed = {} # var -> threads
-
- def _init_head(self, coro):
- self._head = coro
- self._head.next = self._head.prev = self._head
-
- def _set_head(self, thread):
- self._head = thread
-
- def _check_initial_conditions(self):
- try:
- assert self._head.next == self._head.prev == self._head
- assert self._head not in self._blocked
- assert self._head not in self._blocked_on
- assert self._head not in self._blocked_byneed
- except:
- self.display_head()
- w("BLOCKED", str(self._blocked))
- all = {}
- all.update(self._blocked_on)
- all.update(self._blocked_byneed)
- w(str(all))
- raise
-
- def _chain_insert(self, thread):
- assert thread.next is None
- assert thread.prev is None
- if self._head is None:
- thread.next = thread
- thread.prev = thread
- self._set_head(thread)
- else:
- r = self._head
- l = r.prev
- l.next = thread
- r.prev = thread
- thread.prev = l
- thread.next = r
-
- def remove_thread(self, thread):
- w(".. REMOVING", str(id(thread)))
- assert thread not in self._blocked
- del self._traced[thread]
- l = thread.prev
- r = thread.next
- l.next = r
- r.prev = l
- if r == thread:
- import traceback
- traceback.print_exc()
- self.display_head()
- thread.next = thread.next = None
- return thread
-
- #-- to be used by logic objspace
-
- def schedule(self):
- to_be_run = self._select_next()
- w(".. SWITCHING", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
- self._switch_count += 1
- to_be_run.w_switch()
-
- def schedule_or_pass(self):
- to_be_run = self._select_next(dont_pass=False)
- curr = ClonableCoroutine.w_getcurrent(self.space)
- if to_be_run == curr:
- w(".. PASS")
- return
- w(".. SWITCHING", str(id(curr)), "=>", str(id(to_be_run)))
- self._switch_count += 1
- to_be_run.w_switch()
-
- def _select_next(self, dont_pass=True):
- to_be_run = self._head
- sentinel = to_be_run
- current = ClonableCoroutine.w_getcurrent(self.space)
- while (to_be_run in self._blocked) \
- or to_be_run.is_dead() \
- or (dont_pass and (to_be_run == current)):
- to_be_run = to_be_run.next
- if to_be_run == sentinel:
- self.display_head()
- ## we RESET sched state so as to keep being usable beyond that
- self._init_head(self._main)
- self._init_blocked()
- w(".. SCHEDULER reinitialized")
- raise OperationError(self.space.w_RuntimeError,
- self.space.wrap("can't schedule, possible deadlock in sight"))
- return to_be_run
-
- #XXX call me directly for this to work translated
- def __len__(self):
- "count of known threads (including dead ones)"
- curr = self._head
- sentinel = curr
- count = 1 # there is always a main thread
- while curr.next != sentinel:
- curr = curr.next
- count += 1
- return count
-
- def display_head(self):
- curr = self._head
- v('Threads : [', '-'.join([str(id(curr)), str(curr in self._blocked)]))
- while curr.next != self._head:
- curr = curr.next
- v('-'.join([str(id(curr)), str(curr in self._blocked)]))
- w(']')
-
- def add_new_thread(self, thread):
- "insert 'thread' at end of running queue"
- assert isinstance(thread, ClonableCoroutine)
- self._chain_insert(thread)
-
- def add_to_blocked_on(self, w_var, uthread):
- w(".. we BLOCK thread", str(id(uthread)), "on var", str(w_var))
- assert isinstance(w_var, W_Var)
- assert isinstance(uthread, ClonableCoroutine)
- assert uthread not in self._blocked
- if w_var in self._blocked_on:
- blocked = self._blocked_on[w_var]
- else:
- blocked = []
- self._blocked_on[w_var] = blocked
- blocked.append(uthread)
- self._blocked[uthread] = True
-
- def unblock_on(self, w_var):
- v(".. we UNBLOCK threads dependants of var", str(w_var))
- assert isinstance(w_var, W_Var)
- blocked = []
- if w_var in self._blocked_on:
- blocked = self._blocked_on[w_var]
- del self._blocked_on[w_var]
- w(str([id(thr) for thr in blocked]))
- for thr in blocked: del self._blocked[thr]
-
- def add_to_blocked_byneed(self, w_var, uthread):
- w(".. we BLOCK BYNEED thread", str(id(uthread)), "on var", str(w_var))
- assert isinstance(w_var, W_Var)
- assert isinstance(uthread, ClonableCoroutine)
- if w_var in self._blocked_byneed:
- blocked = self._blocked_byneed[w_var]
- else:
- blocked = []
- self._blocked_byneed[w_var] = blocked
- blocked.append(uthread)
- self._blocked[uthread] = True
-
- def unblock_byneed_on(self, w_var):
- v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
- assert isinstance(w_var, W_Var)
- blocked = []
- for w_alias in aliases(self.space, w_var):
- if w_alias in self._blocked_byneed:
- blocked += self._blocked_byneed[w_alias]
- del self._blocked_byneed[w_alias]
- w_alias.needed = True
- w(str([id(thr) for thr in blocked]))
- for thr in blocked: del self._blocked[thr]
-
- # Logic Variables tracing, helps exception propagation
- # amongst threads
- def trace_vars(self, thread, lvars):
- assert isinstance(thread, ClonableCoroutine)
- assert isinstance(lvars, list)
- w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
- #assert not self._traced.has_key(thread) doesn't translate
- self._traced[thread] = lvars
-
- def dirty_traced_vars(self, thread, failed_value):
- w(".. DIRTYING traced vars")
- for w_var in self._traced[thread]:
- if self.space.is_true(self.space.is_free(w_var)):
- self.space.bind(w_var, failed_value)
-
-
-#-- Thunk -----------------------------------------
-
-class FutureThunk(_AppThunk):
- def __init__(self, space, w_callable, args, w_Result, coro):
- _AppThunk.__init__(self, space, coro.costate, w_callable, args)
- self.w_Result = w_Result
- self._coro = coro
-
- def call(self):
- w(".. initial thunk CALL in", str(id(self._coro)))
- scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
- try:
- try:
- _AppThunk.call(self)
- except Exception, exc:
- w(".. exceptional EXIT of", str(id(self._coro)), "with", str(exc))
- failed_val = W_FailedValue(exc)
- self.space.bind(self.w_Result, failed_val)
- scheduler[0].dirty_traced_vars(self._coro, failed_val)
- self._coro._dead = True
- else:
- w(".. clean EXIT of", str(id(self._coro)),
- "-- setting future result", str(self.w_Result), "to",
- str(self.costate.w_tempval))
- self.space.unify(self.w_Result, self.costate.w_tempval)
- finally:
- scheduler[0].remove_thread(self._coro)
- scheduler[0].schedule()
-
-
-def logic_args(args):
- "returns logic vars found in unpacked normalized args"
- assert isinstance(args, tuple)
- pos = args[0]
- kwa = args[1]
- pos_l = [arg for arg in pos
- if isinstance(arg, W_Var)]
- kwa_l = [arg for arg in kwa.keys()
- if isinstance(arg, W_Var)]
- return pos_l + kwa_l
+from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
+from pypy.objspace.cclp.scheduler import scheduler
#-- Future --------------------------------------------------
def future(space, w_callable, __args__):
"""returns a future result"""
+ #XXX we could be much more lazy wrt coro creation
args = __args__.normalize()
# coro init
coro = ClonableCoroutine(space)
# prepare thread chaining, create missing slots
coro.next = coro.prev = None
+ # computation space is the same as in the parent
+ coro.cspace = ClonableCoroutine.w_getcurrent(space).cspace
# feed the coro
w_Future = W_Future(space)
thunk = FutureThunk(space, w_callable, args, w_Future, coro)
coro.bind(thunk)
w("THREAD", str(id(coro)))
scheduler[0].add_new_thread(coro)
- # XXX we should think about a way to make it read-only for the client
- # (i.e the originator), aka true futures
return w_Future
app_future = gateway.interp2app(future, unwrap_spec=[baseobjspace.ObjSpace,
baseobjspace.W_Root,
argument.Arguments])
-
-# need (applevel) : getcurrent(), getmain(),
-#-- Misc --------------------------------------------------
-
-def sched_stats(space):
- sched = scheduler[0]
- w_ret = space.newdict([])
- space.setitem(w_ret, space.wrap('switches'), space.wrap(sched._switch_count))
- space.setitem(w_ret, space.wrap('threads'), space.wrap(sched.__len__()))
- space.setitem(w_ret, space.wrap('blocked'), space.wrap(len(sched._blocked)))
- space.setitem(w_ret, space.wrap('blocked_on'), space.wrap(len(sched._blocked_on)))
- space.setitem(w_ret, space.wrap('blocked_byneed'), space.wrap(len(sched._blocked_byneed)))
- return w_ret
-app_sched_stats = gateway.interp2app(sched_stats)
+#-- plain Coroutine -----------------------------------------
+def stacklet(space, w_callable, __args__):
+ """returns a coroutine object"""
+ args = __args__.normalize()
+ # coro init
+ coro = ClonableCoroutine(space)
+ # prepare thread chaining, create missing slots
+ coro.next = coro.prev = None
+ # computation space is the same as in the parent
+ coro.cspace = ClonableCoroutine.w_getcurrent(space).cspace
+ thunk = ProcedureThunk(space, w_callable, args, coro)
+ coro.bind(thunk)
+ w("THREAD", str(id(coro)))
+ scheduler[0].add_new_thread(coro)
+ scheduler[0].schedule()
+ return coro
+app_stacklet = gateway.interp2app(stacklet, unwrap_spec=[baseobjspace.ObjSpace,
+ baseobjspace.W_Root,
+ argument.Arguments])
-def schedule(space):
- "useful til we get preemtive scheduling deep into the vm"
- scheduler[0].schedule_or_pass()
-app_schedule = gateway.interp2app(schedule)
+def this_thread(space):
+ return ClonableCoroutine.w_getcurrent(space)
+app_this_thread = gateway.interp2app(this_thread)
Added: pypy/dist/pypy/objspace/cclp/thunk.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/objspace/cclp/thunk.py Mon Jul 31 15:25:07 2006
@@ -0,0 +1,70 @@
+from pypy.module._stackless.coroutine import _AppThunk
+from pypy.objspace.cclp.misc import w
+from pypy.objspace.cclp.scheduler import scheduler
+from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
+
+#-- Thunk -----------------------------------------
+
+class ProcedureThunk(_AppThunk):
+ def __init__(self, space, w_callable, args, coro):
+ _AppThunk.__init__(self, space, coro.costate, w_callable, args)
+ self._coro = coro
+
+ def call(self):
+ w(".! initial (returnless) thunk CALL in", str(id(self._coro)))
+ scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
+ try:
+ try:
+ _AppThunk.call(self)
+ except Exception, exc:
+ w(".! exceptional EXIT of", str(id(self._coro)), "with", str(exc))
+ scheduler[0].dirty_traced_vars(self._coro, W_FailedValue(exc))
+ self._coro._dead = True
+ else:
+ w(".! clean (valueless) EXIT of", str(id(self._coro)),
+ "-- setting future result", str(self.w_Result), "to",
+ str(self.costate.w_tempval))
+ finally:
+ scheduler[0].remove_thread(self._coro)
+ scheduler[0].schedule()
+
+
+class FutureThunk(_AppThunk):
+ def __init__(self, space, w_callable, args, w_Result, coro):
+ _AppThunk.__init__(self, space, coro.costate, w_callable, args)
+ self.w_Result = w_Result
+ self._coro = coro
+
+ def call(self):
+ w(".! initial thunk CALL in", str(id(self._coro)))
+ scheduler[0].trace_vars(self._coro, logic_args(self.args.unpack()))
+ try:
+ try:
+ _AppThunk.call(self)
+ except Exception, exc:
+ w(".! exceptional EXIT of", str(id(self._coro)), "with", str(exc))
+ failed_val = W_FailedValue(exc)
+ self.space.bind(self.w_Result, failed_val)
+ scheduler[0].dirty_traced_vars(self._coro, failed_val)
+ self._coro._dead = True
+ else:
+ w(".! clean EXIT of", str(id(self._coro)),
+ "-- setting future result", str(self.w_Result), "to",
+ str(self.costate.w_tempval))
+ self.space.unify(self.w_Result, self.costate.w_tempval)
+ finally:
+ scheduler[0].remove_thread(self._coro)
+ scheduler[0].schedule()
+
+
+def logic_args(args):
+ "returns logic vars found in unpacked normalized args"
+ assert isinstance(args, tuple)
+ pos = args[0]
+ kwa = args[1]
+ pos_l = [arg for arg in pos
+ if isinstance(arg, W_Var)]
+ kwa_l = [arg for arg in kwa.keys()
+ if isinstance(arg, W_Var)]
+ return pos_l + kwa_l
+
Modified: pypy/dist/pypy/objspace/cclp/variable.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/variable.py (original)
+++ pypy/dist/pypy/objspace/cclp/variable.py Mon Jul 31 15:25:07 2006
@@ -5,7 +5,7 @@
from pypy.objspace.std.dictobject import W_DictObject
from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
-from pypy.objspace.cclp.thread import scheduler
+from pypy.objspace.cclp.scheduler import scheduler
from pypy.objspace.cclp.types import deref, W_Var, W_Future, W_FailedValue
@@ -29,7 +29,6 @@
scheduler[0].unblock_byneed_on(w_var)
scheduler[0].add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
scheduler[0].schedule()
- print "pooooopp ?", id(ClonableCoroutine.w_getcurrent(space))
assert space.is_true(space.is_bound(w_var))
w_ret = w_var.w_bound_to
if isinstance(w_ret, W_FailedValue):
Modified: pypy/dist/pypy/objspace/logic.py
==============================================================================
--- pypy/dist/pypy/objspace/logic.py (original)
+++ pypy/dist/pypy/objspace/logic.py Mon Jul 31 15:25:07 2006
@@ -14,8 +14,10 @@
#-- THREADING/COROUTINING -----------------------------------
-from pypy.objspace.cclp.thread import Scheduler, app_future, app_sched_stats, \
- app_schedule, scheduler
+from pypy.objspace.cclp.thread import app_future, app_stacklet, app_this_thread
+
+from pypy.objspace.cclp.scheduler import Scheduler, scheduler, app_sched_stats, \
+ app_schedule, app_reset_scheduler
#-- VARIABLE ------------------------------------------------
@@ -246,6 +248,8 @@
#-- threading --
space.setitem(space.builtin.w_dict, space.wrap('future'),
space.wrap(app_future))
+ space.setitem(space.builtin.w_dict, space.wrap('stacklet'),
+ space.wrap(app_stacklet))
space.setitem(space.builtin.w_dict, space.wrap('wait'),
space.wrap(app_wait))
space.setitem(space.builtin.w_dict, space.wrap('wait_needed'),
@@ -254,6 +258,10 @@
space.wrap(app_sched_stats))
space.setitem(space.builtin.w_dict, space.wrap('schedule'),
space.wrap(app_schedule))
+ space.setitem(space.builtin.w_dict, space.wrap('this_thread'),
+ space.wrap(app_this_thread))
+ space.setitem(space.builtin.w_dict, space.wrap('reset_scheduler'),
+ space.wrap(app_reset_scheduler))
#-- misc -----
space.setitem(space.builtin.w_dict, space.wrap('interp_id'),
Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py (original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py Mon Jul 31 15:25:07 2006
@@ -1,6 +1,6 @@
from pypy.conftest import gettestobjspace
from py.test import skip
-
+
class AppTest_Logic(object):
def setup_class(cls):
@@ -196,13 +196,14 @@
raises(Exception, unify, f1.b, 24)
-class AppTest_LogicThreads(object):
+class AppTest_LogicFutures(object):
def setup_class(cls):
cls.space = gettestobjspace('logic', usemodules=("_stackless",))
def test_future_value(self):
-
+ print "future value", sched_stats()
+
def poop(X):
return X + 1
@@ -210,7 +211,6 @@
Y = future(poop, X)
unify(X, 42)
assert Y == 43
- print "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAa"
assert sched_stats()['threads'] == 1
X = newvar()
@@ -233,6 +233,7 @@
assert sched_stats()['threads'] == 3
def test_one_future_exception(self):
+ print "one future exception", sched_stats()
class FooException(Exception): pass
def poop(X):
@@ -249,6 +250,7 @@
assert False
def test_exception_in_chain(self):
+ print "exception in chain", sched_stats()
class FooException(Exception): pass
def raise_foo():
@@ -272,6 +274,7 @@
assert False
def test_exception_in_group(self):
+ print "exception in groups", sched_stats()
class FooException(Exception): pass
def loop_or_raise(Canary, crit, Bomb_signal):
@@ -303,6 +306,7 @@
"""check that a wait nested in a tree of
threads works correctly
"""
+ print "nested threads", sched_stats()
def sleep(X):
wait(X)
return X
@@ -318,6 +322,7 @@
assert v == 42
def test_wait_needed(self):
+ print "wait_needed", sched_stats()
X = newvar()
def binder(V):
@@ -334,6 +339,7 @@
assert X == 42
def test_eager_producer_consummer(self):
+ print "eager_producer_consummer", sched_stats()
def generate(n, limit):
if n < limit:
@@ -355,6 +361,7 @@
def test_lazy_producer_consummer(self):
+ print "lazy_producer_consummer", sched_stats()
def lgenerate(n, L):
"""wait-needed version of generate"""
@@ -383,6 +390,8 @@
assert T == 45
def test_wait_two(self):
+ print "wait_two", sched_stats()
+
def sleep(X, Barrier):
wait(X)
bind(Barrier, True)
@@ -424,3 +433,36 @@
print F
except Exception, e:
print e
+
+ def test_stacklet(self):
+
+ print "stacklet", sched_stats()
+ reset_scheduler()
+ #XXX each of the previous test decorates
+ # the scheduler with unreclaimed stuff
+ # In this case, side-effect happen. Nasty.
+
+ count = [0]
+
+ def inc_and_greet(count, max_, Finished, Failed):
+ if count[0] >= max_:
+ count[0] += 1
+ bind(Finished, count[0])
+ return
+ count[0] += 1
+
+ Finished, Failed = newvar(), newvar()
+ max_spawn = 2
+ erring = 3
+ for i in range(max_spawn + erring):
+ stacklet(inc_and_greet, count, max_spawn, Finished, Failed)
+
+ wait(Finished)
+ assert count[0] == max_spawn + erring
+ try:
+ wait(Failed)
+ except Exception, e: # Unification Failure
+ assert sched_stats()['threads'] == 1
+ return
+ assert False
+
More information about the Pypy-commit
mailing list