[pypy-svn] r33462 - pypy/dist/pypy/objspace/cclp
auc at codespeak.net
auc at codespeak.net
Thu Oct 19 16:50:32 CEST 2006
Author: auc
Date: Thu Oct 19 16:50:27 2006
New Revision: 33462
Modified:
pypy/dist/pypy/objspace/cclp/misc.py
pypy/dist/pypy/objspace/cclp/scheduler.py
pypy/dist/pypy/objspace/cclp/space.py
pypy/dist/pypy/objspace/cclp/thread.py
pypy/dist/pypy/objspace/cclp/types.py
pypy/dist/pypy/objspace/cclp/variable.py
Log:
now space cloning works (but not toroughly tested) -- got rid of clonable coroutines
Modified: pypy/dist/pypy/objspace/cclp/misc.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/misc.py (original)
+++ pypy/dist/pypy/objspace/cclp/misc.py Thu Oct 19 16:50:27 2006
@@ -3,6 +3,7 @@
# commonly imported there, used from types, variable, thread
from pypy.module._stackless.clonable import ClonableCoroutine
+from pypy.module._stackless.coroutine import AppCoroutine
import os
@@ -25,8 +26,8 @@
os.write(1, ' ')
def get_current_cspace(space):
- curr = ClonableCoroutine.w_getcurrent(space)
- assert isinstance(curr, ClonableCoroutine)
+ curr = AppCoroutine.w_getcurrent(space)
+ assert isinstance(curr, AppCoroutine)
if curr._cspace is None:
if not we_are_translated():
import pdb
Modified: pypy/dist/pypy/objspace/cclp/scheduler.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/scheduler.py (original)
+++ pypy/dist/pypy/objspace/cclp/scheduler.py Thu Oct 19 16:50:27 2006
@@ -4,7 +4,7 @@
from pypy.objspace.std.listobject import W_ListObject
from pypy.objspace.cclp.types import W_Var, W_FailedValue, aliases
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine, get_current_cspace
+from pypy.objspace.cclp.misc import w, v, AppCoroutine, get_current_cspace
from pypy.objspace.cclp.global_state import sched
#-- Singleton scheduler ------------------------------------------------
@@ -22,18 +22,18 @@
self._head._next = self._head._prev = self._head
# asking for stability
self._asking = {} # cspace -> thread set
- self._asking[top_level_space] = {}
+ self._asking[top_level_space] = {} # XXX
# variables suspension lists
self._blocked = {}
self._blocked_on = {} # var -> threads
self._blocked_byneed = {} # var -> threads
def _chain_insert(self, group):
- assert group._next is group
- assert group._prev is group
- assert isinstance(group, W_ThreadGroupScheduler)
- assert isinstance(group._next, W_ThreadGroupScheduler)
- assert isinstance(group._prev, W_ThreadGroupScheduler)
+ assert group._next is group, "group._next not correctly linked"
+ assert group._prev is group, "group._prev not correctly linked"
+ assert isinstance(group, W_ThreadGroupScheduler), "type error"
+ assert isinstance(group._next, W_ThreadGroupScheduler), "type error"
+ assert isinstance(group._prev, W_ThreadGroupScheduler), "type error"
r = self._head
l = r._prev
l._next = group
@@ -43,7 +43,7 @@
def schedule(self):
to_be_run = self._select_next()
- assert isinstance(to_be_run, W_ThreadGroupScheduler)
+ assert isinstance(to_be_run, W_ThreadGroupScheduler), "type error"
#w(".. SWITCHING (spaces)", str(id(get_current_cspace(self.space))), "=>", str(id(to_be_run)))
self._switch_count += 1
to_be_run.schedule()
@@ -56,7 +56,7 @@
if to_be_run.is_runnable():
break
to_be_run = to_be_run._next
- assert isinstance(to_be_run, W_ThreadGroupScheduler)
+ assert isinstance(to_be_run, W_ThreadGroupScheduler), "type error"
if to_be_run == sentinel:
reset_scheduler(self.space)
w(".. SCHEDULER reinitialized")
@@ -68,13 +68,13 @@
def add_new_group(self, group):
"insert 'group' at end of running queue"
- assert isinstance(group, W_ThreadGroupScheduler)
+ assert isinstance(group, W_ThreadGroupScheduler), "type error"
w(".. ADDING group", str(id(group)))
self._asking[group] = {}
self._chain_insert(group)
def remove_group(self, group):
- assert isinstance(group, W_ThreadGroupScheduler)
+ assert isinstance(group, W_ThreadGroupScheduler), "type error"
w(".. REMOVING group", str(id(group)))
l = group._prev
r = group._next
@@ -96,8 +96,8 @@
def add_to_blocked_on(self, w_var, thread):
w(".. we BLOCK thread", str(id(thread)), "on var", str(w_var))
- assert isinstance(w_var, W_Var)
- assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(w_var, W_Var), "type error"
+ assert isinstance(thread, AppCoroutine), "type error"
assert thread not in self._blocked
if w_var in self._blocked_on:
blocked = self._blocked_on[w_var]
@@ -112,7 +112,7 @@
def unblock_on(self, w_var):
v(".. we UNBLOCK threads dependants of var", str(w_var))
- assert isinstance(w_var, W_Var)
+ assert isinstance(w_var, W_Var), "type error"
blocked = []
if w_var in self._blocked_on:
blocked = self._blocked_on[w_var]
@@ -125,8 +125,8 @@
#XXX sync the un/block byneed stuff with above, later
def add_to_blocked_byneed(self, w_var, thread):
w(".. we BLOCK BYNEED thread", str(id(thread)), "on var", str(w_var))
- assert isinstance(w_var, W_Var)
- assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(w_var, W_Var), "type error"
+ assert isinstance(thread, AppCoroutine), "type error"
if w_var in self._blocked_byneed:
blocked = self._blocked_byneed[w_var]
else:
@@ -138,7 +138,7 @@
def unblock_byneed_on(self, w_var):
v(".. we UNBLOCK BYNEED dependants of var", str(w_var))
- assert isinstance(w_var, W_Var)
+ assert isinstance(w_var, W_Var), "type error"
blocked = []
for w_alias in aliases(self.space, w_var):
if w_alias in self._blocked_byneed:
@@ -190,10 +190,10 @@
si = self.space.setitem
w_all = s.newdict()
si(w_all, s.newint(id(self._head)), self._head.group_info())
- assert isinstance(self._head, W_ThreadGroupScheduler)
+ assert isinstance(self._head, W_ThreadGroupScheduler), "type error"
curr = self._head._next
while curr != self._head:
- assert isinstance(curr, W_ThreadGroupScheduler)
+ assert isinstance(curr, W_ThreadGroupScheduler), "type error"
si(w_all, s.newint(id(curr)), curr.group_info())
curr = curr._next
si(w_all, s.wrap('blocked'), self.w_blocked())
@@ -240,18 +240,18 @@
self.blocked_count = 0
def _init_head(self, thread):
- assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(thread, AppCoroutine), "type error"
self._head = thread
thread._next = thread._prev = thread
assert self._head._next == self._head
w("HEAD (main) THREAD = ", str(id(self._head)))
def _chain_insert(self, thread):
- assert thread._next is thread
- assert thread._prev is thread
- assert isinstance(thread, ClonableCoroutine)
- assert isinstance(thread._next, ClonableCoroutine)
- assert isinstance(thread._prev, ClonableCoroutine)
+ assert thread._next is thread, "thread._next not correctly linked"
+ assert thread._prev is thread, "thread._prev not correctly linked"
+ assert isinstance(thread, AppCoroutine), "type error"
+ assert isinstance(thread._next, AppCoroutine), "type error"
+ assert isinstance(thread._prev, AppCoroutine), "type error"
r = self._head
l = r._prev
l._next = thread
@@ -281,8 +281,8 @@
str(id(get_current_cspace(self.space))))
if self.is_stable():
return
- curr = ClonableCoroutine.w_getcurrent(self.space)
- assert isinstance(curr, ClonableCoroutine)
+ curr = AppCoroutine.w_getcurrent(self.space)
+ assert isinstance(curr, AppCoroutine), "type error"
asking = sched.uler._asking
if self in asking:
asking[self][curr] = True
@@ -297,10 +297,10 @@
raise OperationError(self.space.w_AllBlockedError,
self.space.wrap("ouch, that's a BUG"))
to_be_run = self._select_next()
- if to_be_run == ClonableCoroutine.w_getcurrent(self.space):
+ if to_be_run == AppCoroutine.w_getcurrent(self.space):
return
- assert isinstance(to_be_run, ClonableCoroutine)
- #w(".. SWITCHING (treads)", str(id(ClonableCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
+ assert isinstance(to_be_run, AppCoroutine), "type error"
+ #w(".. SWITCHING (treads)", str(id(AppCoroutine.w_getcurrent(self.space))), "=>", str(id(to_be_run)))
self._switch_count += 1
to_be_run.w_switch()
@@ -314,7 +314,7 @@
th._cspace.blocked_count -= 1
sched.uler._asking[self] = {}
break
- assert isinstance(to_be_run, ClonableCoroutine)
+ assert isinstance(to_be_run, AppCoroutine), "type error"
to_be_run = to_be_run._next
if to_be_run == sentinel:
if not we_are_translated():
@@ -325,13 +325,13 @@
def add_new_thread(self, thread):
"insert 'thread' at end of running queue"
- w(".. ADDING thread", str(id(thread)), "to group", str(id(self)), "count ==", str(self.thread_count))
- assert isinstance(thread, ClonableCoroutine)
+ w(".. ADDING thread", str(id(thread)), "to group", str(id(self)))
+ assert isinstance(thread, AppCoroutine), "type error"
self._chain_insert(thread)
self.thread_count += 1
def remove_thread(self, thread):
- assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(thread, AppCoroutine)
w(".. REMOVING thread", str(id(thread)))
assert thread not in sched.uler._blocked
try:
@@ -355,14 +355,14 @@
# Logic Variables tracing, "accelerates" exception propagation
# amongst threads
def trace_vars(self, thread, lvars):
- assert isinstance(thread, ClonableCoroutine)
- assert isinstance(lvars, list)
+ assert isinstance(thread, AppCoroutine), "type error"
+ assert isinstance(lvars, list), "type error"
#w(".. TRACING logic vars.", str(lvars), "for", str(id(thread)))
#assert not self._traced.has_key(thread) doesn't translate
self._traced[thread] = lvars
def dirty_traced_vars(self, thread, failed_value):
- assert isinstance(thread, ClonableCoroutine)
+ assert isinstance(thread, AppCoroutine)
assert isinstance(failed_value, W_FailedValue)
#w(".. DIRTYING traced vars")
for w_var in self._traced[thread]:
@@ -372,10 +372,10 @@
def w_threads(self):
s = self.space
thl = [s.newint(id(self._head))]
- assert isinstance(self._head, ClonableCoroutine)
+ assert isinstance(self._head, AppCoroutine)
curr = self._head._next
while curr != self._head:
- assert isinstance(curr, ClonableCoroutine)
+ assert isinstance(curr, AppCoroutine)
thl.append(s.newint(id(curr)))
curr = curr._next
w_t = W_ListObject(thl)
Modified: pypy/dist/pypy/objspace/cclp/space.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/space.py (original)
+++ pypy/dist/pypy/objspace/cclp/space.py Thu Oct 19 16:50:27 2006
@@ -10,7 +10,7 @@
from pypy.module._stackless.interp_coroutine import AbstractThunk
-from pypy.objspace.cclp.misc import ClonableCoroutine, get_current_cspace, w
+from pypy.objspace.cclp.misc import AppCoroutine, get_current_cspace, w
from pypy.objspace.cclp.thunk import CSpaceThunk, PropagatorThunk
from pypy.objspace.cclp.global_state import sched
from pypy.objspace.cclp.variable import newvar
@@ -20,47 +20,19 @@
from pypy.objspace.cclp.constraint.distributor import distribute
from pypy.objspace.cclp.scheduler import W_ThreadGroupScheduler
+from pypy.rpython.rgc import gc_swap_pool, gc_clone
-class NewSpaceThunk(AbstractThunk):
- "container thread for one comp. space"
- def __init__(self, space, w_callable, __args__, thread):
- self.space = space
- self.thread = thread
- self.cspace = None
- self.callable = w_callable
- self.args = __args__
-
- def call(self):
- try:
- self.cspace = _newspace(self.space,
- self.callable,
- self.args)
- self.space.wait(self.cspace._finished)
- finally:
- sched.uler.remove_thread(self.thread)
- sched.uler.schedule()
-
-def _newspace(space, w_callable, __args__):
+def newspace(space, w_callable, __args__):
+ "application level creation of a new computation space"
args = __args__.normalize()
- dist_thread = ClonableCoroutine(space)
+ dist_thread = AppCoroutine(space)
+ dist_thread._next = dist_thread._prev = dist_thread
thunk = CSpaceThunk(space, w_callable, args, dist_thread)
dist_thread.bind(thunk)
if not we_are_translated():
w("NEWSPACE, (distributor) thread", str(id(dist_thread)), "for", str(w_callable.name))
w_space = W_CSpace(space, dist_thread)
return w_space
-
-def newspace(space, w_callable, __args__):
- "application level creation of a new computation space"
- outer_thread = ClonableCoroutine(space)
- outer_thread._cspace = get_current_cspace(space)
- thunk = NewSpaceThunk(space, w_callable, __args__, outer_thread)
- outer_thread.bind(thunk)
- sched.uler.add_new_thread(outer_thread)
- sched.uler.schedule() # XXX assumption: thread will be executed before we get back there
- cspace = thunk.cspace
- cspace._container = outer_thread
- return cspace
app_newspace = gateway.interp2app(newspace, unwrap_spec=[baseobjspace.ObjSpace,
baseobjspace.W_Root,
argument.Arguments])
@@ -99,6 +71,7 @@
class W_CSpace(W_ThreadGroupScheduler):
+ local_pool = None
def __init__(self, space, dist_thread):
W_ThreadGroupScheduler.__init__(self, space)
@@ -125,63 +98,20 @@
self._store[cvar.name] = cvar
def w_clone(self):
- if not we_are_translated():
- raise NotImplementedError
- # build fresh cspace & distributor thread
- #dist_thread = ClonableCoroutine(self.space)
- #new_cspace = W_CSpace(self.space, dist_thread)
- #dist_thread._cspace = new_cspace
- # new distributor instance
- #old_dist = self.distributor
- #new_dist = old_dist.__class__(self.space, old_dist._fanout)
- #new_dist._cspace = new_cspace
- #new_cspace.distributor = new_dist
- # copy the store
- #for var in self._store.values():
- # new_cspace.register_var(var.copy(self.space))
- # new distributor thunk, binding
- #f = Function(self.space,
- # app_fresh_distributor._code,
- # self.space.newdict())
- #thunk = CSpaceThunk(self.space, f,
- # argument.Arguments(self.space, [new_dist]),
- # dist_thread)
- #dist_thread.bind(thunk)
- # relinking to scheduler
- #sched.uler.add_new_group(new_cspace)
- #self.add_new_thread(dist_thread)
- # rebuild propagators
- #for const in self._constraints:
- # ccopy = const.copy(self.space)
- # ccopy._cspace = new_cspace
- # new_cspace.tell(const)
- # duh
- #new_cspace._last_choice = self._last_choice
- # copy solution variables
- #self.space.unify(new_cspace._solution,
- # self.space.newlist([var for var in new_cspace._store.values()]))
- #new_cspace.wait_stable()
- #return new_cspace
- else:
- # the theory is that
- # a) we create a (clonable) container thread for any 'newspace'
- # b) at clone-time, we clone that container, hoping that
- # indeed everything will come with it
- w("cloning the container thread")
- everything = self._container.w_clone()
- w("getting the fresh cspace from it")
- new_cspace = everything._cspace
- w("registering the container clone to top level cspace scheduler")
- sched.main_thread._cspace.add_new_thread(everything)
- w("add container clone to blocked on cspace clone variable _finished")
- # we crash below
- sched.uler.add_to_blocked_on(new_cspace._finished, everything)
- # however, we need to keep track of all threads created
- # from 'within' the space (propagators, or even app-level threads)
- # -> cspaces as thread groups
- w("add cloned cspace to new group")
+ if we_are_translated():
+ w("<> cloning the space")
+ if self.local_pool is None:
+ self.local_pool = gc_swap_pool(gc_swap_pool(None))
+ new_cspace, new_cspace.local_pool = gc_clone(self, self.local_pool)
+ w("<> add cloned cspace to new group")
+ assert isinstance(new_cspace, W_CSpace)
+ new_cspace._next = new_cspace._prev = new_cspace
sched.uler.add_new_group(new_cspace)
+ w("<> returning clone ")
return new_cspace
+ else:
+ raise NotImplementedError
+
def w_ask(self):
self.wait_stable()
@@ -217,7 +147,8 @@
def tell(self, w_constraint):
space = self.space
- w_coro = ClonableCoroutine(space)
+ w_coro = AppCoroutine(space)
+ w_coro._next = w_coro._prev = w_coro
w_coro._cspace = self
thunk = PropagatorThunk(space, w_constraint, w_coro)
w_coro.bind(thunk)
Modified: pypy/dist/pypy/objspace/cclp/thread.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/thread.py (original)
+++ pypy/dist/pypy/objspace/cclp/thread.py Thu Oct 19 16:50:27 2006
@@ -2,7 +2,7 @@
from pypy.rpython.objectmodel import we_are_translated
from pypy.objspace.cclp.types import W_Var, W_Future, W_FailedValue
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine, get_current_cspace
+from pypy.objspace.cclp.misc import w, v, AppCoroutine, get_current_cspace
from pypy.objspace.cclp.thunk import FutureThunk, ProcedureThunk
from pypy.objspace.cclp.global_state import sched
@@ -13,7 +13,8 @@
"""returns a future result"""
#XXX we could be much more lazy wrt coro creation
args = __args__.normalize()
- coro = ClonableCoroutine(space)
+ coro = AppCoroutine(space)
+ coro._next = coro._prev = coro
w_Future = W_Future(space)
thunk = FutureThunk(space, w_callable, args, w_Future, coro)
coro.bind(thunk)
@@ -31,7 +32,8 @@
def stacklet(space, w_callable, __args__):
"""returns a coroutine object"""
args = __args__.normalize()
- coro = ClonableCoroutine(space)
+ coro = AppCoroutine(space)
+ coro._next = coro._prev = coro
thunk = ProcedureThunk(space, w_callable, args, coro)
coro.bind(thunk)
coro._cspace = get_current_cspace(space)
@@ -46,5 +48,5 @@
def this_thread(space):
- return ClonableCoroutine.w_getcurrent(space)
+ return AppCoroutine.w_getcurrent(space)
app_this_thread = gateway.interp2app(this_thread)
Modified: pypy/dist/pypy/objspace/cclp/types.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/types.py (original)
+++ pypy/dist/pypy/objspace/cclp/types.py Thu Oct 19 16:50:27 2006
@@ -1,7 +1,7 @@
from pypy.interpreter import baseobjspace, gateway, typedef
from pypy.interpreter.error import OperationError
-from pypy.objspace.cclp.misc import w, ClonableCoroutine, get_current_cspace
+from pypy.objspace.cclp.misc import w, AppCoroutine, get_current_cspace
W_Root = baseobjspace.W_Root
@@ -31,7 +31,7 @@
"a read-only-by-its-consummer variant of logic. var"
def __init__(w_self, space):
W_Var.__init__(w_self, space)
- w_self._client = ClonableCoroutine.w_getcurrent(space)
+ w_self._client = AppCoroutine.w_getcurrent(space)
w("FUT", str(w_self))
Modified: pypy/dist/pypy/objspace/cclp/variable.py
==============================================================================
--- pypy/dist/pypy/objspace/cclp/variable.py (original)
+++ pypy/dist/pypy/objspace/cclp/variable.py Thu Oct 19 16:50:27 2006
@@ -5,7 +5,7 @@
from pypy.objspace.std.dictobject import W_DictObject
from pypy.objspace.std.stringobject import W_StringObject
-from pypy.objspace.cclp.misc import w, v, ClonableCoroutine
+from pypy.objspace.cclp.misc import w, v, AppCoroutine
from pypy.objspace.cclp.global_state import sched
from pypy.objspace.cclp.types import deref, W_Var, W_CVar, W_Future, W_FailedValue
@@ -27,10 +27,10 @@
return w_obj
def wait__Var(space, w_var):
- #w("###:wait", str(id(ClonableCoroutine.w_getcurrent(space))))
+ #w("###:wait", str(id(AppCoroutine.w_getcurrent(space))))
if space.is_true(space.is_free(w_var)):
sched.uler.unblock_byneed_on(w_var)
- sched.uler.add_to_blocked_on(w_var, ClonableCoroutine.w_getcurrent(space))
+ sched.uler.add_to_blocked_on(w_var, AppCoroutine.w_getcurrent(space))
sched.uler.schedule()
assert space.is_true(space.is_bound(w_var))
w_ret = w_var.w_bound_to
@@ -53,11 +53,11 @@
#-- Wait_needed --------------------------------------------
def wait_needed__Var(space, w_var):
- #w(":wait_needed", str(id(ClonableCoroutine.w_getcurrent(space))))
+ #w(":wait_needed", str(id(AppCoroutine.w_getcurrent(space))))
if space.is_true(space.is_free(w_var)):
if w_var.needed:
return
- sched.uler.add_to_blocked_byneed(w_var, ClonableCoroutine.w_getcurrent(space))
+ sched.uler.add_to_blocked_byneed(w_var, AppCoroutine.w_getcurrent(space))
sched.uler.schedule()
else:
raise OperationError(space.w_TypeError,
@@ -194,7 +194,7 @@
def bind__Future_Root(space, w_fut, w_obj):
#v("future val", str(id(w_fut)))
- if w_fut._client == ClonableCoroutine.w_getcurrent(space):
+ if w_fut._client == AppCoroutine.w_getcurrent(space):
raise_future_binding(space)
return bind__Var_Root(space, w_fut, w_obj) # call-next-method ?
@@ -216,7 +216,7 @@
def bind__Future_Var(space, w_fut, w_var):
#v("future var")
- if w_fut._client == ClonableCoroutine.w_getcurrent(space):
+ if w_fut._client == AppCoroutine.w_getcurrent(space):
raise_future_binding(space)
return bind__Var_Var(space, w_fut, w_var)
@@ -224,7 +224,7 @@
def bind__Var_Future(space, w_var, w_fut):
if space.is_true(space.is_bound(w_fut)): #XXX write a test for me !
return bind__Var_Root(space, w_var, deref(space, w_fut))
- if w_fut._client == ClonableCoroutine.w_getcurrent(space):
+ if w_fut._client == AppCoroutine.w_getcurrent(space):
raise_future_binding(space)
return bind__Var_Var(space, w_var, w_fut) #and for me ...
More information about the Pypy-commit
mailing list