[pypy-svn] r33236 - pypy/dist/pypy/objspace/test
auc at codespeak.net
auc at codespeak.net
Thu Oct 12 19:24:40 CEST 2006
Author: auc
Date: Thu Oct 12 19:24:38 2006
New Revision: 33236
Modified:
pypy/dist/pypy/objspace/test/test_logicobjspace.py
Log:
some needed adjustement (most notably, removed explicit scheduling calls)
Modified: pypy/dist/pypy/objspace/test/test_logicobjspace.py
==============================================================================
--- pypy/dist/pypy/objspace/test/test_logicobjspace.py (original)
+++ pypy/dist/pypy/objspace/test/test_logicobjspace.py Thu Oct 12 19:24:38 2006
@@ -6,6 +6,11 @@
# we might be called from _test_logic_build
# if not, check your paths
+try:
+ is_interpreted()
+except:
+ def is_interpreted(): return True
+
class AppTest_Logic(object):
@@ -164,13 +169,22 @@
assert X == (1, (2, None))
def test_unify_list(self):
+ # this tests fails with compiled pypy-logic
+ # for no apparent reason
+ # seems like we block in the wait call
+ # from X == x (where at least one var
+ # looks free then)
X = newvar()
x = (newvar(), newvar())
unify(X, x)
unify(X[1], (newvar(), newvar()))
- assert is_bound(X)
- assert X == x
+ # passes
+ assert not is_free(X)
+ assert not is_free(x)
+ assert X is x
assert X[1] == x[1]
+ # fails
+ assert X == x
unify(X, (1, (2, None)))
assert X == (1, (2, None))
unify(X, (1, (2, None)))
@@ -253,7 +267,7 @@
X = newvar()
T = future(poop, X)
raises(FutureBindingError, unify, T, 42)
- bind(X, 42); schedule() # helps the gc
+ bind(X, 42)
X, Y, Z = newvar(), newvar(), newvar()
bind(Z, 42)
@@ -262,7 +276,7 @@
raises(FutureBindingError, unify, Z, T)
raises(FutureBindingError, unify, Y, T)
raises(FutureBindingError, unify, T, Y)
- bind(X, 42); schedule() # gc ...
+ bind(X, 42)
def test_one_future_exception(self):
class FooException(Exception): pass
@@ -314,7 +328,6 @@
raise FooException
else: # but returned
return Canary
- schedule()
return 42
B, C = newvar(), newvar()
@@ -364,7 +377,6 @@
future(binder, X)
assert X == 42
- schedule() # gc help
def test_eager_producer_consummer(self):
@@ -426,7 +438,7 @@
assert len(sched_info()['blocked_byneed']) == 1
reset_scheduler()
assert len(sched_info()['blocked_byneed']) == 0
- assert len(sched_info()['threads']) == 1
+ assert len(sched_info().values()[0]['threads']) == 1
def test_wait_two(self):
@@ -449,8 +461,7 @@
unify(Y, 42)
assert X == Y == 42
assert o == 2
- schedule() # give a chance to the second thread to exit
- assert len(sched_info()['threads']) == 1
+ assert len(sched_info().values()[0]['threads']) == 1
def test_fib(self):
def fib(X):
@@ -488,7 +499,7 @@
try:
wait(Failed)
except RebindingError, e:
- assert len(sched_info()['threads']) == 1
+ assert len(sched_info().values()[0]['threads']) == 1
return
assert False
@@ -631,7 +642,6 @@
wait(C); wait(S)
assert bound_part(C) == [1, 1, 0]
assert bound_part(S) == [0, 1, 1]
- schedule()
reset_scheduler() # free all the hanging threads
@@ -688,31 +698,12 @@
#raises(TypeError, domain_of, x)
bind(X, True)
+ return []
X = newvar()
newspace(in_space, X)
wait(X)
- def test_newspace_ask_wait(self):
-
- def quux(X):
- while 1:
- if is_bound(X):
- break
- schedule()
-
- def asker(cspace):
- cspace.ask()
-
- X = newvar()
- s = newspace(quux, X)
- stacklet(asker, s)
- unify(X, 42)
- assert len(sched_info()['asking']) == 1
- schedule() # allow quux exit
- schedule() # allow asker exit
- assert len(sched_info()['asking']) == 0
-
def test_ask_choose(self):
def chooser(X):
@@ -726,7 +717,6 @@
X = newvar()
s = newspace(chooser, X)
stacklet(asker, s)
- schedule()
wait(X)
assert X == 2
@@ -737,6 +727,7 @@
choice = choose(v)
assert choice == v
unify(X, 'done')
+ return []
def asker(cspace):
while 1:
@@ -745,21 +736,13 @@
if choices == 8: # success !
break
- # choices >= 1
v = range(2, 9)
X = newvar()
s = newspace(chooser, v, X)
stacklet(asker, s)
- schedule()
- assert len(sched_info()['asking']) == 1
- mainspace = sched_info()['space_accounting'].keys()[0]
- assert sched_info()['space_accounting'][mainspace] == 0
-
+ assert len(sched_info()[interp_id(s)]['asking']) == 1
assert X == 'done'
- schedule()
- #XXX
- #assert len(sched_info()['threads']) == 1
def test_tell_ask_choose_commit(self):
@@ -781,34 +764,49 @@
s = newspace(conference_scheduling)
Solution = newvar()
stacklet(solve, s, commit_to, Solution)
- if commit_to == 1:
- assert set(Solution) == set([('room A', 'day 1 AM'),
- ('room B', 'day 1 AM'),
- ('room B', 'day 2 PM'),
- ('room A', 'day 1 PM'),
- ('room A', 'day 2 AM'),
- ('room C', 'day 2 PM'),
- ('room C', 'day 2 AM'),
- ('room C', 'day 1 AM'),
- ('room C', 'day 1 PM'),
- ('room B', 'day 2 AM')])
- else:
- assert set(Solution) == set([('room B', 'day 1 PM'),
- ('room A', 'day 1 PM'),
- ('room B', 'day 2 AM'),
- ('room B', 'day 1 AM'),
- ('room A', 'day 2 PM'),
- ('room C', 'day 2 AM'),
- ('room C', 'day 2 PM'),
- ('room C', 'day 1 PM'),
- ('room C', 'day 1 AM'),
- ('room B', 'day 2 PM')])
+## if commit_to == 1:
+## assert set(Solution) == set([('room A', 'day 1 AM'),
+## ('room B', 'day 1 AM'),
+## ('room B', 'day 2 PM'),
+## ('room A', 'day 1 PM'),
+## ('room A', 'day 2 AM'),
+## ('room C', 'day 2 PM'),
+## ('room C', 'day 2 AM'),
+## ('room C', 'day 1 AM'),
+## ('room C', 'day 1 PM'),
+## ('room B', 'day 2 AM')])
+## else:
+## assert set(Solution) == set([('room B', 'day 1 PM'),
+## ('room A', 'day 1 PM'),
+## ('room B', 'day 2 AM'),
+## ('room B', 'day 1 AM'),
+## ('room A', 'day 2 PM'),
+## ('room C', 'day 2 AM'),
+## ('room C', 'day 2 PM'),
+## ('room C', 'day 1 PM'),
+## ('room C', 'day 1 AM'),
+## ('room B', 'day 2 PM')])
+
+ # well, depending on dict key linear order, we get different
+ # results
+ print Solution
+
+
+ def test_default_solver(self):
+ if is_interpreted():
+ skip("will loop infinitely (bug in space.clone())")
+ from problem import conference_scheduling
+ from constraint import solver
+
+ s = newspace(conference_scheduling)
+ sols = set()
+ for sol in solver.solve(s):
+ sols.add(tuple(sol))
+ print sol
+ assert len(sols) == 64
- #XXX who's still stuck there ?
- #assert len(sched_info()['threads']) == 1
- def test_recomputing_tell_ask_choose_commit(self):
- skip("interpreted clone support still missing")
+ def test_recomputing_solver(self):
if is_interpreted():
skip("interpreted clone support still missing")
from problem import conference_scheduling
More information about the Pypy-commit
mailing list