[pypy-svn] r24320 - pypy/dist/pypy/lib/logic/computation_space

auc at codespeak.net auc at codespeak.net
Mon Mar 13 22:50:24 CET 2006


Author: auc
Date: Mon Mar 13 22:50:23 2006
New Revision: 24320

Modified:
   pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py
   pypy/dist/pypy/lib/logic/computation_space/test_variable.py
   pypy/dist/pypy/lib/logic/computation_space/variable.py
Log:
added wait_needed
some refactoring; still one test to fix


Modified: pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py	Mon Mar 13 22:50:23 2006
@@ -36,6 +36,12 @@
         x = sp.var('x')
         raises(v.AlreadyInStore, sp.var, 'x')
 
+    def test_get_by_name(self):
+        sp = newspace()
+        x = sp.var('x')
+        assert x == sp.get_var_by_name('x')
+        raises(space.NotInStore, sp.get_var_by_name, 'y')
+
     def test_already_bound(self):
         sp = newspace()
         x = sp.var('x')

Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py	Mon Mar 13 22:50:23 2006
@@ -3,9 +3,7 @@
 
 from py.test import raises
 
-import computationspace as space
-import variable as v
-from problems import dummy_problem
+from variable import var, NoValue, AlreadyBound
 
 #-- utilities ---------------------
 
@@ -35,21 +33,17 @@
     def run(self):
         val = [var.wait() for var in self.vars]
 
-def newspace():
-    return space.ComputationSpace(dummy_problem)
-
-
 #-- meat ----------------------------
 
 class TestSimpleVariable:
 
     def test_basics(self):
-        x = v.SimpleVar()
-        assert x.val == v.NoValue
+        x = var()
+        assert x.val == NoValue
         x.bind(42)
         assert x.val == 42
         x.bind(42)
-        raises(v.AlreadyBound, x.bind, 43)
+        raises(AlreadyBound, x.bind, 43)
 
     def test_dataflow(self):
         def fun(thread, var):
@@ -57,7 +51,7 @@
             v = var.wait()
             thread.state = v
 
-        x = v.SimpleVar()
+        x = var()
         t = FunThread(fun, x)
         import time
         t.start()
@@ -74,67 +68,18 @@
                 thread.res += v[0]
                 consummer(thread, v[1])
 
-        S = v.SimpleVar()
+        S = var()
         t = FunThread(consummer, S)
         t.res = 0
         t.start()
         for i in range(10):
-            tail = v.SimpleVar()
+            tail = var()
             S.bind((i, tail))
             S = tail
         S.bind(None)
         t.join()
         assert t.res == 45
 
-class TestCsVariable:
-
-    def test_no_same_name(self):
-        sp = newspace()
-        x = sp.var('x')
-        raises(space.AlreadyInStore, sp.var, 'x')
-
-    def test_get_by_name(self):
-        sp = newspace()
-        x = sp.var('x')
-        assert x == sp.get_var_by_name('x')
-        raises(space.NotInStore, sp.get_var_by_name, 'y')
-
-    def test_one_thread_reading_one_var(self):
-        sp = newspace()
-        cons = Consumer()
-        x = sp.var('x')
-        cons.give_var(x)
-        cons.start()
-        sp.bind(x, 42)
-        cons.join()
-        assert cons.var.val == 42
-
-    def test_many_threads_reading_one_var(self):
-        sp = newspace()
-        conss = [Consumer() for i in range(10)]
-        x = sp.var('x')
-        for cons in conss:
-            cons.give_var(x)
-            cons.start()
-        sp.bind(x, 42)
-        for cons in conss:
-            cons.join()
-        assert cons.var.val == 42
-
-    def test_many_thread_reading_many_var(self):
-        sp = newspace()
-        conss = [NConsumer() for i in range(10)]
-        vars_ = [sp.var(str(i)) for i in range(10)]
-        for cons in conss:
-            cons.give_vars(vars_)
-            cons.start()
-        for var in vars_:
-            sp.bind(var, var.name)
-        for cons in conss:
-            cons.join()
-        for i in range(10):
-            assert vars_[i].val == str(i)
-
 #-- concurrent streams and lists ----------------
 
 #-- utilities -----------------------------------
@@ -150,12 +95,11 @@
        else nil end
     end"""
     if n<limit:
-        sp = newspace()
-        Xr = sp.var('Xr')
+        Xr = var()
         Xs.bind((n, Xr))
         generate(thread, Xr, n+1, limit)
     else:
-        Xs.bind(EOL)    
+        Xs.bind(None)    
 
 def dgenerate(thread, n, Xs):
     """(demand-driven generation of 0|1|2|...)
@@ -166,13 +110,12 @@
            {DGenerate N+1 Xr}
         end
     end"""
-    sp = newspace()
-    print "GENERATOR waits on Xs"
+    print "GENERATOR in %s waits on Xs" % thread.getName()
     X_Xr = Xs.wait()      # destructure Xs
     if X_Xr == None: return
     X = X_Xr[0]          # ... into X
+    print "GENERATOR in %s binds X to %s" % (thread.getName(), n)
     X.bind(n)            # bind X to n
-    print "GENERATOR binds X to", n
     Xr = X_Xr[1]         # ... and Xr
     dgenerate(thread, n+1, Xr)
 
@@ -186,10 +129,9 @@
        else A end
     end"""
     if limit > 0:
-        sp = newspace()
         # fill Xs with an empty pair
-        X = sp.var('X')
-        Xr = sp.var('Xr')
+        X = var()
+        Xr = var()
         print "CLIENT binds Xs to X|Xr"
         Xs.bind((X, Xr))
         x = X.wait() # wait on the value of X
@@ -210,21 +152,26 @@
         end
     end"""
     X_Xr = Xs.wait()
-    if X_Xr == EOL:
+    if X_Xr == None:
         thread.result = a
         return
     Xr = X_Xr[1]
     reduc(thread, Xr, fun(a, X_Xr[0]), fun)
 
+def run_test(t1, t2):
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
+
+
 #-- meat ----------------------------------------
 
 class TestStream:
                 
     def test_multiple_readers_eager_list(self):
         """the generator controls the flow"""
-        sp = newspace()
-            
-        Xs = sp.var('L')
+        Xs = var()
 
         r1 = FunThread(reduc, Xs, 0, operator.add)
         r2 = FunThread(reduc, Xs, 0, operator.add)
@@ -242,40 +189,67 @@
             assert r.result == 861
 
     def test_lazy_list(self):
-        """the reader controls the flow"""
-        sp = newspace()
-
-        def run_test(t1, t2):
-            """
-            local Xs S in
-              thread {DGenerate 0 Xs} end
-              thread S={DSum Xs 0 15} end
-              {Browse S}
-            end"""
-            t1.start()
-            t2.start()
-            t1.join()
-            t2.join()
-
-        Xs = sp.var('Xs')
+        """the reader controls the flow
+        local Xs S in
+          thread {DGenerate 0 Xs} end
+          thread S={DSum Xs 0 15} end
+          {Browse S}
+        end"""
+        Xs = var()
         generator = FunThread(dgenerate, 0, Xs)
         summer = FunThread(dsum, Xs, 0, 15)
 
         run_test(generator, summer)
         assert summer.result == 105
 
+    def test_wait_needed(self):
+        """laziness by wait_needed"""
+        Xs = var()
+
+        def lgenerate(thread, n, Xs):
+            """wait-needed version of dgenerate"""
+            print "GENERATOR waits on Xs"
+            Xs.wait_needed()
+            Xr = var()
+            Xs.bind((n, Xr))  
+            print "GENERATOR binds Xs to", n
+            dgenerate(thread, n+1, Xr)
+
+        def sum(thread, Xs, a, limit):
+            """much shorter than dsum"""
+            if limit > 0:
+                x = Xs.wait()
+                print "CLIENT got", x
+                dsum(thread, x[1], a+x[0], limit-1)
+            else:
+                thread.result = a
+        
+        generator = FunThread(lgenerate, 0, Xs)
+        summer = FunThread(sum, Xs, 0, 15)
+
+        run_test(generator, summer)
+        assert summer.result == 105
+        
 
     def test_bounded_buffer_transducer(self):
         """reader controls the flow but a
            buffer between generator/consummer
            avoids inefficient step-wise progression
         """
-        sp = newspace()
+        def print_stream(S):
+            while S.is_bound():
+                v = S.wait()
+                if isinstance(v, tuple):
+                    v0 = v[0]
+                    if v0.is_bound(): print v0, '|',
+                    else: print '?' ; break
+                    S = v[1]
+                else:
+                    print v
+                    break
 
         def bounded_buffer(thread, n, Xs, Ys):
 
-            sp = newspace()
-
             def startup(n, Xs):
                 """
                 fun {Startup N ?Xs}
@@ -284,10 +258,10 @@
                 end
                 """
                 if n==0: return Xs
-                sp = newspace()
-                X_ = sp.var('X_')
-                Xr = sp.var('Xr')
-                Xs.bind((X_, Xr))
+                print "startup n = ", n,
+                print_stream(Xs)
+                Xr = var()
+                Xs.bind((var(), Xr))
                 return startup(n-1, Xr)
 
             def ask_loop(Ys, Xs, End):
@@ -300,29 +274,32 @@
                     end
                 end
                 """
-                sp = newspace()
+                print "Ask_loop ..."
+                print_stream(Xs)
+                print_stream(Ys)
                 Y_Yr = Ys.wait()   # destructure Ys
                 if Y_Yr != None: 
                     Y, Yr = Y_Yr
+                    print "Ask_loop in thread %s got %s %s " % \
+                          (thread.getName(), Y, Yr)
                     X, Xr = Xs.wait()
                     Y.bind(X.wait())
-                    End2 = sp.var('End2')
-                    X_ = sp.var('X_')
-                    End.bind((X_, End2))
+                    End2 = var()
+                    End.bind((var(), End2))
                     ask_loop(Yr, Xr, End2)
                 else:
                     End.bind(None)
 
-            End = sp.var('End')
+            End = var()
             End.bind(startup(n, Xs))
             print "BUFFER starts"
             ask_loop(Ys, Xs, End)
 
-        Xs = sp.var('Xs')
-        Ys = sp.var('Ys')
+        Xs = var()
+        Ys = var()
 
         generator = FunThread(dgenerate, 0, Xs)
-        bbuffer = FunThread(bounded_buffer, 8, Xs, Ys)
+        bbuffer = FunThread(bounded_buffer, 4, Xs, Ys)
         summer = FunThread(dsum, Ys, 0, 50)
 
         generator.start()

Modified: pypy/dist/pypy/lib/logic/computation_space/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/variable.py	(original)
+++ pypy/dist/pypy/lib/logic/computation_space/variable.py	Mon Mar 13 22:50:23 2006
@@ -31,6 +31,9 @@
 
 class NoDom: pass
 
+def var():
+    return SimpleVar()
+
 class SimpleVar(object):
     """Spaceless dataflow variable"""
     
@@ -39,6 +42,7 @@
         self._val = NoValue
         # a condition variable for concurrent access
         self._value_condition = threading.Condition()
+        self._need_condition = threading.Condition()
 
     # value accessors
     def _set_val(self, val):
@@ -56,6 +60,13 @@
         return self._val
     val = property(_get_val, _set_val)
 
+    def __str__(self):
+        if self.is_bound():
+            return "%s = %s" % (self.name, self.val)
+        return "%s" % self.name
+
+    def __repr__(self):
+        return self.__str__()
 
     # public interface
 
@@ -67,22 +78,34 @@
 
     def wait(self):
         try:
+            self._need_condition.acquire()
+            self._need_condition.notifyAll()
+        finally:
+            self._need_condition.release()
+        try:
             self._value_condition.acquire()
             while not self.is_bound():
                 t1 = time.time()
-                self._value_condition.wait(120)
+                self._value_condition.wait(10)
                 t2 = time.time()
-                if t2-t1>120:
+                if t2-t1>10:
                     raise RuntimeError("possible deadlock??")
             return self.val
         finally:
             self._value_condition.release()
-        
+
+    def wait_needed(self):
+        try:
+            self._need_condition.acquire()
+            self._need_condition.wait()
+        finally:
+            self._need_condition.release()
 
 class CsVar(SimpleVar):
     """Dataflow variable linked to a space"""
 
     def __init__(self, name, cs):
+        SimpleVar.__init__(self)
         if name in cs.names:
             raise AlreadyInStore(name)
         self.name = name
@@ -129,14 +152,6 @@
         return self._val
     val = property(_get_val, _set_val)
 
-    def __str__(self):
-        if self.is_bound():
-            return "%s = %s" % (self.name, self.val)
-        return "%s" % self.name
-
-    def __repr__(self):
-        return self.__str__()
-
     def bind(self, val):
         """home space bind"""
         self._cs.bind(self, val)



More information about the Pypy-commit mailing list