[pypy-svn] r22470 - pypy/dist/pypy/lib/logic
auc at codespeak.net
auc at codespeak.net
Fri Jan 20 17:17:28 CET 2006
Author: auc
Date: Fri Jan 20 17:17:25 2006
New Revision: 22470
Added:
pypy/dist/pypy/lib/logic/test_variable.py
Modified:
pypy/dist/pypy/lib/logic/unification.py
pypy/dist/pypy/lib/logic/variable.py
Log:
add blocking read primitive
Added: pypy/dist/pypy/lib/logic/test_variable.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/logic/test_variable.py Fri Jan 20 17:17:25 2006
@@ -0,0 +1,56 @@
+import unification as u
+from threading import Thread
+
+class Consumer(Thread):
+    """Thread that blocks until the variable handed to it is bound."""
+
+    def give_var(self, var):
+        """Remember the variable that run() will read."""
+        self.var = var
+
+    def run(self):
+        # Only the blocking behaviour of get() matters here; the value
+        # it returns is not used by this thread.
+        self.var.get()
+
+class NConsumer(Thread):
+    """Thread that blocks on each of a list of variables in turn."""
+
+    def give_vars(self, vars_):
+        """Remember the variables that run() will read."""
+        self.vars = vars_
+
+    def run(self):
+        # Plain loop: get() is called for its blocking side effect only,
+        # so there is no point in collecting the results in a list.
+        for var in self.vars:
+            var.get()
+
+class TestVariable:
+ # Exercises the blocking get() primitive of variables using consumer
+ # threads that read while the main thread binds.
+
+ def setup_method(self, meth):
+ # Replace the module-level store with a new one (presumably invoked
+ # by the test runner before each test method -- confirm).
+ u._store = u.Store()
+
+ def test_one_thread_reading_one_var(self):
+ # One consumer reads x; the main thread binds x to 42 and then
+ # waits for the consumer to finish.
+ cons = Consumer()
+ x = u.var('x')
+ cons.give_var(x)
+ cons.start()
+ u.bind(x, 42)
+ cons.join()
+ assert cons.var.val == 42
+
+ def test_many_threads_reading_one_var(self):
+ # Ten consumers all read the same variable x.
+ conss = [Consumer() for i in range(10)]
+ x = u.var('x')
+ for cons in conss:
+ cons.give_var(x)
+ cons.start()
+ u.bind(x, 42)
+ for cons in conss:
+ cons.join()
+ assert cons.var.val == 42
+
+ def test_many_thread_reading_many_var(self):
+ # Ten consumers each read all ten variables; every variable is
+ # bound to its own name, which is str(i) for the i-th variable.
+ conss = [NConsumer() for i in range(10)]
+ vars_ = [u.var(str(i)) for i in range(10)]
+ for cons in conss:
+ cons.give_vars(vars_)
+ cons.start()
+ for var in vars_:
+ u.bind(var, var.name)
+ for cons in conss:
+ cons.join()
+ for i in range(10):
+ assert vars_[i].val == str(i)
Modified: pypy/dist/pypy/lib/logic/unification.py
==============================================================================
--- pypy/dist/pypy/lib/logic/unification.py (original)
+++ pypy/dist/pypy/lib/logic/unification.py Fri Jan 20 17:17:25 2006
@@ -178,14 +178,14 @@
if _both_are_vars(var, val):
if _both_are_bound(var, val):
raise AlreadyBound(var.name)
- if var.is_bound(): # 2b. var is bound, not var
+ if var._is_bound(): # 2b. var is bound, not var
self.bind(val, var)
- elif val.is_bound(): # 2a.val is bound, not val
+ elif val._is_bound(): # 2a.val is bound, not val
self._bind(var.val, val.val)
else: # 1. both are unbound
self._merge(var.val, val.val)
else: # 3. val is really a value
- if var.is_bound():
+ if var._is_bound():
raise AlreadyBound(var.name)
self._bind(var.val, val)
@@ -213,11 +213,11 @@
self._really_unify(x, y)
for var in self.vars:
if var.changed:
- var.commit()
+ var._commit()
except:
for var in self.vars:
if var.changed:
- var.abort()
+ var._abort()
raise
finally:
self.in_transaction = False
@@ -242,7 +242,7 @@
self.bind(y,x)
def _unify_var_val(self, x, y):
- if x.is_bound(): raise UnificationFailure(x, y)
+ if x._is_bound(): raise UnificationFailure(x, y)
if x != y:
self.bind(x, y)
@@ -330,7 +330,7 @@
return isinstance(v1, Var) and isinstance(v2, Var)
def _both_are_bound(v1, v2):
+ # True only when both v1 and v2 report being bound.
- return v1.is_bound() and v2.is_bound()
+ return v1._is_bound() and v2._is_bound()
#--
#-- the global store
@@ -349,10 +349,9 @@
return _store.unify(var1, var2)
def bound():
- return _store.bound.keys()
+    """Return the list of variables in the global store that are bound."""
+    # map(_store.vars, <lambda>) had its arguments swapped and would in
+    # any case yield booleans; select the bound variables themselves.
+    return [v for v in _store.vars if v.is_bound()]
def unbound():
- res = []
- for cluster in _store.unbound:
- res.append('='.join([str(var) for var in cluster]))
- return res
+    """Return the list of variables in the global store that are not bound."""
+    # map(_store.vars, <lambda>) had its arguments swapped and would in
+    # any case yield booleans; select the unbound variables themselves.
+    return [v for v in _store.vars if not v.is_bound()]
Modified: pypy/dist/pypy/lib/logic/variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/variable.py (original)
+++ pypy/dist/pypy/lib/logic/variable.py Fri Jan 20 17:17:25 2006
@@ -1,3 +1,7 @@
+# First cut at representing Oz dataflow variable
+
+import threading
+
#----------- Exceptions ---------------------------------
class VariableException(Exception):
def __init__(self, name):
@@ -10,7 +14,12 @@
#----------- Variables ----------------------------------
class EqSet(set):
"""An equivalence set for variables"""
- pass
+
+## def __str__(self):
+## if len(self) == 0:
+## return ''
+## for var in self:
+## '='.join(var.name)
class NoValue:
pass
@@ -26,28 +35,39 @@
# of our initial value (for abort cases)
self.previous = None
self.changed = False
+ # a condition variable for concurrent access
+ self.mutex = threading.Lock()
+ self.value_condition = threading.Condition(self.mutex)
- def is_bound(self):
- return not isinstance(self.val, EqSet) \
- and self.val != NoValue
+ # for consumption by the global store
- def commit(self):
+ def _is_bound(self):
+ return not isinstance(self._val, EqSet) \
+ and self._val != NoValue
+
+ # 'transaction' support
+
+ def _commit(self):
self.changed = False
- def abort(self):
+ def _abort(self):
self.val = self.previous
self.changed = False
- def set_val(self, val):
+ # value accessors
+ def _set_val(self, val):
+ self.value_condition.acquire()
if self.store.in_transaction:
if not self.changed:
self.previous = self._val
self.changed = True
- print "in transaction, %s <- %s" % (self.name, val)
self._val = val
- def get_val(self):
+ self.value_condition.notifyAll()
+ self.value_condition.release()
+
+ def _get_val(self):
return self._val
- val = property(get_val, set_val)
+ val = property(_get_val, _set_val)
def __str__(self):
if self.is_bound():
@@ -64,9 +84,21 @@
def __hash__(self):
return self.name.__hash__()
-def var(name):
- v = Var(name, _store)
- _store.add_unbound(v)
- return v
-
+ #---- Concurrent public ops --------------------------
+    def is_bound(self):
+        """Return whether the variable is bound, checked while holding the mutex."""
+        self.mutex.acquire()
+        try:
+            return self._is_bound()
+        finally:
+            # Release even if _is_bound() raises, so that no other
+            # thread is left waiting on a lock that is never freed.
+            self.mutex.release()
+
+ # should be used by threads that want to block on
+ # unbound variables
+    def get(self):
+        """Block until the variable is bound, then return its value."""
+        # Acquire before entering the try block: if acquire() itself
+        # fails, the finally clause must not release a lock we do not hold.
+        self.value_condition.acquire()
+        try:
+            # Re-check after every wake-up; the value is set and waiters
+            # are notified by _set_val().
+            while not self._is_bound():
+                self.value_condition.wait()
+            return self.val
+        finally:
+            self.value_condition.release()
More information about the Pypy-commit
mailing list