[pypy-svn] r22470 - pypy/dist/pypy/lib/logic

auc at codespeak.net auc at codespeak.net
Fri Jan 20 17:17:28 CET 2006


Author: auc
Date: Fri Jan 20 17:17:25 2006
New Revision: 22470

Added:
   pypy/dist/pypy/lib/logic/test_variable.py
Modified:
   pypy/dist/pypy/lib/logic/unification.py
   pypy/dist/pypy/lib/logic/variable.py
Log:
add blocking read primitive


Added: pypy/dist/pypy/lib/logic/test_variable.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/logic/test_variable.py	Fri Jan 20 17:17:25 2006
@@ -0,0 +1,56 @@
+import unification as u
+from threading import Thread
+
+class Consumer(Thread):
+
+    def give_var(self, var):
+        self.var = var
+
+    def run(self):
+        val = self.var.get()
+
+class NConsumer(Thread):
+
+    def give_vars(self, vars_):
+        self.vars = vars_
+
+    def run(self):
+        val = [var.get() for var in self.vars]
+
+class TestVariable:
+
+    def setup_method(self, meth):
+        u._store = u.Store()
+
+    def test_one_thread_reading_one_var(self):
+        cons = Consumer()
+        x = u.var('x')
+        cons.give_var(x)
+        cons.start()
+        u.bind(x, 42)
+        cons.join()
+        assert cons.var.val == 42
+
+    def test_many_threads_reading_one_var(self):
+        conss = [Consumer() for i in range(10)]
+        x = u.var('x')
+        for cons in conss:
+            cons.give_var(x)
+            cons.start()
+        u.bind(x, 42)
+        for cons in conss:
+            cons.join()
+        assert cons.var.val == 42
+
+    def test_many_thread_reading_many_var(self):
+        conss = [NConsumer() for i in range(10)]
+        vars_ = [u.var(str(i)) for i in range(10)]
+        for cons in conss:
+            cons.give_vars(vars_)
+            cons.start()
+        for var in vars_:
+            u.bind(var, var.name)
+        for cons in conss:
+            cons.join()
+        for i in range(10):
+            assert vars_[i].val == str(i)

Modified: pypy/dist/pypy/lib/logic/unification.py
==============================================================================
--- pypy/dist/pypy/lib/logic/unification.py	(original)
+++ pypy/dist/pypy/lib/logic/unification.py	Fri Jan 20 17:17:25 2006
@@ -178,14 +178,14 @@
         if _both_are_vars(var, val):
             if _both_are_bound(var, val):
                 raise AlreadyBound(var.name)
-            if var.is_bound(): # 2b. var is bound, not var
+            if var._is_bound(): # 2b. var is bound, not var
                 self.bind(val, var)
-            elif val.is_bound(): # 2a.val is bound, not val
+            elif val._is_bound(): # 2a.val is bound, not val
                 self._bind(var.val, val.val)
             else: # 1. both are unbound
                 self._merge(var.val, val.val)
         else: # 3. val is really a value
-            if var.is_bound():
+            if var._is_bound():
                 raise AlreadyBound(var.name)
             self._bind(var.val, val)
 
@@ -213,11 +213,11 @@
                 self._really_unify(x, y)
                 for var in self.vars:
                     if var.changed:
-                        var.commit()
+                        var._commit()
             except:
                 for var in self.vars:
                     if var.changed:
-                        var.abort()
+                        var._abort()
                 raise
         finally:
             self.in_transaction = False
@@ -242,7 +242,7 @@
             self.bind(y,x)
 
     def _unify_var_val(self, x, y):
-        if x.is_bound(): raise UnificationFailure(x, y)
+        if x._is_bound(): raise UnificationFailure(x, y)
         if x != y:
             self.bind(x, y)
         
@@ -330,7 +330,7 @@
     return isinstance(v1, Var) and isinstance(v2, Var)
     
 def _both_are_bound(v1, v2):
-    return v1.is_bound() and v2.is_bound()
+    return v1._is_bound() and v2._is_bound()
 
 #--
 #-- the global store
@@ -349,10 +349,9 @@
     return _store.unify(var1, var2)
 
 def bound():
-    return _store.bound.keys()
+    return map(_store.vars,
+               lambda v: v.is_bound())
 
 def unbound():
-    res = []
-    for cluster in _store.unbound:
-        res.append('='.join([str(var) for var in cluster]))
-    return res
+    return map(_store.vars,
+               lambda v: not v.is_bound())

Modified: pypy/dist/pypy/lib/logic/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/variable.py	(original)
+++ pypy/dist/pypy/lib/logic/variable.py	Fri Jan 20 17:17:25 2006
@@ -1,3 +1,7 @@
+# First cut at representing Oz dataflow variable
+
+import threading
+
 #----------- Exceptions ---------------------------------
 class VariableException(Exception):
     def __init__(self, name):
@@ -10,7 +14,12 @@
 #----------- Variables ----------------------------------
 class EqSet(set):
     """An equivalence set for variables"""
-    pass
+
+##     def __str__(self):
+##         if len(self) == 0:
+##             return ''
+##         for var in self:
+##             '='.join(var.name)
 
 class NoValue:
     pass
@@ -26,28 +35,39 @@
         # of our initial value (for abort cases)
         self.previous = None
         self.changed = False
+        # a condition variable for concurrent access
+        self.mutex = threading.Lock()
+        self.value_condition = threading.Condition(self.mutex)
 
-    def is_bound(self):
-        return not isinstance(self.val, EqSet) \
-               and self.val != NoValue
+    # for consumption by the global store
 
-    def commit(self):
+    def _is_bound(self):
+        return not isinstance(self._val, EqSet) \
+               and self._val != NoValue
+
+    # 'transaction' support
+
+    def _commit(self):
         self.changed = False
 
-    def abort(self):
+    def _abort(self):
         self.val = self.previous
         self.changed = False
 
-    def set_val(self, val):
+    # value accessors
+    def _set_val(self, val):
+        self.value_condition.acquire()
         if self.store.in_transaction:
             if not self.changed:
                 self.previous = self._val
                 self.changed = True
-                print "in transaction, %s <- %s" % (self.name, val)
         self._val = val
-    def get_val(self):
+        self.value_condition.notifyAll()
+        self.value_condition.release()
+        
+    def _get_val(self):
         return self._val
-    val = property(get_val, set_val)
+    val = property(_get_val, _set_val)
 
     def __str__(self):
         if self.is_bound():
@@ -64,9 +84,21 @@
     def __hash__(self):
         return self.name.__hash__()
 
-def var(name):
-    v = Var(name, _store)
-    _store.add_unbound(v)
-    return v
-
+    #---- Concurrent public ops --------------------------
 
+    def is_bound(self):
+        self.mutex.acquire()
+        res = self._is_bound()
+        self.mutex.release()
+        return res
+
+    # should be used by threads that want to block on
+    # unbound variables
+    def get(self):
+        try:
+            self.value_condition.acquire()
+            while not self._is_bound():
+                self.value_condition.wait()
+            return self.val
+        finally:
+            self.value_condition.release()



More information about the Pypy-commit mailing list