[pypy-svn] r56949 - pypy/branch/garden-call-code/pypy/module/_stackless/test

pedronis at codespeak.net pedronis at codespeak.net
Sun Aug 3 22:48:34 CEST 2008


Author: pedronis
Date: Sun Aug  3 22:48:32 2008
New Revision: 56949

Added:
   pypy/branch/garden-call-code/pypy/module/_stackless/test/test_frame_chain_reconstruction.py   (contents, props changed)
Log:
tests about frame chain reconstruction that can run on CPython+greenlet, check the frame chain by faking rstack.resume_state_create

skipped tests showing bad inderection of CALL_METHOD and coroutine pickling



Added: pypy/branch/garden-call-code/pypy/module/_stackless/test/test_frame_chain_reconstruction.py
==============================================================================
--- (empty file)
+++ pypy/branch/garden-call-code/pypy/module/_stackless/test/test_frame_chain_reconstruction.py	Sun Aug  3 22:48:32 2008
@@ -0,0 +1,264 @@
+from pypy.conftest import gettestobjspace
+from py.test import skip
+
+class FrameCheck(object):
+
+    def __init__(self, name):
+        self.name = name
+
+    def __eq__(self, frame):
+        return frame.pycode.co_name == self.name
+
+class BytecodeCheck(object):
+
+    def __init__(self, code, op, arg):
+        self.code = code
+        self.op = chr(op)+chr(arg & 0xff) + chr(arg >> 8 & 0xff)
+
+    def __eq__(self, pos):
+        return self.code[pos-3:pos] == self.op
+
+class BaseTestReconstructFrameChain(object):
+    OPTIONS = {}
+
+    def setup_class(cls):
+        space = gettestobjspace(usemodules=('_stackless',), **cls.OPTIONS)
+        cls.space = space
+
+        from pypy.rlib import rstack
+        cls.old_resume_state_create = rstack.resume_state_create
+
+        def tr(prevstate, label, *args):
+            if prevstate is None:
+                prevstate = []
+            return prevstate+[(label, args)]
+        rstack.resume_state_create = tr
+
+        w_opmap = space.appexec([], """():
+        import opcode
+
+        return opcode.opmap
+        """)
+
+        opmap = space.unwrap(w_opmap)
+        cls.CALL_FUNCTION = opmap['CALL_FUNCTION']
+        cls.CALL_FUNCTION_VAR = opmap['CALL_FUNCTION_VAR']        
+        
+    def teardown_class(cls):
+        from pypy.rlib import rstack
+        rstack.resume_state_create = cls.old_resume_state_create
+
+    def start(self, w_coro):
+        self.i = 0
+        self.frame_to_check = w_coro.frame
+        w_coro.frame = None # avoid exploding in kill > __del__
+
+    def end(self):
+        assert self.i == len(self.frame_to_check)
+
+    def check_entry(self, label, *args):
+        frame = self.frame_to_check
+        assert frame[self.i] == (label, args)
+        self.i += 1
+
+        
+    def test_two_frames_simple(self):
+        space = self.space
+
+        w_res = space.appexec([], """():
+        import _stackless as stackless
+        import pickle
+
+        main = stackless.coroutine.getcurrent()
+        d = {'main': main}        
+
+        exec \"\"\"
+def f():
+    g(1)
+
+def g(x):
+    main.switch()
+\"\"\" in d
+        f = d['f']
+        g = d['g']
+
+        co = stackless.coroutine()
+        co.bind(f)
+        co.switch()
+
+        s = pickle.dumps(co)
+        co = pickle.loads(s)
+
+        return co, f, g
+        """)
+
+        w_co, w_f, w_g = space.unpacktuple(w_res)
+
+        ec = space.getexecutioncontext()
+        fcode = w_f.code.co_code
+        gcode = w_g.code.co_code        
+
+        self.start(w_co)
+        e = self.check_entry
+        e('yield_current_frame_to_caller_1')
+        e('coroutine__bind', w_co.costate)
+        e('appthunk', w_co.costate)
+        # f
+        e('execute_frame', FrameCheck('f'), ec)
+        e('dispatch', FrameCheck('f'), fcode, ec)
+        e('handle_bytecode', FrameCheck('f'), fcode, ec)
+        e('dispatch_call', FrameCheck('f'), fcode,
+          BytecodeCheck(fcode, self.CALL_FUNCTION, 1), ec)
+        e('CALL_FUNCTION', FrameCheck('f'), 1)
+        # g
+        e('execute_frame', FrameCheck('g'), ec)
+        e('dispatch', FrameCheck('g'), gcode, ec)
+        e('handle_bytecode', FrameCheck('g'), gcode, ec)
+        e('dispatch_call', FrameCheck('g'), gcode,
+          BytecodeCheck(gcode, self.CALL_FUNCTION, 0), ec)
+        e('CALL_FUNCTION', FrameCheck('g'), 0)
+        e('w_switch', w_co.costate, space)
+        e('coroutine_switch', w_co.costate)
+        self.end()
+
+    def test_two_frames_stararg(self):
+        space = self.space
+
+        w_res = space.appexec([], """():
+        import _stackless as stackless
+        import pickle
+        
+        main = stackless.coroutine.getcurrent()
+        d = {'main': main}        
+
+        exec \"\"\"        
+def f():
+    g(4, 3, d=2, *(1,))
+
+def g(a, b, c, d):
+    main.switch()
+\"\"\" in d
+        f = d['f']
+        g = d['g']    
+
+        co = stackless.coroutine()
+        co.bind(f)
+        co.switch()
+
+        s = pickle.dumps(co)
+        co = pickle.loads(s)
+
+        return co, f, g
+        """)
+
+        w_co, w_f, w_g = space.unpacktuple(w_res)
+
+        ec = space.getexecutioncontext()
+        fcode = w_f.code.co_code
+        gcode = w_g.code.co_code        
+
+        self.start(w_co)
+        e = self.check_entry
+        e('yield_current_frame_to_caller_1')
+        e('coroutine__bind', w_co.costate)
+        e('appthunk', w_co.costate)
+        # f
+        e('execute_frame', FrameCheck('f'), ec)
+        e('dispatch', FrameCheck('f'), fcode, ec)
+        e('handle_bytecode', FrameCheck('f'), fcode, ec)
+        e('dispatch_call', FrameCheck('f'), fcode,
+          BytecodeCheck(fcode, self.CALL_FUNCTION_VAR, 2+(1<<8)), ec)
+        e('call_function', FrameCheck('f'))
+        # g
+        e('execute_frame', FrameCheck('g'), ec)
+        e('dispatch', FrameCheck('g'), gcode, ec)
+        e('handle_bytecode', FrameCheck('g'), gcode, ec)
+        e('dispatch_call', FrameCheck('g'), gcode,
+          BytecodeCheck(gcode, self.CALL_FUNCTION, 0), ec)
+        e('CALL_FUNCTION', FrameCheck('g'), 0)
+        e('w_switch', w_co.costate, space)
+        e('coroutine_switch', w_co.costate)
+        self.end()        
+    
+    def test_two_frames_method(self):
+        space = self.space
+
+        w_res = space.appexec([], """():
+        import _stackless as stackless
+        import pickle
+        import new, sys
+
+        mod = new.module('mod')
+        sys.modules['mod'] = mod
+        
+        main = stackless.coroutine.getcurrent()
+        d = {'main': main}        
+
+        exec \"\"\"                
+def f():
+    a = A()
+    a.m(1)
+
+def g(_, x):
+    main.switch()
+
+class A(object):
+    m = g
+\"\"\" in d
+        f = d['f']
+        g = d['g']
+        A = d['A']
+
+        # to make pickling work
+        mod.A = A
+        A.__module__ = 'mod'
+
+        co = stackless.coroutine()
+        co.bind(f)
+        co.switch()
+
+        s = pickle.dumps(co)
+        co = pickle.loads(s)
+
+        return co, f, g
+        """)
+
+        w_co, w_f, w_g = space.unpacktuple(w_res)
+
+        ec = space.getexecutioncontext()
+        fcode = w_f.code.co_code
+        gcode = w_g.code.co_code        
+
+        self.start(w_co)
+        e = self.check_entry
+        e('yield_current_frame_to_caller_1')
+        e('coroutine__bind', w_co.costate)
+        e('appthunk', w_co.costate)
+        # f
+        e('execute_frame', FrameCheck('f'), ec)
+        e('dispatch', FrameCheck('f'), fcode, ec)
+        e('handle_bytecode', FrameCheck('f'), fcode, ec)
+        e('dispatch_call', FrameCheck('f'), fcode,
+          BytecodeCheck(fcode, self.CALL_FUNCTION, 1), ec)
+        e('CALL_FUNCTION', FrameCheck('f'), 1)
+        # g
+        e('execute_frame', FrameCheck('g'), ec)
+        e('dispatch', FrameCheck('g'), gcode, ec)
+        e('handle_bytecode', FrameCheck('g'), gcode, ec)
+        e('dispatch_call', FrameCheck('g'), gcode,
+          BytecodeCheck(gcode, self.CALL_FUNCTION, 0), ec)
+        e('CALL_FUNCTION', FrameCheck('g'), 0)
+        e('w_switch', w_co.costate, space)
+        e('coroutine_switch', w_co.costate)
+        self.end()
+
+class TestReconstructFrameChain(BaseTestReconstructFrameChain):
+    pass
+
+class TestReconstructFrameChain_CALL_METHOD(BaseTestReconstructFrameChain):
+    OPTIONS = {"objspace.opcodes.CALL_METHOD": True}
+
+    def setup_class(cls):
+        skip("this needs special casing in Function reduce for BuiltinCodes like in Method as first thing")
+
+                



More information about the Pypy-commit mailing list