[pypy-svn] r31104 - pypy/dist/pypy/objspace/test
auc at codespeak.net
auc at codespeak.net
Mon Aug 7 13:55:20 CEST 2006
Author: auc
Date: Mon Aug 7 13:55:18 2006
New Revision: 31104
Modified:
pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
a couple of new tests
Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py (original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py Mon Aug 7 13:55:18 2006
@@ -15,8 +15,6 @@
assert not is_free(X)
assert is_bound(X)
assert is_bound(1)
- # FIXME : propagate proper
- # FailureException
raises(RebindingError, bind, X, 2)
def test_bind_to_self(self):
@@ -224,7 +222,6 @@
bind(X, 42); schedule() # gc ...
def test_one_future_exception(self):
- print "one future exception", sched_info()
class FooException(Exception): pass
def poop(X):
@@ -241,7 +238,6 @@
assert False
def test_exception_in_chain(self):
- print "exception in chain", sched_info()
class FooException(Exception): pass
def raise_foo():
@@ -265,7 +261,6 @@
assert False
def test_exception_in_group(self):
- print "exception in groups", sched_info()
class FooException(Exception): pass
def loop_or_raise(Canary, crit, Bomb_signal):
@@ -297,7 +292,6 @@
"""check that a wait nested in a tree of
threads works correctly
"""
- print "nested threads", sched_info()
def sleep(X):
wait(X)
return X
@@ -313,7 +307,6 @@
assert v == 42
def test_wait_needed(self):
- print "wait_needed", sched_info()
X = newvar()
def binder(V):
@@ -331,7 +324,6 @@
schedule() # gc help
def test_eager_producer_consummer(self):
- print "eager_producer_consummer", sched_info()
def generate(n, limit, R):
if n < limit:
@@ -357,7 +349,6 @@
def test_lazy_producer_consummer(self):
- print "lazy_producer_consummer", sched_info()
def lgenerate(n, L):
"""wait-needed version of generate"""
@@ -462,8 +453,150 @@
assert count[0] == max_spawn + erring
try:
wait(Failed)
- except Exception, e: # Unification Failure
+ except RebindingError, e:
assert len(sched_info()['threads']) == 1
return
assert False
+ def test_nd_append(self):
+ skip("non determnistic choice: yet to come")
+ #from CTM p.639
+ """
+ def append(A, B, C):
+ choice:
+ unify(A, None)
+ unify(B, C)
+ or:
+ As, Bs, X = newvar(), newvar(), newvar()
+ unify(A, (X, As))
+ unify(C, (X, Cs))
+ append(As, B, Cs)
+ """
+ from solver import solve
+ X, Y, S = newvar(), newvar(), newvar()
+ unify((X, Y), S)
+
+ for sol in solve(lambda : append(X, Y, [1, 2, 3])):
+ assert sol in ((None, [1, 2, 3]),
+ ([1], [2, 3]),
+ ([1, 2], [3]),
+ ([1, 2, 3], None))
+
+
+ def test_stream_merger(self):
+ """this is a little cheesy, due to threads not
+ being preemptively scheduled
+ """
+
+ def _sleep(X, Barrier):
+ wait(X)
+ unify(Barrier, True)
+
+ def wait_two(X, Y):
+ Barrier = newvar()
+ stacklet(_sleep, X, Barrier)
+ stacklet(_sleep, Y, Barrier)
+ wait(Barrier)
+ if is_free(Y):
+ return 1
+ return 2
+
+ def stream_merger(S1, S2):
+ F = newvar()
+ unify(F, wait_two(S1, S2))
+ if F==1:
+ if S1 is None:
+ return S2
+ else:
+ M, NewS1 = newvar(), newvar()
+ unify((M, NewS1), S1)
+ return (M, stream_merger(S2, NewS1))
+ elif F==2:
+ if S2 is None:
+ return S1
+ else:
+ M, NewS2 = newvar(), newvar()
+ unify((M, NewS2), S2)
+ return (M, stream_merger(NewS2, S1))
+
+
+ def feed_stream(S, values):
+ for v in values:
+ N = newvar()
+ bind(S, (v, N))
+ S = N # yeah, tail recursion is cooler
+ schedule()
+ bind(S, None)
+
+ S1, S2 = newvar(), newvar()
+
+ O = future(stream_merger, S1, S2)
+
+ stacklet(feed_stream, S2, ['foo', 'bar', 'quux', 'spam', 'eggs'])
+ stacklet(feed_stream, S1, range(10))
+
+ assert O == ('foo', (0, ('bar', (1, ('quux', (2, ('spam',
+ (3, ('eggs', (4, (5, (6, (7, (8, (9, None)))))))))))))))
+
+
+ def test_digital_logic(self):
+
+ def print_stream(S):
+ elts = []
+ while is_bound(S):
+ if S is None:
+ break
+ elts.append(str(S[0]))
+ S = S[1]
+ print '[%s]' % ','.join(elts)
+
+ def bound_part(S):
+ elts = []
+ while is_bound(S):
+ if S is None:
+ break
+ elts.append(S[0])
+ S = S[1]
+ return elts
+
+
+ def gatemaker(Fun):
+ def _(X, Y):
+ def gateloop(X, Y, R):
+ Xs, Ys, Xr, Yr = newvar(), newvar(), newvar(), newvar()
+ unify((X, Y),
+ ((Xs, Xr), (Ys, Yr)))
+ Rs = newvar()
+ unify(R, (Fun(Xs, Ys), Rs))
+ gateloop(Xr, Yr, Rs)
+ R = newvar()
+ stacklet(gateloop, X, Y, R)
+ return R
+ return _
+
+ andg = gatemaker(lambda X, Y: X*Y,)
+ org = gatemaker(lambda X, Y: X+Y-X*Y)
+ xorg = gatemaker(lambda X, Y: X+Y-2*X*Y)
+
+ def full_adder(X, Y, Z, C, S):
+ K, L, M = newvar(), newvar(), newvar()
+ unify(K, andg(X, Y))
+ unify(L, andg(Y, Z))
+ unify(M, andg(X, Z))
+ unify(C, org(K, org(L, M)))
+ unify(S, xorg(Z, xorg(X, Y)))
+
+ X, Y, Z, C, S = newvar(), newvar(), newvar(), newvar(), newvar()
+
+ unify(X, (1, (1, (0, newvar()))))
+ unify(Y, (0, (1, (0, newvar()))))
+ unify(Z, (1, (1, (1, newvar()))))
+
+
+ stacklet(full_adder, X, Y, Z, C, S)
+ wait(C); wait(S)
+ assert bound_part(C) == [1, 1, 0]
+ assert bound_part(S) == [0, 1, 1]
+ schedule()
+
+ reset_scheduler() # free all the hanging threads
More information about the Pypy-commit
mailing list