[pypy-svn] r42232 - in pypy/dist/pypy/lang/prolog: builtin interpreter interpreter/test

cfbolz at codespeak.net cfbolz at codespeak.net
Sat Apr 21 15:52:57 CEST 2007


Author: cfbolz
Date: Sat Apr 21 15:52:56 2007
New Revision: 42232

Modified:
   pypy/dist/pypy/lang/prolog/builtin/database.py
   pypy/dist/pypy/lang/prolog/interpreter/engine.py
   pypy/dist/pypy/lang/prolog/interpreter/test/test_builtin.py
   pypy/dist/pypy/lang/prolog/interpreter/test/test_parsing.py
Log:
make asserting of new facts more efficient and support the "logical update
view" for retract.


Modified: pypy/dist/pypy/lang/prolog/builtin/database.py
==============================================================================
--- pypy/dist/pypy/lang/prolog/builtin/database.py	(original)
+++ pypy/dist/pypy/lang/prolog/builtin/database.py	Sat Apr 21 15:52:56 2007
@@ -14,8 +14,8 @@
     if signature in builtins:
         error.throw_permission_error("modify", "static_procedure",
                                      predicate)
-    if signature in engine.signature2rules:
-        del engine.signature2rules[signature]
+    if signature in engine.signature2function:
+        del engine.signature2function[signature]
 expose_builtin(impl_abolish, "abolish", unwrap_spec=["obj"])
 
 def impl_assert(engine, rule):
@@ -39,9 +39,13 @@
         assert isinstance(head, term.Callable)
         error.throw_permission_error("modify", "static_procedure", 
                                      head.get_prolog_signature())
-    rules = engine.signature2rules.get(head.signature, [])
-    for i in range(len(rules)):
-        rule = rules[i]
+    function = engine.signature2function.get(head.signature, None)
+    if function is None:
+        raise UnificationFailed
+    #import pdb; pdb.set_trace()
+    rulechain = function.rulechain
+    while rulechain:
+        rule = rulechain.rule
         oldstate = engine.frame.branch()
         # standardizing apart
         try:
@@ -51,8 +55,15 @@
         except error.UnificationFailed:
             engine.frame.revert(oldstate)
         else:
-            del rules[i]
+            if function.rulechain is rulechain:
+                if rulechain.next is None:
+                    del engine.signature2function[head.signature]
+                else:
+                    function.rulechain = rulechain.next
+            else:
+                function.remove(rulechain)
             break
+        rulechain = rulechain.next
     else:
         raise error.UnificationFailed()
 expose_builtin(impl_retract, "retract", unwrap_spec=["callable"])

Modified: pypy/dist/pypy/lang/prolog/interpreter/engine.py
==============================================================================
--- pypy/dist/pypy/lang/prolog/interpreter/engine.py	(original)
+++ pypy/dist/pypy/lang/prolog/interpreter/engine.py	Sat Apr 21 15:52:56 2007
@@ -90,10 +90,61 @@
         self.extend(1)
         return result
 
+class LinkedRules(object):
+    def __init__(self, rule, next=None):
+        self.rule = rule
+        self.next = next
+
+    def copy(self, stopat=None):
+        first = LinkedRules(self.rule)
+        curr = self.next
+        copy = first
+        while curr is not stopat:
+            new = LinkedRules(curr.rule)
+            copy.next = new
+            copy = new
+            curr = curr.next
+        return first, copy
+
+    def find_applicable_rule(self, query, uh1):
+        #import pdb;pdb.set_trace()
+        while self:
+            uh2 = self.rule.unify_hash
+            assert len(uh1) == len(uh2)
+            for j in range(len(uh1)):
+                if uh1[j] != 0 and uh2[j] != 0 and uh1[j] != uh2[j]:
+                    break
+            else:
+                return self
+            self = self.next
+        return None
+
+    def __repr__(self):
+        return "LinkedRules(%r, %r)" % (self.rule, self.next)
+
+
+class Function(object):
+    def __init__(self, firstrule):
+        self.rulechain = LinkedRules(firstrule)
+        self.last = self.rulechain
+
+    def add_rule(self, rule, end):
+        #import pdb; pdb.set_trace()
+        if end:
+            self.rulechain, last = self.rulechain.copy()
+            self.last = LinkedRules(rule)
+            last.next = self.last
+        else:
+            self.rulechain = LinkedRules(rule, self.rulechain)
+
+    def remove(self, rulechain):
+        self.rulechain, last = self.rulechain.copy(rulechain)
+        last.next = rulechain.next
+
 class Engine(object):
     def __init__(self):
         self.frame = Frame()
-        self.signature2rules = {}
+        self.signature2function = {}
         self.parser = None
         self.operations = None
     
@@ -116,13 +167,11 @@
         if signature in builtin.builtins:
             error.throw_permission_error(
                 "modify", "static_procedure", rule.head.get_prolog_signature())
-        # it's important to not update the list in place, because
-        # there might be references to it in the stack somewhere
-        rules = self.signature2rules.get(signature, [])
-        if end:
-            self.signature2rules[signature] = rules + [rule]
+        function = self.signature2function.get(signature, None)
+        if function is not None:
+            self.signature2function[signature].add_rule(rule, end)
         else:
-            self.signature2rules[signature] = [rule] + rules
+            self.signature2function[signature] = Function(rule)
 
     def run(self, query, continuation=DONOTHING):
         if not isinstance(query, Callable):
@@ -164,19 +213,21 @@
     def user_call(self, query, continuation):
         #import pdb; pdb.set_trace()
         signature = query.signature
-        rules = self.signature2rules.get(signature, None)
-        if rules is None:
+        function = self.signature2function.get(signature, None)
+        if function is None:
             error.throw_existence_error(
                 "procedure", query.get_prolog_signature())
         unify_hash = query.get_deeper_unify_hash(self.frame)
-        rule, i = self.find_applicable_rule(0, rules, query, unify_hash)
-        if rule is None:
+        rulechain = function.rulechain.find_applicable_rule(query, unify_hash)
+        if rulechain is None:
             # none of the rules apply
             raise UnificationFailed()
+        rule = rulechain.rule
+        rulechain = rulechain.next
         oldstate = self.frame.branch()
-        while 1:
-            next, i = self.find_applicable_rule(i, rules, query, unify_hash)
-            if next is None:
+        while rulechain:
+            rulechain = rulechain.find_applicable_rule(query, unify_hash)
+            if rulechain is None:
                 self.frame.discard(oldstate)
                 break
             if rule.contains_cut:
@@ -199,7 +250,8 @@
                     return result
                 except UnificationFailed:
                     self.frame.revert(oldstate)
-            rule = next
+            rule = rulechain.rule
+            rulechain = rulechain.next
         if rule.contains_cut:
             continuation = LimitedScopeContinuation(continuation)
             try:
@@ -226,18 +278,6 @@
             return self.call(nextcall, continuation)
         return continuation.call(self)
 
-    def find_applicable_rule(self, startindex, rules, query, uh1):
-        i = startindex
-        while i < len(rules):
-            uh2 = rules[i].unify_hash
-            assert len(uh1) == len(uh2)
-            for j in range(len(uh1)):
-                if uh1[j] != 0 and uh2[j] != 0 and uh1[j] != uh2[j]:
-                    break
-            else:
-                return rules[i], i + 1
-            i += 1
-        return None, 0
 
     def continue_after_cut(self, continuation, lsc=None):
         while 1:

Modified: pypy/dist/pypy/lang/prolog/interpreter/test/test_builtin.py
==============================================================================
--- pypy/dist/pypy/lang/prolog/interpreter/test/test_builtin.py	(original)
+++ pypy/dist/pypy/lang/prolog/interpreter/test/test_builtin.py	Sat Apr 21 15:52:56 2007
@@ -120,6 +120,25 @@
     """)
     frames = collect_all(e, "g(X).")
     assert len(frames) == 3
+    e = get_engine("""
+        p :- assertz(p), fail.
+        p :- fail.
+    """)
+    assert_false("p.", e)
+    e = get_engine("""
+        q :- fail.
+        q :- assertz(q), fail.
+    """)
+    assert_false("q.", e)
+
+
+def test_retract_logical_update_view():
+    e = get_engine("""
+        p :- retract(p :- true), fail.
+        p :- true.
+    """)
+    assert_true("p.", e)
+    assert_false("p.", e)
 
 def test_abolish():
     e = get_engine("g(b, b). g(c, c). g(a). f(b, b). h(b, b).")

Modified: pypy/dist/pypy/lang/prolog/interpreter/test/test_parsing.py
==============================================================================
--- pypy/dist/pypy/lang/prolog/interpreter/test/test_parsing.py	(original)
+++ pypy/dist/pypy/lang/prolog/interpreter/test/test_parsing.py	Sat Apr 21 15:52:56 2007
@@ -30,7 +30,7 @@
     for fact in facts:
         print fact
         e.add_rule(fact)
-    assert e.signature2rules["add_numeral/3"][0].head.args[1].name == "null"
+    assert e.signature2function["add_numeral/3"].rulechain.rule.head.args[1].name == "null"
     four = Term("succ", [Term("succ", [Term("succ",
                 [Term("succ", [Atom("null")])])])])
     e.run(parse_query_term("numeral(succ(succ(null)))."))



More information about the Pypy-commit mailing list