[pypy-svn] r24320 - pypy/dist/pypy/lib/logic/computation_space
auc at codespeak.net
auc at codespeak.net
Mon Mar 13 22:50:24 CET 2006
Author: auc
Date: Mon Mar 13 22:50:23 2006
New Revision: 24320
Modified:
pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py
pypy/dist/pypy/lib/logic/computation_space/test_variable.py
pypy/dist/pypy/lib/logic/computation_space/variable.py
Log:
added waitneeded
some refactoring, still one test to fix
Modified: pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py Mon Mar 13 22:50:23 2006
@@ -36,6 +36,12 @@
x = sp.var('x')
raises(v.AlreadyInStore, sp.var, 'x')
+ def test_get_by_name(self):
+ sp = newspace()
+ x = sp.var('x')
+ assert x == sp.get_var_by_name('x')
+ raises(space.NotInStore, sp.get_var_by_name, 'y')
+
def test_already_bound(self):
sp = newspace()
x = sp.var('x')
Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py Mon Mar 13 22:50:23 2006
@@ -3,9 +3,7 @@
from py.test import raises
-import computationspace as space
-import variable as v
-from problems import dummy_problem
+from variable import var, NoValue, AlreadyBound
#-- utilities ---------------------
@@ -35,21 +33,17 @@
def run(self):
val = [var.wait() for var in self.vars]
-def newspace():
- return space.ComputationSpace(dummy_problem)
-
-
#-- meat ----------------------------
class TestSimpleVariable:
def test_basics(self):
- x = v.SimpleVar()
- assert x.val == v.NoValue
+ x = var()
+ assert x.val == NoValue
x.bind(42)
assert x.val == 42
x.bind(42)
- raises(v.AlreadyBound, x.bind, 43)
+ raises(AlreadyBound, x.bind, 43)
def test_dataflow(self):
def fun(thread, var):
@@ -57,7 +51,7 @@
v = var.wait()
thread.state = v
- x = v.SimpleVar()
+ x = var()
t = FunThread(fun, x)
import time
t.start()
@@ -74,67 +68,18 @@
thread.res += v[0]
consummer(thread, v[1])
- S = v.SimpleVar()
+ S = var()
t = FunThread(consummer, S)
t.res = 0
t.start()
for i in range(10):
- tail = v.SimpleVar()
+ tail = var()
S.bind((i, tail))
S = tail
S.bind(None)
t.join()
assert t.res == 45
-class TestCsVariable:
-
- def test_no_same_name(self):
- sp = newspace()
- x = sp.var('x')
- raises(space.AlreadyInStore, sp.var, 'x')
-
- def test_get_by_name(self):
- sp = newspace()
- x = sp.var('x')
- assert x == sp.get_var_by_name('x')
- raises(space.NotInStore, sp.get_var_by_name, 'y')
-
- def test_one_thread_reading_one_var(self):
- sp = newspace()
- cons = Consumer()
- x = sp.var('x')
- cons.give_var(x)
- cons.start()
- sp.bind(x, 42)
- cons.join()
- assert cons.var.val == 42
-
- def test_many_threads_reading_one_var(self):
- sp = newspace()
- conss = [Consumer() for i in range(10)]
- x = sp.var('x')
- for cons in conss:
- cons.give_var(x)
- cons.start()
- sp.bind(x, 42)
- for cons in conss:
- cons.join()
- assert cons.var.val == 42
-
- def test_many_thread_reading_many_var(self):
- sp = newspace()
- conss = [NConsumer() for i in range(10)]
- vars_ = [sp.var(str(i)) for i in range(10)]
- for cons in conss:
- cons.give_vars(vars_)
- cons.start()
- for var in vars_:
- sp.bind(var, var.name)
- for cons in conss:
- cons.join()
- for i in range(10):
- assert vars_[i].val == str(i)
-
#-- concurrent streams and lists ----------------
#-- utilities -----------------------------------
@@ -150,12 +95,11 @@
else nil end
end"""
if n<limit:
- sp = newspace()
- Xr = sp.var('Xr')
+ Xr = var()
Xs.bind((n, Xr))
generate(thread, Xr, n+1, limit)
else:
- Xs.bind(EOL)
+ Xs.bind(None)
def dgenerate(thread, n, Xs):
"""(demand-driven generation of 0|1|2|...)
@@ -166,13 +110,12 @@
{DGenerate N+1 Xr}
end
end"""
- sp = newspace()
- print "GENERATOR waits on Xs"
+ print "GENERATOR in %s waits on Xs" % thread.getName()
X_Xr = Xs.wait() # destructure Xs
if X_Xr == None: return
X = X_Xr[0] # ... into X
+ print "GENERATOR in %s binds X to %s" % (thread.getName(), n)
X.bind(n) # bind X to n
- print "GENERATOR binds X to", n
Xr = X_Xr[1] # ... and Xr
dgenerate(thread, n+1, Xr)
@@ -186,10 +129,9 @@
else A end
end"""
if limit > 0:
- sp = newspace()
# fill Xs with an empty pair
- X = sp.var('X')
- Xr = sp.var('Xr')
+ X = var()
+ Xr = var()
print "CLIENT binds Xs to X|Xr"
Xs.bind((X, Xr))
x = X.wait() # wait on the value of X
@@ -210,21 +152,26 @@
end
end"""
X_Xr = Xs.wait()
- if X_Xr == EOL:
+ if X_Xr == None:
thread.result = a
return
Xr = X_Xr[1]
reduc(thread, Xr, fun(a, X_Xr[0]), fun)
+def run_test(t1, t2):
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+
+
#-- meat ----------------------------------------
class TestStream:
def test_multiple_readers_eager_list(self):
"""the generator controls the flow"""
- sp = newspace()
-
- Xs = sp.var('L')
+ Xs = var()
r1 = FunThread(reduc, Xs, 0, operator.add)
r2 = FunThread(reduc, Xs, 0, operator.add)
@@ -242,40 +189,67 @@
assert r.result == 861
def test_lazy_list(self):
- """the reader controls the flow"""
- sp = newspace()
-
- def run_test(t1, t2):
- """
- local Xs S in
- thread {DGenerate 0 Xs} end
- thread S={DSum Xs 0 15} end
- {Browse S}
- end"""
- t1.start()
- t2.start()
- t1.join()
- t2.join()
-
- Xs = sp.var('Xs')
+ """the reader controls the flow
+ local Xs S in
+ thread {DGenerate 0 Xs} end
+ thread S={DSum Xs 0 15} end
+ {Browse S}
+ end"""
+ Xs = var()
generator = FunThread(dgenerate, 0, Xs)
summer = FunThread(dsum, Xs, 0, 15)
run_test(generator, summer)
assert summer.result == 105
+ def test_wait_needed(self):
+ """lazyness by wait_needed"""
+ Xs = var()
+
+ def lgenerate(thread, n, Xs):
+ """wait-needed version of dgenerate"""
+ print "GENERATOR waits on Xs"
+ Xs.wait_needed()
+ Xr = var()
+ Xs.bind((n, Xr))
+ print "GENERATOR binds Xs to", n
+ dgenerate(thread, n+1, Xr)
+
+ def sum(thread, Xs, a, limit):
+ """much shorter than dsum"""
+ if limit > 0:
+ x = Xs.wait()
+ print "CLIENT got", x
+ dsum(thread, x[1], a+x[0], limit-1)
+ else:
+ thread.result = a
+
+ generator = FunThread(lgenerate, 0, Xs)
+ summer = FunThread(sum, Xs, 0, 15)
+
+ run_test(generator, summer)
+ assert summer.result == 105
+
def test_bounded_buffer_transducer(self):
"""reader controls the flow but a
buffer between generator/consummer
avoids inefficient step-wise progression
"""
- sp = newspace()
+ def print_stream(S):
+ while S.is_bound():
+ v = S.wait()
+ if isinstance(v, tuple):
+ v0 = v[0]
+ if v0.is_bound(): print v0, '|',
+ else: print '?' ; break
+ S = v[1]
+ else:
+ print v
+ break
def bounded_buffer(thread, n, Xs, Ys):
- sp = newspace()
-
def startup(n, Xs):
"""
fun {Startup N ?Xs}
@@ -284,10 +258,10 @@
end
"""
if n==0: return Xs
- sp = newspace()
- X_ = sp.var('X_')
- Xr = sp.var('Xr')
- Xs.bind((X_, Xr))
+ print "startup n = ", n,
+ print_stream(Xs)
+ Xr = var()
+ Xs.bind((var(), Xr))
return startup(n-1, Xr)
def ask_loop(Ys, Xs, End):
@@ -300,29 +274,32 @@
end
end
"""
- sp = newspace()
+ print "Ask_loop ..."
+ print_stream(Xs)
+ print_stream(Ys)
Y_Yr = Ys.wait() # destructure Ys
if Y_Yr != None:
Y, Yr = Y_Yr
+ print "Ask_loop in thread %s got %s %s " % \
+ (thread.getName(), Y, Yr)
X, Xr = Xs.wait()
Y.bind(X.wait())
- End2 = sp.var('End2')
- X_ = sp.var('X_')
- End.bind((X_, End2))
+ End2 = var()
+ End.bind((var(), End2))
ask_loop(Yr, Xr, End2)
else:
End.bind(None)
- End = sp.var('End')
+ End = var()
End.bind(startup(n, Xs))
print "BUFFER starts"
ask_loop(Ys, Xs, End)
- Xs = sp.var('Xs')
- Ys = sp.var('Ys')
+ Xs = var()
+ Ys = var()
generator = FunThread(dgenerate, 0, Xs)
- bbuffer = FunThread(bounded_buffer, 8, Xs, Ys)
+ bbuffer = FunThread(bounded_buffer, 4, Xs, Ys)
summer = FunThread(dsum, Ys, 0, 50)
generator.start()
Modified: pypy/dist/pypy/lib/logic/computation_space/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/variable.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/variable.py Mon Mar 13 22:50:23 2006
@@ -31,6 +31,9 @@
class NoDom: pass
+def var():
+ return SimpleVar()
+
class SimpleVar(object):
"""Spaceless dataflow variable"""
@@ -39,6 +42,7 @@
self._val = NoValue
# a condition variable for concurrent access
self._value_condition = threading.Condition()
+ self._need_condition = threading.Condition()
# value accessors
def _set_val(self, val):
@@ -56,6 +60,13 @@
return self._val
val = property(_get_val, _set_val)
+ def __str__(self):
+ if self.is_bound():
+ return "%s = %s" % (self.name, self.val)
+ return "%s" % self.name
+
+ def __repr__(self):
+ return self.__str__()
# public interface
@@ -67,22 +78,34 @@
def wait(self):
try:
+ self._need_condition.acquire()
+ self._need_condition.notifyAll()
+ finally:
+ self._need_condition.release()
+ try:
self._value_condition.acquire()
while not self.is_bound():
t1 = time.time()
- self._value_condition.wait(120)
+ self._value_condition.wait(10)
t2 = time.time()
- if t2-t1>120:
+ if t2-t1>10:
raise RuntimeError("possible deadlock??")
return self.val
finally:
self._value_condition.release()
-
+
+ def wait_needed(self):
+ try:
+ self._need_condition.acquire()
+ self._need_condition.wait()
+ finally:
+ self._need_condition.release()
class CsVar(SimpleVar):
"""Dataflow variable linked to a space"""
def __init__(self, name, cs):
+ SimpleVar.__init__(self)
if name in cs.names:
raise AlreadyInStore(name)
self.name = name
@@ -129,14 +152,6 @@
return self._val
val = property(_get_val, _set_val)
- def __str__(self):
- if self.is_bound():
- return "%s = %s" % (self.name, self.val)
- return "%s" % self.name
-
- def __repr__(self):
- return self.__str__()
-
def bind(self, val):
"""home space bind"""
self._cs.bind(self, val)
More information about the Pypy-commit
mailing list