[pypy-svn] r31104 - pypy/dist/pypy/objspace/test

auc at codespeak.net auc at codespeak.net
Mon Aug 7 13:55:20 CEST 2006


Author: auc
Date: Mon Aug  7 13:55:18 2006
New Revision: 31104

Modified:
   pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
a couple of new tests


Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py	(original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py	Mon Aug  7 13:55:18 2006
@@ -15,8 +15,6 @@
         assert not is_free(X)
         assert is_bound(X)
         assert is_bound(1)
-        # FIXME : propagate proper
-        #         FailureException
         raises(RebindingError, bind, X, 2)
 
     def test_bind_to_self(self):
@@ -224,7 +222,6 @@
         bind(X, 42); schedule() # gc ...
 
     def test_one_future_exception(self):
-        print "one future exception", sched_info()
         class FooException(Exception): pass
         
         def poop(X):
@@ -241,7 +238,6 @@
         assert False
 
     def test_exception_in_chain(self):
-        print "exception in chain", sched_info()
         class FooException(Exception): pass
 
         def raise_foo():
@@ -265,7 +261,6 @@
         assert False
 
     def test_exception_in_group(self):
-        print "exception in groups", sched_info()
         class FooException(Exception): pass
 
         def loop_or_raise(Canary, crit, Bomb_signal):
@@ -297,7 +292,6 @@
         """check that a wait nested in a tree of
            threads works correctly
         """
-        print "nested threads", sched_info()
         def sleep(X):
             wait(X)
             return X
@@ -313,7 +307,6 @@
         assert v == 42
 
     def test_wait_needed(self):
-        print "wait_needed", sched_info()
         X = newvar()
 
         def binder(V):
@@ -331,7 +324,6 @@
         schedule() # gc help
 
     def test_eager_producer_consummer(self):
-        print "eager_producer_consummer", sched_info()
 
         def generate(n, limit, R):
             if n < limit:
@@ -357,7 +349,6 @@
 
 
     def test_lazy_producer_consummer(self):
-        print "lazy_producer_consummer", sched_info()
 
         def lgenerate(n, L):
             """wait-needed version of generate"""
@@ -462,8 +453,150 @@
         assert count[0] == max_spawn + erring
         try:
             wait(Failed)
-        except Exception, e: # Unification Failure
+        except RebindingError, e: 
             assert len(sched_info()['threads']) == 1
             return
         assert False
                 
+    def test_nd_append(self):
+        skip("non determnistic choice: yet to come")
+        #from CTM p.639
+        """
+        def append(A, B, C):
+            choice:
+                unify(A, None)
+                unify(B, C)
+            or:
+                As, Bs, X = newvar(), newvar(), newvar()
+                unify(A, (X, As))
+                unify(C, (X, Cs))
+                append(As, B, Cs)
+        """
+        from solver import solve
+        X, Y, S = newvar(), newvar(), newvar()
+        unify((X, Y), S)
+        
+        for sol in solve(lambda : append(X, Y, [1, 2, 3])):
+            assert sol in ((None, [1, 2, 3]),
+                           ([1], [2, 3]),
+                           ([1, 2], [3]),
+                           ([1, 2, 3], None))
+                           
+
+    def test_stream_merger(self):
+        """this is a little cheesy, due to threads not
+        being preemptively scheduled
+        """
+        
+        def _sleep(X, Barrier):
+            wait(X)
+            unify(Barrier, True)
+        
+        def wait_two(X, Y):
+            Barrier = newvar()
+            stacklet(_sleep, X, Barrier)
+            stacklet(_sleep, Y, Barrier)
+            wait(Barrier)
+            if is_free(Y):
+                return 1
+            return 2
+
+        def stream_merger(S1, S2):
+            F = newvar()
+            unify(F, wait_two(S1, S2))
+            if F==1:
+                if S1 is None:
+                    return S2
+                else:
+                    M, NewS1 = newvar(), newvar()
+                    unify((M, NewS1), S1)
+                    return (M, stream_merger(S2, NewS1))
+            elif F==2:
+                if S2 is None:
+                    return S1
+                else:
+                    M, NewS2 = newvar(), newvar()
+                    unify((M, NewS2), S2)
+                    return (M, stream_merger(NewS2, S1))
+        
+
+        def feed_stream(S, values):
+            for v in values:
+                N = newvar()
+                bind(S, (v, N))
+                S = N # yeah, tail recursion is cooler
+                schedule()
+            bind(S, None)
+
+        S1, S2 = newvar(), newvar()
+        
+        O = future(stream_merger, S1, S2)
+        
+        stacklet(feed_stream, S2, ['foo', 'bar', 'quux', 'spam', 'eggs'])
+        stacklet(feed_stream, S1, range(10))
+        
+        assert O == ('foo', (0, ('bar', (1, ('quux', (2, ('spam',
+                    (3, ('eggs', (4, (5, (6, (7, (8, (9, None)))))))))))))))
+        
+
+    def test_digital_logic(self):
+        
+        def print_stream(S):
+            elts = []
+            while is_bound(S):
+                if S is None:
+                    break
+                elts.append(str(S[0]))
+                S = S[1]
+            print '[%s]' % ','.join(elts)
+
+        def bound_part(S):
+            elts = []
+            while is_bound(S):
+                if S is None:
+                    break
+                elts.append(S[0])
+                S = S[1]
+            return elts
+            
+        
+        def gatemaker(Fun):
+            def _(X, Y):
+                def gateloop(X, Y, R):
+                    Xs, Ys, Xr, Yr = newvar(), newvar(), newvar(), newvar()
+                    unify((X,        Y),
+                          ((Xs, Xr), (Ys, Yr)))
+                    Rs = newvar()
+                    unify(R, (Fun(Xs, Ys), Rs))
+                    gateloop(Xr, Yr, Rs)
+                R = newvar()
+                stacklet(gateloop, X, Y, R)
+                return R
+            return _
+
+        andg  = gatemaker(lambda X, Y: X*Y,)
+        org   = gatemaker(lambda X, Y: X+Y-X*Y)
+        xorg  = gatemaker(lambda X, Y: X+Y-2*X*Y)
+        
+        def full_adder(X, Y, Z, C, S):
+            K, L, M = newvar(), newvar(), newvar()
+            unify(K, andg(X, Y))
+            unify(L, andg(Y, Z))
+            unify(M, andg(X, Z))
+            unify(C, org(K, org(L, M)))
+            unify(S, xorg(Z, xorg(X, Y)))
+
+        X, Y, Z, C, S = newvar(), newvar(), newvar(), newvar(), newvar()
+
+        unify(X, (1, (1, (0, newvar()))))
+        unify(Y, (0, (1, (0, newvar()))))
+        unify(Z, (1, (1, (1, newvar()))))
+
+
+        stacklet(full_adder, X, Y, Z, C, S)
+        wait(C); wait(S)
+        assert bound_part(C) == [1, 1, 0]
+        assert bound_part(S) == [0, 1, 1]
+        schedule()
+
+        reset_scheduler() # free all the hanging threads



More information about the Pypy-commit mailing list