[pypy-svn] r23328 - pypy/dist/pypy/lib/logic/computation_space

auc at codespeak.net auc at codespeak.net
Tue Feb 14 15:10:10 CET 2006


Author: auc
Date: Tue Feb 14 15:10:07 2006
New Revision: 23328

Modified:
   pypy/dist/pypy/lib/logic/computation_space/test_variable.py
   pypy/dist/pypy/lib/logic/computation_space/variable.py
Log:
bounded buffer test


Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py	Tue Feb 14 15:10:07 2006
@@ -35,23 +35,27 @@
     def run(self):
         val = [var.get() for var in self.vars]
 
+def newspace():
+    return space.ComputationSpace(dummy_problem)
+
+
 #-- meat ----------------------------
 
 class TestVariable:
 
     def test_no_same_name(self):
-        sp = space.ComputationSpace(dummy_problem)
+        sp = newspace()
         x = sp.var('x')
         raises(space.AlreadyInStore, sp.var, 'x')
 
     def test_get_by_name(self):
-        sp = space.ComputationSpace(dummy_problem)
+        sp = newspace()
         x = sp.var('x')
         assert x == sp.get_var_by_name('x')
         raises(space.NotInStore, sp.get_var_by_name, 'y')
 
     def test_one_thread_reading_one_var(self):
-        sp = space.ComputationSpace(dummy_problem)
+        sp = newspace()
         cons = Consumer()
         x = sp.var('x')
         cons.give_var(x)
@@ -61,7 +65,7 @@
         assert cons.var.val == 42
 
     def test_many_threads_reading_one_var(self):
-        sp = space.ComputationSpace(dummy_problem)
+        sp = newspace()
         conss = [Consumer() for i in range(10)]
         x = sp.var('x')
         for cons in conss:
@@ -73,7 +77,7 @@
         assert cons.var.val == 42
 
     def test_many_thread_reading_many_var(self):
-        sp = space.ComputationSpace(dummy_problem)
+        sp = newspace()
         conss = [NConsumer() for i in range(10)]
         vars_ = [sp.var(str(i)) for i in range(10)]
         for cons in conss:
@@ -86,6 +90,91 @@
         for i in range(10):
             assert vars_[i].val == str(i)
 
+#-- concurrent streams and lists ----------------
+
+#-- utilities -----------------------------------
+
+class EOL: pass
+
+def generate(thread, Xs, n, limit):
+    """(eager generation of a stream 0|1|2|...)
+    declare
+    fun {Generate N Limit}
+       if N<Limit then
+          N|{Generate N+1 Limit}
+       else nil end
+    end"""
+    if n<limit:
+        sp = newspace()
+        Xr = sp.var('Xr')
+        Xs.bind(v.CList(n, Xr))
+        generate(thread, Xr, n+1, limit)
+    else:
+        Xs.bind(EOL)    
+
+def dgenerate(thread, n, Xs):
+    """(demand-driven generation of 0|1|2|...)
+    declare
+    proc {DGenerate N Xs}
+        case Xs of X|Xr then
+           X=N
+           {DGenerate N+1 Xr}
+        end
+    end"""
+    sp = newspace()
+    print "GENERATOR waits on Xs"
+    X_Xr = Xs.get()      # destructure Xs
+    if X_Xr == None: return
+    X = X_Xr.first()     # ... into X
+    X.bind(n)            # bind X to n
+    print "GENERATOR binds X to", n
+    Xr = X_Xr.rest()     # ... and Xr
+    dgenerate(thread, n+1, Xr)
+
+def dsum(thread, Xs, a, limit):
+    """declare
+    fun {DSum ?Xs A Limit}
+       if Limit>0 then
+          X|Xr=Xs
+       in
+          {DSum Xr A+X Limit-1}
+       else A end
+    end"""
+    if limit > 0:
+        sp = newspace()
+        # fill Xs with an empty pair
+        X = sp.var('X')
+        Xr = sp.var('Xr')
+        print "CLIENT binds Xs to X|Xr"
+        Xs.bind(v.CList(X, Xr))
+        x = X.get() # wait on the value of X
+        print "CLIENT got", x
+        dsum(thread, Xr, a+x, limit-1)
+    else:
+        print "CLIENT binds Xs to None and exits"
+        Xs.bind(None)
+        thread.result = a
+
+def reduc(thread, Xs, a, fun):
+    """declare
+    fun {Sum Xs A}
+        case Xs
+            of X|Xr then {Sum Xr A+X}
+            [] nil then A
+            else {Sum Xs A}
+        end
+    end"""
+    X_Xr = Xs.get()
+    if X_Xr == EOL:
+        thread.result = a
+        return
+    Xr = X_Xr.rest()
+    reduc(thread, Xr, fun(a, X_Xr.first()), fun)
+
+#-- meat ----------------------------------------
+
+class TestStream:
+
     def test_basic_list(self):
         s = v.make_list([1, 2, 3])
         assert s.__str__() == '1|2|3'
@@ -97,10 +186,9 @@
         assert s.__str__() == '1|...'
         assert s.length() == 2
 
-    #-- concurrent streams and lists ----------------
-
     def test_producer_consummer_stream(self):
-        sp = space.ComputationSpace(dummy_problem)
+        """test FIFO stream behaviour"""
+        sp = newspace()
         import time
 
         def generate(thread, var, n, limit):
@@ -117,11 +205,11 @@
                 thread.result = fun(thread.result, val)
                 val = stream.get()
 
-        s = sp.var('s')
-        s.bind(v.Stream())
+        Xs = sp.var('s')
+        Xs.bind(v.Stream())
         
-        generator = FunThread(generate, s, 1, 10)
-        reductor = FunThread(reduc, s, operator.mul)
+        generator = FunThread(generate, Xs, 1, 10)
+        reductor = FunThread(reduc, Xs, operator.mul)
         reductor.result = 2
 
         generator.start()
@@ -132,7 +220,10 @@
         assert reductor.result == 725760
 
     def test_daisychain_stream(self):
-        sp = space.ComputationSpace(dummy_problem)
+        """walk a list whose last element
+           is a var bound to another list
+        """
+        sp = newspace()
 
         def woman_in_chains(thread, S):
             stream = S.get()
@@ -165,40 +256,7 @@
                 
     def test_multiple_readers_eager_list(self):
         """the generator controls the flow"""
-        sp = space.ComputationSpace(dummy_problem)
-
-        class EOL: pass
-        
-        def generate(thread, Xs, n, limit):
-            """declare
-            fun {Generate N Limit}
-               if N<Limit then
-                  N|{Generate N+1 Limit}
-               else nil end
-            end"""
-            if n<limit:
-                sp = space.ComputationSpace(dummy_problem)
-                Xr = sp.var('Xr')
-                Xs.bind(v.CList(n, Xr))
-                generate(thread, Xr, n+1, limit)
-            else:
-                Xs.bind(EOL)
-                
-        def reduc(thread, Xs, a, fun):
-            """declare
-            fun {Sum Xs A}
-                case Xs
-                    of X|Xr then {Sum Xr A+X}
-                    [] nil then A
-                    else {Sum Xs A}
-                end
-            end"""
-            X_Xr = Xs.get()
-            if X_Xr == EOL:
-                thread.result = a
-                return
-            Xr = X_Xr.rest()
-            reduc(thread, Xr, fun(a, X_Xr.first()), fun)
+        sp = newspace()
             
         Xs = sp.var('L')
 
@@ -219,54 +277,7 @@
 
     def test_lazy_list(self):
         """the reader controls the flow"""
-        sp = space.ComputationSpace(dummy_problem)
-
-        def newspace():
-            return space.ComputationSpace(dummy_problem)
-
-        def dgenerate(thread, n, Xs):
-            """declare
-            proc {DGenerate N Xs}
-                case Xs of X|Xr then
-                   X=N
-                   {DGenerate N+1 Xr}
-                end
-            end"""
-            # new local space
-            sp = newspace()
-            # go ahead ...
-            print "GENERATOR waits on Xs"
-            X_Xr = Xs.get()      # destructure Xs
-            if X_Xr == None: return
-            X = X_Xr.first()     # ... into X
-            X.bind(n)            # bind X to n
-            print "GENERATOR binds X to", n
-            Xr = X_Xr.rest()     # ... and Xr
-            dgenerate(thread, n+1, Xr)
-
-        def dsum(thread, Xs, a, limit):
-            """declare
-            fun {DSum ?Xs A Limit}
-               if Limit>0 then
-                  X|Xr=Xs
-               in
-                  {DSum Xr A+X Limit-1}
-               else A end
-            end"""
-            if limit > 0:
-                sp = newspace()
-                # fill Xs with an empty pair
-                X = sp.var('X')
-                Xr = sp.var('Xr')
-                print "CLIENT binds Xs to X|Xr"
-                Xs.bind(v.Pair(X, Xr))
-                x = X.get() # wait on the value of X
-                print "CLIENT got", x
-                dsum(thread, Xr, a+x, limit-1)
-            else:
-                print "CLIENT binds Xs to None and exits"
-                Xs.bind(None)
-                thread.result = a
+        sp = newspace()
 
         def run_test(t1, t2):
             """
@@ -286,3 +297,74 @@
 
         run_test(generator, summer)
         assert summer.result == 105
+
+
+    def test_bounded_buffer_transducer(self):
+        """reader controls the flow but a
+           buffer between generator/consummer
+           avoids inefficient step-wise progression
+        """
+        sp = newspace()
+
+        def bounded_buffer(thread, n, Xs, Ys):
+
+            sp = newspace()
+
+            def startup(n, Xs):
+                """
+                fun {Startup N ?Xs}
+                    if N==0 then Xs 
+                    else Xr in Xs=_|Xr {Startup N-1 Xr} end
+                end
+                """
+                if n==0: return Xs
+                sp = newspace()
+                X_ = sp.var('X_')
+                Xr = sp.var('Xr')
+                Xs.bind(v.CList(X_, Xr))
+                return startup(n-1, Xr)
+
+            def ask_loop(Ys, Xs, End):
+                """
+                proc {AskLoop Ys ?Xs ?End}
+                    case Ys of Y|Yr then Xr End2 in
+                        Xs=Y|Xr
+                        End=_|End2
+                        {AskLoop Yr Xr End2}
+                    end
+                end
+                """
+                sp = newspace()
+                Y_Yr = Ys.get()   # destructure Ys
+                if Y_Yr != None: 
+                    Y, Yr = Y_Yr.as_tuple()
+                    X, Xr = Xs.get().as_tuple()
+                    Y.bind(X.get())
+                    End2 = sp.var('End2')
+                    X_ = sp.var('X_')
+                    End.bind(v.CList(X_, End2))
+                    ask_loop(Yr, Xr, End2)
+                else:
+                    End.bind(None)
+
+            End = sp.var('End')
+            End.bind(startup(n, Xs))
+            print "BUFFER starts"
+            ask_loop(Ys, Xs, End)
+
+        Xs = sp.var('Xs')
+        Ys = sp.var('Ys')
+
+        generator = FunThread(dgenerate, 0, Xs)
+        bbuffer = FunThread(bounded_buffer, 4, Xs, Ys)
+        summer = FunThread(dsum, Ys, 0, 50)
+
+        generator.start()
+        summer.start()
+        bbuffer.start()
+
+        generator.join()
+        summer.join()
+        bbuffer.join()
+
+        assert summer.result == 1225

Modified: pypy/dist/pypy/lib/logic/computation_space/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/variable.py	Tue Feb 14 15:10:07 2006
@@ -146,6 +146,9 @@
     def set_rest(self, stuff):
         self._cdr = stuff
 
+    def as_tuple(self):
+        return (self._car, self._cdr)
+
     def is_empty(self):
         return self._car is None and self._cdr is None
 
@@ -179,7 +182,10 @@
                 if elt is None:
                     strs.pop()
                 elif isinstance(elt, Var):
-                    strs.append(elt.name)
+                    if elt.is_bound():
+                        strs.append(str(elt.val))
+                    else:
+                        strs.append(elt.name)
                 else:
                     strs.append(str(elt))
 
@@ -292,3 +298,5 @@
 
     def __str__(self):
         return str(self.head)
+
+



More information about the Pypy-commit mailing list