[pypy-svn] r23367 - pypy/dist/pypy/lib/logic/computation_space
auc at codespeak.net
auc at codespeak.net
Wed Feb 15 17:50:29 CET 2006
Author: auc
Date: Wed Feb 15 17:50:26 2006
New Revision: 23367
Modified:
pypy/dist/pypy/lib/logic/computation_space/computationspace.py
pypy/dist/pypy/lib/logic/computation_space/distributor.py
pypy/dist/pypy/lib/logic/computation_space/problems.py
pypy/dist/pypy/lib/logic/computation_space/state.py
pypy/dist/pypy/lib/logic/computation_space/strategies.py
pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py
pypy/dist/pypy/lib/logic/computation_space/test_variable.py
Log:
* first working one-solution strategy, applied to a scheduling problem
* small fixes to comp. space
* variables are really dataflow (not merely single-assignment)
Modified: pypy/dist/pypy/lib/logic/computation_space/computationspace.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/computationspace.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/computationspace.py Wed Feb 15 17:50:26 2006
@@ -1,8 +1,9 @@
from threading import Thread, Condition, RLock, local
-from state import Succeeded, Distributable, Failed, Merged
+from state import Succeeded, Distributable, Failed, \
+ Merged, Distributing
-from variable import EqSet, Var, NoValue, \
+from variable import EqSet, Var, NoValue, Pair, \
VariableException, NotAVariable, AlreadyInStore
from constraint import FiniteDomain, ConsistencyFailure
from distributor import DefaultDistributor
@@ -16,6 +17,7 @@
def __eq__(self, other):
if other is None: return False
+ if not isinstance(other, Alternatives): return False
return self._nbalt == other._nbalt
def NoProblem():
@@ -58,14 +60,6 @@
#---- ComputationSpace -------------------------------
class ComputationSpace(object):
- """The Store consists of a set of k variables
- x1,...,xk that are partitioned as follows:
- * set of unbound variables that are equal
- (also called equivalence sets of variables).
- The variables in each set are equal to each
- other but not to any other variables.
- * variables bound to a number, record or procedure
- (also called determined variables)."""
# we have to enforce only one distributor
# thread running in one space at the same time
@@ -75,6 +69,7 @@
# consistency-preserving stuff
self.in_transaction = False
self.bind_lock = RLock()
+ self.var_lock = RLock()
self.status = None
self.status_condition = Condition()
self.distributor = DefaultDistributor(self)
@@ -93,6 +88,7 @@
# set up the problem
self.bind(self.root, problem(self))
# check satisfiability of the space
+ self._init_choose_commit()
self._process()
if self.status == Distributable:
self.distributor.start()
@@ -102,24 +98,144 @@
self.var_const_map = parent.var_const_map
self.constraints = parent.constraints
self.root = parent.root
+ self._init_choose_commit()
+ def _init_choose_commit(self):
# create a unique choice point
- self.CHOICE = self._make_choice_var()
+ # using two vars as channels between
+ # space and distributor threads
+ self.CHOOSE = self._make_choice_var()
+ self.STABLE = self._make_stable_var()
+ # we start stable
+ self.STABLE.bind(0)
- def __del__(self):
- self.status = Failed
- self.bind(self.CHOICE, 0)
+#-- Computation Space -----------------------------------------
+
+ def _make_choice_var(self):
+ ComputationSpace._nb_choices += 1
+ return self.var('__choice__'+str(self._nb_choices))
+
+ def _make_stable_var(self):
+ ComputationSpace._nb_choices += 1
+ return self.var('__stable__'+str(self._nb_choices))
+
+ def _process(self):
+ """auxiliary of the distributor
+ XXX: shouldn't only the distributor call it ?
+ """
+ try:
+ self.satisfy_all()
+ except ConsistencyFailure:
+ self.status = Failed
+ else:
+ if self._distributable():
+ self.status = Distributable
+ else:
+ self.status = Succeeded
+
+ def _suspended(self):
+ raise NotImplemented
+ # additional basic constraints done in an ancestor can
+ # make it runnable ; it is a temporary condition due
+ # to concurrency ; it means that some ancestor space
+ # has not yet transferred all required information to
+ # the space
+
+
+ def _distributable(self):
+ if self.status not in (Failed, Succeeded, Merged):
+ # sync. barrier with distributor
+ for var in self.vars:
+ if var.cs_get_dom(self).size() > 1 :
+ return True
+ return False
+ # in The Book : "the space has one thread that is
+ # suspended on a choice point with two or more alternatives.
+ # A space can have at most one choice point; attempting to
+ # create another gives an error."
+
+ def ask(self):
+ print "SPACE Ask() checks stability ..."
+ self.STABLE.get() # that's real stability
+ print "SPACE is stable, resuming Ask()"
+ status = self.status in (Failed, Succeeded, Merged)
+ if status: return self.status
+ if self._distributable():
+ return Alternatives(self.distributor.nb_subdomains())
+ # should be unreachable
+ raise NotImplemented
+
+ def clone(self):
+ #XXX: lazy copy of domains would be nice
+ spc = ComputationSpace(NoProblem, parent=self)
+ for var in spc.vars:
+ var.cs_set_dom(spc, var.cs_get_dom(self).copy())
+ spc.distributor.set_space(spc)
+ if spc.status == Distributable:
+ spc.distributor.start()
+ return spc
+
+ def inject(self, restricting_problem):
+ """add additional entities into a space"""
+ restricting_problem(self)
+ self._process()
+
+ def commit(self, choice):
+ """if self is distributable, causes the Choose call in the
+ space to complete and return some_number as a result. This
+ may cause the space to resume execution.
+ some_number must satisfy 1=<I=<N where N is the first arg
+ of the Choose call.
+ """
+ print "SPACE commited to", choice
+ # block future calls to Ask until the distributor
+ # binds STABLE
+ self.STABLE = self._make_stable_var()
+ print "SPACE binds CHOOSE to", choice
+ self.bind(self.CHOOSE, choice)
+
+
+ def choose(self, nb_choices):
+ """
+ waits for stability
+ blocks until commit provides a value
+ between 0 and nb_choices
+ at most one choose running in a given space
+ at a given time
+ ----
+ this is used by the distributor thread
+ """
+ choice = self.CHOOSE.get()
+ return choice
+
+ def merge(self):
+ """binds root vars to their singleton domains """
+ assert self.status == Succeeded
+ for var in self.root.val:
+ var.bind(var.cs_get_dom(self).get_values()[0])
+ self.status = Merged
+ # shut down the distributor
+ self.distributor.cs = None
+ self.CHOOSE.bind(0)
+ return self.root.val
+
+ def set_distributor(self, dist):
+ self.distributor = dist
-#-- Store ------------------------------------------------
+#-- Constraint Store ---------------------------------------
#-- Variables ----------------------------
def var(self, name):
"""creates a single assignment variable of name name
and puts it into the store"""
- v = Var(name, self)
- self.add_unbound(v)
- return v
+ self.var_lock.acquire()
+ try:
+ v = Var(name, self)
+ self.add_unbound(v)
+ return v
+ finally:
+ self.var_lock.release()
def add_unbound(self, var):
"""add unbound variable to the store"""
@@ -148,9 +264,10 @@
def find_vars(self, *names):
"""looks up many variables"""
try:
- return [self.names[name] for name in names]
+ return [self.names[name]
+ for name in names]
except KeyError:
- raise NotInStore(name)
+ raise NotInStore(str(names))
def is_bound(self, var):
"""check wether a var is locally bound"""
@@ -234,6 +351,7 @@
return narrowed_domains
def satisfy(self, constraint):
+ """narrows the domains down to satisfiability"""
assert constraint in self.constraints
varset = set()
constset = set()
@@ -308,14 +426,20 @@
2. (unbound)Variable/(bound)Variable or
3. (unbound)Variable/Value binding
"""
+ # just introduced complete dataflow behaviour,
+ # where binding several times to compatible
+ # values is allowed provided no information is
+ # removed (this last condition remains to be checked)
+ self.bind_lock.acquire()
try:
- self.bind_lock.acquire()
assert(isinstance(var, Var) and (var in self.vars))
if var == val:
return
if _both_are_vars(var, val):
if _both_are_bound(var, val):
- raise AlreadyBound(var.name)
+ if _unifiable(var, val):
+ return # XXX check correctness
+ raise UnificationFailure(var, val)
if var._is_bound(): # 2b. var is bound, not var
self.bind(val, var)
elif val._is_bound(): # 2a.var is bound, not val
@@ -324,7 +448,9 @@
self._alias(var, val)
else: # 3. val is really a value
if var._is_bound():
- raise AlreadyBound(var.name)
+ if _unifiable(var.val, val):
+ return # XXX check correctness
+ raise UnificationFailure(var, val)
self._bind(var.val, val)
finally:
self.bind_lock.release()
@@ -398,11 +524,8 @@
self.bind(y,x)
def _unify_var_val(self, x, y):
- if x.val != y:
- try:
- self.bind(x, y)
- except AlreadyBound:
- raise UnificationFailure(x, y)
+ if x.val != y: # what else ?
+ self.bind(x, y)
def _unify_bound(self, x, y):
# print "unify bound %s %s" % (x, y)
@@ -416,7 +539,7 @@
raise UnificationFailure(x, y)
def _unify_iterable(self, x, y):
- print "unify sequences %s %s" % (x, y)
+ #print "unify sequences %s %s" % (x, y)
vx, vy = (x.val, y.val)
idx, top = (0, len(vx))
while (idx < top):
@@ -429,115 +552,6 @@
for xk in vx.keys():
self._really_unify(vx[xk], vy[xk])
-#-- Computation Space -----------------------------------------
-
- def _make_choice_var(self):
- ComputationSpace._nb_choices += 1
- return self.var('__choice__'+str(self._nb_choices))
-
- def _process(self):
- try:
- self.satisfy_all()
- except ConsistencyFailure:
- self.status = Failed
- else:
- if self._distributable():
- self.status = Distributable
- else:
- self.status = Succeeded
-
- def _stable(self):
- #XXX: really ?
- return self.status in (Failed, Succeeded, Merged) \
- or self._distributable()
-
- def _suspended(self):
- raise NotImplemented
- # additional basic constraints done in an ancestor can
- # make it runnable ; it is a temporary condition due
- # to concurrency ; it means that some ancestor space
- # has not yet transferred all required information to
- # the space
-
-
- def _distributable(self):
- if self.status not in (Failed, Succeeded, Merged):
- return self._distributable_domains()
- return False
- # in The Book : "the space has one thread that is
- # suspended on a choice point with two or more alternatives.
- # A space canhave at most one choice point; attempting to
- # create another gives an error."
-
- def _distributable_domains(self):
- for var in self.vars:
- if var.cs_get_dom(self).size() > 1 :
- return True
- return False
-
- def set_distributor(self, dist):
- self.distributor = dist
-
- def ask(self):
- # XXX: block on status being not stable for threads
- # use a df var instead of explicit waiting
- # XXX: truly df vars needed (not one-shot bindings)
- try:
- self.status_condition.acquire()
- while not self._stable():
- self.status_condition.wait()
- if self._distributable():
- return Alternatives(self.distributor.nb_subdomains())
- return self.status
- finally:
- self.status_condition.release()
-
- def clone(self):
- #XXX: lazy copy of domains would be nice
- spc = ComputationSpace(NoProblem, parent=self)
- for var in spc.vars:
- var.cs_set_dom(spc, var.cs_get_dom(self).copy())
- spc.distributor.set_space(spc)
- if spc.status == Distributable:
- spc.distributor.start()
- return spc
-
- def inject(self, restricting_problem):
- """add additional entities into a space"""
- restricting_problem(self)
- self._process()
-
- def commit(self, choice):
- """if self is distributable, causes the Choose call in the
- space to complete and return some_number as a result. This
- may cause the spzce to resume execution.
- some_number must satisfy 1=<I=<N where N is the first arg
- of the Choose call.
- """
- self.bind(self.CHOICE, choice)
-
- def choose(self, nb_choices):
- """
- waits for stability
- blocks until commit provides a value
- between 0 and nb_choices
- at most one choose running in a given space
- at a given time
- ----
- this is used by the distributor thread
- """
- self.ask()
- choice = self.CHOICE.get()
- self.CHOICE = self._make_choice_var()
- return choice
-
- def merge(self):
- assert self.status == Succeeded
- # the following ought to be atomic (?)
- for var in self.root.val:
- var.val = var.cs_get_dom(self).get_values()[0]
- self.status = Merged
-
#-- Unifiability checks---------------------------------------
#--
#-- quite costly & could be merged back in unify
@@ -593,11 +607,10 @@
return _iterable_unifiable([e[1] for e in v1],
[e[1] for e in v2])
-#-- Some utilities -----------------------------------------------
+#-- Some utilities -------------------------------------------
def _both_are_vars(v1, v2):
return isinstance(v1, Var) and isinstance(v2, Var)
def _both_are_bound(v1, v2):
return v1._is_bound() and v2._is_bound()
-
Modified: pypy/dist/pypy/lib/logic/computation_space/distributor.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/distributor.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/distributor.py Wed Feb 15 17:50:26 2006
@@ -1,6 +1,7 @@
import math, random
from threading import Thread
-from state import Succeeded, Distributable, Failed, Merged
+from state import Succeeded, Distributable, \
+ Distributing, Failed, Merged
def arrange_domains(cs, variables):
"""build a data structure from var to dom
@@ -124,19 +125,32 @@
def nb_subdomains(self):
"""See AbstractDistributor"""
- self.__to_split = self.findSmallestDomain()
+ try:
+ self.cs.var_lock.acquire()
+ self.__to_split = self.findSmallestDomain()
+ finally:
+ self.cs.var_lock.release()
if self.nb_subspaces:
- return min(self.nb_subspaces, self.__to_split.cs_get_dom(self.cs).size()) #domains[self.__to_split].size())
+ return min(self.nb_subspaces,
+ self.__to_split.cs_get_dom(self.cs).size())
else:
- return self.__to_split.cs_get_dom(self.cs).size() # domains[self.__to_split].size()
+ return self.__to_split.cs_get_dom(self.cs).size()
### new threaded distributor
def run(self):
while self.cs.status == Distributable:
+ print "DISTRIBUTOR sleeps on choose()"
choice = self.cs.choose(self.nb_subdomains())
+ # racey ...
+ print "DISTRIBUTOR choice =", choice
self.new_distribute(choice)
self.cs._process()
+ print "DISTRIBUTOR builds new CHOOSE"
+ self.cs.CHOOSE = self.cs._make_choice_var()
+ print "DISTRIBUTOR asserts space stability"
+ self.cs.STABLE.bind(0) # unlocks Ask
+
def new_distribute(self, choice):
"""See AbstractDistributor"""
@@ -152,7 +166,7 @@
variable.cs_get_dom(self.cs).remove_values(values[end:])
- ### current tests rely on this old
+ ### some tests rely on this old
### do_everything-at-once version
def _distribute(self, *args):
Modified: pypy/dist/pypy/lib/logic/computation_space/problems.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/problems.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/problems.py Wed Feb 15 17:50:26 2006
@@ -2,6 +2,10 @@
import constraint as c
import distributor as di
+def dummy_problem(computation_space):
+ ret = computation_space.var('__dummy__')
+ ret.dom = c.FiniteDomain([1, 2])
+ return (ret)
def satisfiable_problem(computation_space):
cs = computation_space
@@ -46,7 +50,49 @@
cs.set_distributor(di.DichotomyDistributor(cs))
return (x, w, y)
-def dummy_problem(computation_space):
- ret = computation_space.var('__dummy__')
- ret.dom = c.FiniteDomain([1, 2])
- return (ret)
+def conference_scheduling(computation_space):
+ cs = computation_space
+
+ variables = [cs.var(v)
+ for v in ('c01','c02','c03','c04','c05',
+ 'c06','c07','c08','c09','c10')]
+
+ dom_values = [(room,slot)
+ for room in ('room A','room B','room C')
+ for slot in ('day 1 AM','day 1 PM','day 2 AM',
+ 'day 2 PM')]
+ for v in variables:
+ v.cs_set_dom(cs, c.FiniteDomain(dom_values))
+
+ for conf in ('c03','c04','c05','c06'):
+ v = cs.get_var_by_name(conf)
+ cs.add_constraint(c.Expression(cs, [v], "%s[0] == 'room C'" % v.name))
+
+ for conf in ('c01','c05','c10'):
+ v = cs.get_var_by_name(conf)
+ cs.add_constraint(c.Expression(cs, [v], "%s[1].startswith('day 1')" % v.name))
+
+ for conf in ('c02','c03','c04','c09'):
+ v = cs.get_var_by_name(conf)
+ cs.add_constraint(c.Expression(cs, [v], "%s[1].startswith('day 2')" % v.name))
+
+ groups = (('c01','c02','c03','c10'),
+ ('c02','c06','c08','c09'),
+ ('c03','c05','c06','c07'),
+ ('c01','c03','c07','c08'))
+
+ for g in groups:
+ for conf1 in g:
+ for conf2 in g:
+ v1, v2 = cs.find_vars(conf1, conf2)
+ if conf2 > conf1:
+ cs.add_constraint(c.Expression(cs, [v1,v2],
+ '%s[1] != %s[1]'%\
+ (v1.name,v2.name)))
+
+ for conf1 in variables:
+ for conf2 in variables:
+ if conf2 > conf1:
+ cs.add_constraint(c.Expression(cs, [conf1,conf2],
+ '%s != %s'%(conf1.name,conf2.name)))
+ return tuple(variables)
Modified: pypy/dist/pypy/lib/logic/computation_space/state.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/state.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/state.py Wed Feb 15 17:50:26 2006
@@ -8,6 +8,9 @@
class Distributable:
pass
+class Distributing:
+ pass
+
class Failed(Exception):
pass
Modified: pypy/dist/pypy/lib/logic/computation_space/strategies.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/strategies.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/strategies.py Wed Feb 15 17:50:26 2006
@@ -13,7 +13,7 @@
if status == csp.Failed:
return None
elif status == csp.Succeeded:
- return [space]
+ return space
elif status == csp.Alternatives(2):
new_space = space.clone()
space.commit(1)
@@ -28,9 +28,8 @@
space = csp.ComputationSpace(problem)
- outcome = do_dfs(space)
- if outcome == None: return None
- space.merge()
- return space
+ solved_space = do_dfs(space)
+ if solved_space == None: return None
+ return solved_space.merge()
Modified: pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_computationspace.py Wed Feb 15 17:50:26 2006
@@ -19,34 +19,42 @@
def run(self):
self.fun(self, *self.args)
+def newspace(problem=problems.dummy_problem):
+ return space.ComputationSpace(problem)
+
#-- meat ------------------------
class TestStoreUnification:
def test_already_in_store(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x = sp.var('x')
raises(v.AlreadyInStore, sp.var, 'x')
def test_already_bound(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x = sp.var('x')
sp.bind(x, 42)
- raises(space.AlreadyBound, sp.bind, x, 42)
+ sp.bind(x, 42)
+ raises(space.UnificationFailure, sp.bind, x, 43)
def test_bind_var_var(self):
- sp = space.ComputationSpace(problems.dummy_problem)
- x = sp.var('x')
- y = sp.var('y')
- z = sp.var('z')
+ sp = newspace()
+ x, y, z = sp.var('x'), sp.var('y'), sp.var('z')
sp.bind(x, z)
assert x.val == space.EqSet([x, z])
assert y.val == space.EqSet([y])
assert z.val == space.EqSet([x, z])
+ z.bind(42)
+ assert z.val == 42
+ assert x.val == 42
+ y.bind(42)
+ assert y.val == 42
+ y.bind(z)
def test_bind_var_val(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x, y, z = sp.var('x'), sp.var('y'), sp.var('z')
sp.bind(x, z)
sp.bind(y, 42)
@@ -56,7 +64,7 @@
assert z.val == 3.14
def test_unify_same(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
sp.bind(x, [42, z])
@@ -67,7 +75,7 @@
assert z.val == 42
def test_double_unification(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x, y, z = (sp.var('x'), sp.var('y'),
sp.var('z'))
sp.bind(x, 42)
@@ -79,7 +87,7 @@
def test_unify_values(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x, y = sp.var('x'), sp.var('y')
sp.bind(x, [1, 2, 3])
sp.bind(y, [1, 2, 3])
@@ -88,7 +96,7 @@
assert y.val == [1, 2, 3]
def test_unify_lists_success(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
sp.bind(x, [42, z])
@@ -100,7 +108,7 @@
assert w.val == 42
def test_unify_dicts_success(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
sp.bind(x, {1:42, 2:z})
@@ -112,7 +120,7 @@
assert w.val == 42
def test_unify_failure(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
sp.bind(x, [42, z])
sp.bind(y, [z, 44])
@@ -123,7 +131,7 @@
assert z.val == space.EqSet([z])
def test_unify_failure2(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
sp.bind(x, [42, z])
@@ -139,7 +147,7 @@
assert w.val == space.EqSet([z,w])
def test_unify_circular(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x, y, z, w, a, b = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'),
sp.var('a'), sp.var('b'))
@@ -162,12 +170,12 @@
def test_threads_creating_vars(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
def create_var(thread, *args):
x = sp.var('x')
def create_var2(thread, *args):
- raises(v.AlreadyExists, sp.var, 'x')
+ raises(v.AlreadyInStore, sp.var, 'x')
t1, t2 = (FunThread(create_var),
FunThread(create_var2))
@@ -178,7 +186,7 @@
def test_threads_binding_vars(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
def do_stuff(thread, var, val):
thread.raised = False
@@ -189,7 +197,7 @@
except Exception, e:
print e
thread.raised = True
- assert isinstance(e, space.AlreadyBound)
+ assert isinstance(e, space.UnificationFailure)
x = sp.var('x')
vars_ = []
@@ -217,7 +225,7 @@
def test_threads_waiting_for_unbound_var(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
import time
def near(v1, v2, err):
@@ -245,13 +253,13 @@
def test_set_var_domain(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x = sp.var('x')
sp.set_domain(x, [1, 3, 5])
assert x.cs_get_dom(sp) == c.FiniteDomain([1, 3, 5])
def test_bind_with_domain(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x = sp.var('x')
sp.set_domain(x, [1, 2, 3])
raises(space.OutOfDomain, sp.bind, x, 42)
@@ -259,7 +267,7 @@
assert x.val == 3
def test_bind_with_incompatible_domains(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x, y = sp.var('x'), sp.var('y')
sp.set_domain(x, [1, 2])
sp.set_domain(y, [3, 4])
@@ -271,7 +279,7 @@
def test_unify_with_domains(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
sp.bind(x, [42, z])
sp.bind(y, [z, 42])
@@ -283,7 +291,7 @@
assert z.cs_get_dom(sp) == c.FiniteDomain([41, 42, 43])
def test_add_constraint(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
raises(c.DomainlessVariables,
c.Expression, sp, [x, y, z], 'x == y + z')
@@ -295,7 +303,7 @@
assert k in sp.constraints
def test_narrowing_domains_failure(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
x.cs_set_dom(sp, c.FiniteDomain([1, 2]))
y.cs_set_dom(sp, c.FiniteDomain([2, 3]))
@@ -304,7 +312,7 @@
raises(c.ConsistencyFailure, k.narrow)
def test_narrowing_domains_success(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
y.cs_set_dom(sp, c.FiniteDomain([2, 3]))
@@ -316,7 +324,7 @@
assert z.cs_get_dom(sp) == c.FiniteDomain([3])
def test_compute_dependant_vars(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
@@ -334,7 +342,7 @@
assert constset == set([k1, k2])
def test_store_satisfiable_success(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
y.cs_set_dom(sp, c.FiniteDomain([2, 3]))
@@ -347,7 +355,7 @@
assert z.cs_get_dom(sp) == c.FiniteDomain([3, 4])
def test_store_satisfiable_failure(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z = sp.var('x'), sp.var('y'), sp.var('z')
x.cs_set_dom(sp, c.FiniteDomain([1, 2]))
y.cs_set_dom(sp, c.FiniteDomain([2, 3]))
@@ -360,7 +368,7 @@
assert z.cs_get_dom(sp) == c.FiniteDomain([3, 4])
def test_satisfiable_many_const_success(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
@@ -394,7 +402,7 @@
def test_satisfiable_many_const_failure(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
@@ -421,7 +429,7 @@
assert narrowed_doms == {}
def test_satisfy_many_const_failure(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
@@ -444,7 +452,7 @@
assert w.cs_get_dom(sp) == c.FiniteDomain([1])
def test_satisfy_many_const_success(self):
- sp = space.ComputationSpace(problems.dummy_problem)
+ sp = newspace()
x,y,z,w = (sp.var('x'), sp.var('y'),
sp.var('z'), sp.var('w'))
x.cs_set_dom(sp, c.FiniteDomain([1, 2, 5]))
@@ -462,6 +470,9 @@
assert z.cs_get_dom(sp) == c.FiniteDomain([3])
assert w.cs_get_dom(sp) == c.FiniteDomain([4, 5])
+#-- computation spaces -------------------------------
+
+import strategies
class TestComputationSpace:
@@ -469,30 +480,30 @@
pass
def test_bind_cs_root(self):
- spc = space.ComputationSpace(problems.satisfiable_problem)
+ spc = newspace(problems.satisfiable_problem)
assert '__root__' in spc.names
assert set(['x', 'y', 'w']) == \
set([var.name for var in spc.root.val])
def test_ask_success(self):
- spc = space.ComputationSpace(problems.one_solution_problem)
+ spc = newspace(problems.one_solution_problem)
assert spc.ask() == space.Succeeded
def test_double_ask(self):
- spc = space.ComputationSpace(problems.one_solution_problem)
+ spc = newspace(problems.one_solution_problem)
assert spc.ask() == space.Succeeded
assert spc.ask() == space.Succeeded
def test_ask_failure(self):
- spc = space.ComputationSpace(problems.unsatisfiable_problem)
+ spc = newspace(problems.unsatisfiable_problem)
assert spc.ask() == space.Failed
def test_ask_alternatives(self):
- spc = space.ComputationSpace(problems.satisfiable_problem)
+ spc = newspace(problems.satisfiable_problem)
assert spc.ask() == space.Alternatives(2)
def test_old_distribute(self):
- spc = space.ComputationSpace(problems.satisfiable_problem)
+ spc = newspace(problems.satisfiable_problem)
new_domains = [tuple(d.items()) for d in
spc.distributor.distribute()]
x, y, z, w = (spc.get_var_by_name('x'),
@@ -516,12 +527,13 @@
assert e1 == e2
def test_clone_and_distribute(self):
- spc = space.ComputationSpace(problems.satisfiable_problem)
+ spc = newspace(problems.satisfiable_problem)
w = spc.get_var_by_name('w')
assert spc.ask() == space.Alternatives(2)
new_spc = spc.clone()
# following couple of ops superceeded by inject()
- new_spc.add_constraint(c.Expression(new_spc, [w], 'w == 5'))
+ new_spc.add_constraint(c.Expression(new_spc, [w],
+ 'w == 5'))
new_spc._process()
assert spc.ask() == space.Alternatives(2)
assert new_spc.ask() == space.Succeeded
@@ -530,8 +542,9 @@
def test_inject(self):
def more_constraints(space):
- space.add_constraint(c.Expression(space, [w], 'w == 5'))
- spc = space.ComputationSpace(problems.satisfiable_problem)
+ space.add_constraint(c.Expression(space, [w],
+ 'w == 5'))
+ spc = newspace(problems.satisfiable_problem)
w = spc.get_var_by_name('w')
assert spc.ask() == space.Alternatives(2)
new_spc = spc.clone()
@@ -542,17 +555,17 @@
assert w.cs_get_dom(new_spc) == c.FiniteDomain([5])
def test_merge(self):
- spc = space.ComputationSpace(problems.satisfiable_problem)
+ spc = newspace(problems.satisfiable_problem)
x, y, z, w = spc.find_vars('x', 'y', 'z', 'w')
assert spc.TOP
- assert spc.ask() == space.Alternatives(2)
assert spc.dom(x) == c.FiniteDomain([6])
assert spc.dom(y) == c.FiniteDomain([2])
assert spc.dom(z) == c.FiniteDomain([4])
assert spc.dom(w) == c.FiniteDomain([5, 6, 7])
def more_constraints(space):
- space.add_constraint(c.Expression(space, [w], 'w == 5'))
+ space.add_constraint(c.Expression(space, [w],
+ 'w == 5'))
nspc = spc.clone()
nspc.inject(more_constraints)
@@ -569,3 +582,18 @@
assert y.val == 2
assert w.val == 5
assert (x, w, y) == nspc.root.val
+
+ def test_scheduling_problem_dfs_one_solution(self):
+ sol = strategies.dfs_one_solution(problems.conference_scheduling)
+
+ sol2 = [var.val for var in sol]
+ assert sol2 == [('room A', 'day 1 AM'),
+ ('room A', 'day 2 PM'),
+ ('room C', 'day 2 AM'),
+ ('room C', 'day 2 PM'),
+ ('room C', 'day 1 PM'),
+ ('room C', 'day 1 AM'),
+ ('room B', 'day 2 PM'),
+ ('room B', 'day 1 PM'),
+ ('room B', 'day 2 AM'),
+ ('room A', 'day 1 PM')]
Modified: pypy/dist/pypy/lib/logic/computation_space/test_variable.py
==============================================================================
--- pypy/dist/pypy/lib/logic/computation_space/test_variable.py (original)
+++ pypy/dist/pypy/lib/logic/computation_space/test_variable.py Wed Feb 15 17:50:26 2006
@@ -356,7 +356,7 @@
Ys = sp.var('Ys')
generator = FunThread(dgenerate, 0, Xs)
- bbuffer = FunThread(bounded_buffer, 4, Xs, Ys)
+ bbuffer = FunThread(bounded_buffer, 8, Xs, Ys)
summer = FunThread(dsum, Ys, 0, 50)
generator.start()
More information about the Pypy-commit
mailing list