[pypy-svn] r40589 - in pypy/dist/pypy: interpreter interpreter/astcompiler lib lib/app_test module/recparser/hooksamples
afayolle at codespeak.net
afayolle at codespeak.net
Fri Mar 16 17:39:43 CET 2007
Author: afayolle
Date: Fri Mar 16 17:39:40 2007
New Revision: 40589
Modified:
pypy/dist/pypy/interpreter/astcompiler/ast.py
pypy/dist/pypy/interpreter/astcompiler/astgen.py
pypy/dist/pypy/interpreter/pycompiler.py
pypy/dist/pypy/lib/aop.py
pypy/dist/pypy/lib/app_test/sample_aop_code.py
pypy/dist/pypy/lib/app_test/test_aop.py
pypy/dist/pypy/module/recparser/hooksamples/constchanger.py
pypy/dist/pypy/module/recparser/hooksamples/tracer.py
Log:
Changed compile_hook prototype to provide a filename
Updated the hooksamples to the new API, and let the use the ASTMutator base clasChanged point cut definition syntax to allow specifying a module and a classname (classname filtering not implemented yet, module filetering is available)
Use logging in aop.py to ease debugging
Heavy refactoring of aop.py
Modified: pypy/dist/pypy/interpreter/astcompiler/ast.py
==============================================================================
--- pypy/dist/pypy/interpreter/astcompiler/ast.py (original)
+++ pypy/dist/pypy/interpreter/astcompiler/ast.py Fri Mar 16 17:39:40 2007
@@ -86,6 +86,7 @@
Node.typedef = TypeDef('ASTNode',
__new__ = interp2app(descr_Node_new, unwrap_spec=[ObjSpace, W_Root, int]),
#__repr__ = interp2app(Node.descr_repr, unwrap_spec=['self', ObjSpace] ),
+ __repr__ = interp2app(Node.descr_repr, unwrap_spec=['self', ObjSpace] ),
getChildNodes = interp2app(Node.descr_getChildNodes, unwrap_spec=[ 'self', ObjSpace ] ),
accept = interp2app(descr_node_accept, unwrap_spec=[ ObjSpace, W_Root, W_Root ] ),
mutate = interp2app(descr_node_mutate, unwrap_spec=[ ObjSpace, W_Root, W_Root ] ),
Modified: pypy/dist/pypy/interpreter/astcompiler/astgen.py
==============================================================================
--- pypy/dist/pypy/interpreter/astcompiler/astgen.py (original)
+++ pypy/dist/pypy/interpreter/astcompiler/astgen.py Fri Mar 16 17:39:40 2007
@@ -734,6 +734,7 @@
Node.typedef = TypeDef('ASTNode',
__new__ = interp2app(descr_Node_new, unwrap_spec=[ObjSpace, W_Root, int]),
#__repr__ = interp2app(Node.descr_repr, unwrap_spec=['self', ObjSpace] ),
+ __repr__ = interp2app(Node.descr_repr, unwrap_spec=['self', ObjSpace] ),
getChildNodes = interp2app(Node.descr_getChildNodes, unwrap_spec=[ 'self', ObjSpace ] ),
accept = interp2app(descr_node_accept, unwrap_spec=[ ObjSpace, W_Root, W_Root ] ),
mutate = interp2app(descr_node_mutate, unwrap_spec=[ ObjSpace, W_Root, W_Root ] ),
Modified: pypy/dist/pypy/interpreter/pycompiler.py
==============================================================================
--- pypy/dist/pypy/interpreter/pycompiler.py (original)
+++ pypy/dist/pypy/interpreter/pycompiler.py Fri Mar 16 17:39:40 2007
@@ -235,7 +235,8 @@
try:
w_ast_tree = space.call_function(self.w_compile_hook,
space.wrap(ast_tree),
- space.wrap(encoding))
+ space.wrap(encoding),
+ space.wrap(filename))
ast_tree = space.interp_w(Node, w_ast_tree)
except OperationError:
self.w_compile_hook = space.w_None
Modified: pypy/dist/pypy/lib/aop.py
==============================================================================
--- pypy/dist/pypy/lib/aop.py (original)
+++ pypy/dist/pypy/lib/aop.py Fri Mar 16 17:39:40 2007
@@ -7,68 +7,100 @@
# API
###########################
import parser
+import re
+import sys
+import os
+import os.path as osp
+from logging import error, debug, warning, info
+
+import logging
+logging.basicConfig(level=logging.DEBUG,
+ format='%(asctime)s %(levelname)-8s: %(message)s',
+ datefmt='%H:%M:%S')
+
+
+
+def log_exc(func):
+ """Logs entering the function at debug level.
+ Logs any exception during function execution at error level"""
+
+ def wrapped(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception, exc:
+ error('Unhandled exception in %s', func.func_name)
+ error('Exception %s: %s', exc.__class__.__name__, exc)#, exc_info=True)
+ raise
+ wrapped.__doc__ == func.__doc__
+ return wrapped
# advices
# -------
-class Advice(parser.ASTVisitor):
+class Advice(parser.ASTMutator):
requires_dynamic_pointcut=True
def __init__(self, pointcut):
if self.requires_dynamic_pointcut != pointcut.isdynamic:
raise TypeError('Expecting a static pointcut')
self.pointcut = pointcut
- dispatch = {ExecutionPointCut: self.weave_at_execution_pointcut,
- CallPointCut: self.weave_at_call_pointcut,
- InitializationPointCut: self.weave_at_initialization_pointcut,
- DestructionPointCut: self.weave_at_destruction_pointcut,
- PointCut: self.weave_at_static_pointcut,
+ dispatch = {ExecutionPointCut: self.weave_at_execution,
+ CallPointCut: self.weave_at_call,
+ InitializationPointCut: self.weave_at_initialization,
+ DestructionPointCut: self.weave_at_destruction,
+ PointCut: self.weave_at_static,
}
- self.weave_at_pointcut = dispatch[pointcut.__class__]
+ self.weave_at = dispatch[pointcut.__class__]
def __call__(self, function):
- print 'wrapping advice %s on %s' % (self.pointcut, function.__name__)
+ debug('wrapping advice %s on %s', self.pointcut, function.__name__)
self.woven_code = function
return self
- def weave(self, ast, enc):
- return ast.mutate(self)
+ def weave(self, ast, enc, modulename):
+ debug("modulename = %s", modulename)
+ self.curr_encoding = enc
+ if self.pointcut.match_module(modulename):
+ return ast.mutate(self)
+ else:
+ return ast
def default(self, node):
if self.pointcut.match(node):
- node = self.weave_at_pointcut(node,
+ node = self.weave_at(node,
self.pointcut.joinpoint(node))
return node
## def visitClass(self, node):
## if self.pointcut.match(node):
-## print "found match", node.name
+## print("found match", node.name)
## return node
- def weave_at_execution_pointcut(self, node, tjp):
+ def weave_at_execution(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_call_pointcut(self, node, tjp):
+ def weave_at_call(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_initialization_pointcut(self, node, tjp):
+ def weave_at_initialization(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_destruction_pointcut(self, node, tjp):
+ def weave_at_destruction(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_static_pointcut(self, node, tjp):
+ def weave_at_static(self, node, tjp):
raise NotImplementedError("abstract method")
class around(Advice):
"""specify code to be run instead of the pointcut"""
- def weave_at_execution_pointcut(self, node, tjp):
+ @log_exc
+ def weave_at_execution(self, node, tjp):
"""weaving around a function execution moves the body of the
function to an inner function called
__aoptarget_<funcname>_<id>, and generate the following code:
return __aop__(id, __aoptarget_<funcname>_<id>)
"""
- print"WEAVE around!!!"
+ debug("WEAVE around execution")
p = parser
id = __aop__.register_joinpoint(self.woven_code, tjp)
statement = node.code
@@ -89,32 +121,36 @@
node.code = newcode
return node
- def weave_at_call_pointcut(self, node, tjp):
+ def weave_at_call(self, node, tjp):
+ debug("WEAVE around execution")
raise NotImplementedError("abstract method")
- def weave_at_initialization_pointcut(self, node, tjp):
+ def weave_at_initialization(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_destruction_pointcut(self, node, tjp):
+ def weave_at_destruction(self, node, tjp):
raise NotImplementedError("abstract method")
class before(Advice):
"""specify code to be run before the pointcut"""
- def weave_at_execution_pointcut(self, node, tjp):
+ @log_exc
+ def weave_at_execution(self, node, tjp):
"""weaving before execution inserts a call to __aop__(id) at
the beginning of the wrapped function definition"""
- print "WEAVE before!!!"
+ debug("WEAVE before execution")
id = __aop__.register_joinpoint(self.woven_code, tjp)
statement_list = node.code.nodes
statement_list.insert(0, make_aop_call(id))
node.code.nodes = statement_list
return node
- def weave_at_call_pointcut(self, node, tjp):
+ @log_exc
+ def weave_at_call(self, node, tjp):
"""weaving before call replaces a call to foo(bar) with the
following code:
(lambda *args,**kwargs: (__aop__(id), foo(*args,**kwargs)))(bar)[1]
"""
+ debug("WEAVE before call")
id = __aop__.register_joinpoint(self.woven_code, tjp)
p = parser
lambda_ret = p.ASTTuple((make_aop_call(id).expr, # we don't want the ASTDiscard
@@ -135,70 +171,56 @@
newnode = p.ASTSubscript(call,
p.OP_APPLY,
p.ASTConst(1))
- print `newnode`
+ debug('%r', newnode)
return newnode
- def weave_at_initialization_pointcut(self, node, tjp):
+ def weave_at_initialization(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_destruction_pointcut(self, node, tjp):
+ def weave_at_destruction(self, node, tjp):
raise NotImplementedError("abstract method")
class after(Advice):
"""specify code to be run after the pointcut"""
- def weave_at_execution_pointcut(self, node, tjp):
+ @log_exc
+ def weave_at_execution(self, node, tjp):
"""weaving after execution wraps the code of the function in a
try...finally block, and calls __aop__(id) in the finally
block"""
- print "WEAVE after!!!"
+ debug("WEAVE after execution")
id = __aop__.register_joinpoint(self.woven_code, tjp)
statement = node.code
tryfinally = parser.ASTTryFinally(statement, make_aop_call(id))
node.code = tryfinally
return node
- def weave_at_call_pointcut(self, node, tjp):
+ @log_exc
+ def weave_at_call(self, node, tjp):
"""weaving before call replaces a call to foo(bar) with the
following code:
- (lambda *args,**kwargs: (foo(*args,**kwargs), __aop__(id)))(bar)[0]
+ __aop__(id, result=foo(bar))
"""
+ debug("WEAVE after call")
id = __aop__.register_joinpoint(self.woven_code, tjp)
p = parser
- lambda_ret = p.ASTTuple((p.ASTCallFunc(node.node,
- [],
- p.ASTName('args'),
- p.ASTName('kwargs')),
- make_aop_call(id).expr, # we don't want the ASTDiscard
- )
- )
- lambda_func = p.ASTLambda([p.ASTAssName('args', 0), p.ASTAssName('kwargs', 0)],
- [], # defaults
- p.CO_VARARGS | p.CO_VARKEYWORDS,
- lambda_ret
- )
- call = p.ASTCallFunc(lambda_func,
- node.args,
- node.star_args,
- node.dstar_args)
- newnode = p.ASTSubscript(call,
- p.OP_APPLY,
- p.ASTConst(0))
- print `newnode`
+ debug('old node: %s', node)
+ newnode = make_aop_call(id, resultcallfuncnode=node).expr # we don't want the ASTDiscard
+ debug('newnode: %s', newnode)
return newnode
- def weave_at_initialization_pointcut(self, node, tjp):
+ def weave_at_initialization(self, node, tjp):
raise NotImplementedError("abstract method")
- def weave_at_destruction_pointcut(self, node, tjp):
+ def weave_at_destruction(self, node, tjp):
raise NotImplementedError("abstract method")
class introduce(Advice):
"""insert new code in the pointcut
this is the only advice available on static point cuts"""
requires_dynamic_pointcut=False
- def weave_at_pointcut(self, node, tjp):
- print "WEAVE introduce!!!"
+ def weave_at_static(self, node, tjp):
+ debug("WEAVE introduce!!!")
pass # XXX WRITEME
return node
@@ -261,69 +283,110 @@
# maybe not managed as a real collection.
# API for use in declarative code
- def __init__(self, pointcut):
- """if pointcut is a string:
- * matches a substring without . or ,
- ** matches any substring
- pointcut looks like object_name_or_pattern[([argname_or_pattern[, ...]])]
- the pointcut is static
- else, pointcut must be a pointcut instance"""
- if type(pointcut) == str:
- self.pointcutdef = pointcut
+ def __init__(self, module=".*", klass=".*", func=".*", pointcut=None):
+ """If pointcut is not None, it is assumed to be a PointCut
+ instance and will be used to get the filters. Otherwise, the
+ module, klass and func arguments are used as regular
+ expressions which will be used against the modulename,
+ classname and function/method name for the pointcut
+
+ The created point cut is static.
+
+ The pointcut argument can also be a pointcut instance"""
+ info('%r %r %r %s', module, klass, func, pointcut)
+ if pointcut is None:
+ self.func_re = re.compile(func)
+ self.module_re = re.compile(module)
+ self.class_re = re.compile(klass)
elif isinstance(pointcut, PointCut):
- self.pointcutdef = pointcut.pointcutdef # XXX FIXME
+ self.func_re = pointcut.func_re
+ self.module_re = pointcut.module_re
+ self.class_re = pointcut.class_re
else:
raise TypeError(type(pointcut))
self.isdynamic = False
def __and__(self, other):
"""return: new pointcut, intersection of the join points in the self and other"""
+ if other.__class__ != self.__class__:
+ raise TypeError(other.__class__.__name__)
pass
def __or__(self, other):
"""return: new pointcut, union of the join points in the self and other"""
+ if other.__class__ != self.__class__:
+ raise TypeError(other.__class__.__name__)
pass
- def __not__(self):
- """return: new pointcut, exclusion of the join points in self"""
- pass
-
+
+## def __not__(self):
+## """return: new pointcut, exclusion of the join points in self"""
+## pass
+
+ # Dynamic pointcut creation
def call(self):
"""return a dynamic pointcut representing places where the pointcut is called"""
- return CallPointCut(self)
+ return CallPointCut(pointcut=self)
def execution(self):
"""return a dynamic pointcut representing places where the pointcut is executed"""
- return ExecutionPointCut(self)
+ return ExecutionPointCut(pointcut=self)
def initialization(self):
"""return a dynamic pointcut representing places where the pointcut is instantiated"""
- return InitializationPointCut(self)
+ assert self.func_re.pattern == '.*'
+ return InitializationPointCut(pointcut=self)
def destruction(self):
"""return a dynamic pointcut representing places where the pointcut is destroyed"""
- return DestructionPointCut(self)
-
+ assert self.func_re.pattern == '.*'
+ return DestructionPointCut(pointcut=self)
+
+ # API for use during the Weaving process
+ def match_module(self, modulename):
+ return self.module_re.match(modulename)
+
def match(self, astnode):
- raise NotImplementedError
-
+ "a static point cut only matches classes: the function part is not used"
+ assert self.func_re.pattern == '.*'
+ return self.class_re.match(astnode.name)
+
def joinpoint(self, node):
"""returns a join point instance for the node"""
- assert self.match(node)
+# assert self.match(node)
return JoinPoint()
class AbstractDynamicPointCut(PointCut):
def __init__(self, pointcut):
- PointCut.__init__(self, pointcut)
+ PointCut.__init__(self, pointcut=pointcut)
self.isdynamic = True
+ # call, execution, initialization and destruction are disallowed
+ # on dynamic pointcuts
+ def call(self):
+ raise TypeError(self.__class__.__name__)
+
+ def execution(self):
+ raise TypeError(self.__class__.__name__)
+
+ def initialization(self):
+ raise TypeError(self.__class__.__name__)
+
+ def destruction(self):
+ raise TypeError(self.__class__.__name__)
class ExecutionPointCut(AbstractDynamicPointCut):
+ """An execution point cut matches the execution of a function
+ matching func_re, defined within a class matching class_re written
+ in a module matching module_re"""
+
def match(self, astnode):
- return isinstance(astnode, parser.ASTFunction) and astnode.name == self.pointcutdef
+ # FIXME: class_re
+ return isinstance(astnode, parser.ASTFunction) and \
+ self.func_re.match(astnode.name)
def joinpoint(self, node):
"""returns a join point instance for the node"""
- assert self.match(node)
+# assert self.match(node)
jp = JoinPoint()
jp._flags = node.flags
jp._argnames = [a.name for a in node.argnames]
@@ -332,14 +395,33 @@
return jp
class CallPointCut(AbstractDynamicPointCut):
+ """A call point cut matches a call to a function or method
+ matching func_re, within a class matching class_re written in a
+ module matching module_re"""
+
def match(self, node):
- return isinstance(node, parser.ASTCallFunc) and isinstance(node.node, parser.ASTName) and node.node.varname == self.pointcutdef
+ # FIXME: class_re
+ return isinstance(node, parser.ASTCallFunc) and \
+ isinstance(node.node, parser.ASTName) and \
+ self.func_re.match(node.node.varname)
-class DestructionPointCut(AbstractDynamicPointCut):
- pass
-class InitializationPointCut(AbstractDynamicPointCut):
- pass
+### XXX: won't match anything if no __del__ method exists (or only on a parent class)
+class DestructionPointCut(ExecutionPointCut):
+ """A destruction pointcut matches the execution of a __del__
+ method in a class matching class_re in a module matching module_re"""
+ def __init__(self, pointcut):
+ ExecutionPointCut.__init__(self, pointcut=pointcut)
+ self.func_re='^__del__$'
+
+### XXX: won't match anything if no __init__ method exists (or only on a parent class)
+class InitializationPointCut(ExecutionPointCut):
+ """An initialization point cut matches the execution of the
+ __init__ method of a class matching class_re in a module matching
+ module_re"""
+ def __init__(self, pointcut):
+ ExecutionPointCut.__init__(self, pointcut=pointcut)
+ self.func_re='^__init__$'
### make these class methods of PointCut ?
@@ -363,6 +445,10 @@
def args(typepattern):
pass
+class _UndefinedResult:
+ """used to denote that the result of a call to a aspectised
+ function is not known"""
+ pass
class Weaver:
"""The weaver is responsible for weaving the Aspects in the code
@@ -377,13 +463,35 @@
def register_advice(self, aspect, advice):
self.advices.append((aspect, advice))
- def weave(self, ast, enc):
- for aspect, advice in self.advices:
- self._curr_aspect = aspect
- ast = advice.weave(ast, enc)
- self._curr_aspect = None
- return ast
+ def _guessmodule(self, filename):
+ for p in sys.path:
+ cp = osp.commonprefix([p, filename])
+ if osp.isdir(cp):
+ break
+ else:
+ cp = ''
+ guessed = osp.splitext(filename[len(cp):])[0].replace(os.sep, '.')
+ if guessed.startswith('.'):
+ guessed = guessed[1:]
+ if guessed.endswith('.__init__'):
+ guessed = guessed[:-9]
+ return guessed
+
+ def weave(self, ast, enc, filename):
+ if not self.advices:
+ return ast
+ try:
+ info('Weaving on %s %s', filename, sys.path)
+ modulename = self._guessmodule(filename)
+ for aspect, advice in self.advices:
+ self._curr_aspect = aspect
+ ast = advice.weave(ast, enc, modulename)
+ self._curr_aspect = None
+ return ast
+ except Exception, exc:
+ error('%s: %s in weave', exc.__class__.__name__, exc)
+ return ast
def _clear_all(self):
self.advices = []
self.joinpoints = {}
@@ -394,20 +502,24 @@
finally:
self._id += 1
- def register_joinpoint(self, woven_code, joinpoint, *args):
+ def register_joinpoint(self, woven_code, joinpoint, *args): # FIXME: do we need *args ?
assert self._curr_aspect is not None
id = self._next_id()
- print "register joinpoint with id %d" % id
+ info("register joinpoint with id %d", id)
arguments = self._curr_aspect, joinpoint, args
self.joinpoints[id] = woven_code, arguments
return id
- def __call__(self, id, target=None, target_locals = None):
+ def __call__(self, id, target=None, target_locals = None, result=_UndefinedResult):
+ info('call to __aop__(%d)', id)
+ debug('arguments: id=%d, target=%s, target_locals=%s, result=%s', id, target, target_locals, result)
woven_code, (aspect, joinpoint, arguments) = self.joinpoints[id]
joinpoint.func = target
- print 'target_locals', target_locals
+ debug('target_locals = %s', target_locals)
if target_locals is not None:
joinpoint._arguments = (), dict([(n, target_locals[n]) for n in joinpoint._argnames or ()])
+ if result is not _UndefinedResult:
+ joinpoint._result = result
args = (aspect, joinpoint,) + arguments
return woven_code(*args)
@@ -428,13 +540,13 @@
instance = super(Aspect, cls).__call__(*args, **kwargs)
for name, advice in cls.__dict__.iteritems():
if isinstance(advice, Advice):
- print "registering advice %s.%s" % (instance.__class__.__name__, name)
+ info("registering advice %s.%s", instance.__class__.__name__, name)
__aop__.register_advice(instance, advice)
return instance
# helper functions
-def make_aop_call(id, targetname=None, discard=True):
+def make_aop_call(id, targetname=None, discard=True, resultcallfuncnode=None):
"""return an AST for a call to a woven function
id is the integer returned when the advice was stored in the registry"""
p = parser
@@ -443,10 +555,14 @@
arguments.append(p.ASTName(targetname))
else:
arguments.append(p.ASTName('None'))
+
arguments.append(p.ASTCallFunc(p.ASTName('locals'),
[], None, None)
)
+ if resultcallfuncnode is not None:
+ arguments.append(resultcallfuncnode)
+
if discard:
returnclass = p.ASTDiscard
else:
Modified: pypy/dist/pypy/lib/app_test/sample_aop_code.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/sample_aop_code.py (original)
+++ pypy/dist/pypy/lib/app_test/sample_aop_code.py Fri Mar 16 17:39:40 2007
@@ -15,7 +15,15 @@
try:
return foo(b,c)
finally:
- bar()
+ bar(3)
+
+class Mumble:
+ def __init__(self, param):
+ self.p = param
+ def frobble(self):
+ return 3 * self.p
+ def __del__(self):
+ print 'poof'
"""
import os
import os.path as osp
@@ -32,3 +40,4 @@
def clean_module(name):
os.unlink(_make_filename(name))
+
Modified: pypy/dist/pypy/lib/app_test/test_aop.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/test_aop.py (original)
+++ pypy/dist/pypy/lib/app_test/test_aop.py Fri Mar 16 17:39:40 2007
@@ -1,20 +1,18 @@
from pypy.conftest import gettestobjspace
-class AppTestAop(object):
- def setup_class(cls):
- cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
-
+class AppTestAOPGeneral(object):
def test_init(self):
import aop
+class AppTestPointCut(object):
def test_static_dynamic_advice_and_pointcut(self):
from aop import PointCut, introduce, before, around, after
- dyn_pc = PointCut('foo').call()
+ dyn_pc = PointCut(func='foo').call()
assert dyn_pc.isdynamic
- stat_pc = PointCut('bar')
+ stat_pc = PointCut(func='bar')
assert not stat_pc.isdynamic
assert not introduce.requires_dynamic_pointcut
@@ -29,6 +27,44 @@
assert adv is not None
+ def test_pointcut_func(self):
+ from aop import PointCut
+ pc = PointCut(func='foobar')
+ expected = {'func_re': 'foobar',
+ 'class_re': '.*',
+ 'module_re':'.*'}
+ for key, value in expected.items():
+ assert getattr(pc, key).pattern == expected[key]
+
+ def test_pointcut_class_func(self):
+ from aop import PointCut
+ pc = PointCut(klass='^Mumble.*$', func='foobar')
+ expected = {'func_re': 'foobar',
+ 'class_re': '^Mumble.*$',
+ 'module_re':'.*'}
+ for key, value in expected.items():
+ assert getattr(pc, key).pattern == expected[key]
+
+ def test_pointcut_module_class_func(self):
+ from aop import PointCut
+ pc = PointCut(module=r'logilab\..*', klass='^Mumble.*$', func='foobar')
+ expected = {'func_re': 'foobar',
+ 'class_re': '^Mumble.*$',
+ 'module_re':r'logilab\..*'}
+ for key, value in expected.items():
+ assert getattr(pc, key).pattern == expected[key]
+
+ def test_pointcut_match_module(self):
+ from aop import PointCut
+ pc = PointCut(module=r'logilab\..+', klass='^Mumble.*$', func='foobar')
+ assert pc.match_module('logilab.common')
+ assert not pc.match_module('logilab')
+ assert not pc.match_module('common.logilab')
+
+class AppTestWeavingAtExecution(object):
+ def setup_class(cls):
+ cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
+
def test_simple_aspect_before_execution(self):
from aop import PointCut, Aspect, before
from app_test import sample_aop_code
@@ -39,7 +75,7 @@
__metaclass__ = Aspect
def __init__(self):
self.executed = False
- @before(PointCut('foo').execution())
+ @before(PointCut(func='foo').execution())
def advice_before_execution(self, tjp):
self.executed = True
self.argnames = tjp._argnames
@@ -68,7 +104,7 @@
__metaclass__ = Aspect
def __init__(self):
self.executed = 0
- @after(PointCut('foo').execution())
+ @after(PointCut(func='foo').execution())
def advice_after_execution(self, tjp):
self.executed += 1
@@ -93,7 +129,7 @@
def __init__(self):
self.executed_before = 0
self.executed_after = 0
- @around(PointCut('foo').execution())
+ @around(PointCut(func='foo').execution())
def advice_around_execution(self, tjp):
print '>>>in', tjp.arguments()
self.executed_before += 1
@@ -116,6 +152,10 @@
sample_aop_code.clean_module('aop_around_execution')
+class AppTestWeavingAtCall(object):
+ def setup_class(cls):
+ cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
+
def test_simple_aspect_before_call(self):
from aop import PointCut, Aspect, before
from app_test import sample_aop_code
@@ -126,7 +166,7 @@
__metaclass__ = Aspect
def __init__(self):
self.executed = False
- @before(PointCut('bar').call())
+ @before(PointCut(func='bar').call())
def advice_before_call(self, tjp):
print "IN advice before call"
self.executed = True
@@ -155,12 +195,16 @@
__metaclass__ = Aspect
def __init__(self):
self.executed = False
- @after(PointCut('bar').call())
+ self.result = None
+ @after(PointCut(func='bar').call())
def advice_after_call(self, tjp):
print "IN advice after call"
self.executed = True
self.arguments = tjp._arguments
+ self.result = tjp.result()
print "OUT advice after call"
+ print "result", self.result
+ return self.result
assert __aop__.advices == []
aspect = AspectTest()
@@ -168,9 +212,11 @@
assert not aspect.executed
from app_test import aop_after_call
- assert aspect.executed == 0
+ assert not aspect.executed
answ = aop_after_call.foo(1,2)
- assert aspect.executed == 1
+ assert aspect.executed
assert answ == 47
+ assert aspect.result == 42
sample_aop_code.clean_module('aop_after_call')
+
Modified: pypy/dist/pypy/module/recparser/hooksamples/constchanger.py
==============================================================================
--- pypy/dist/pypy/module/recparser/hooksamples/constchanger.py (original)
+++ pypy/dist/pypy/module/recparser/hooksamples/constchanger.py Fri Mar 16 17:39:40 2007
@@ -5,7 +5,7 @@
node.value = 2
return node
-def threebecomestwo(ast, enc):
+def threebecomestwo(ast, enc, filename):
ast.mutate(ConstMutator())
return ast
Modified: pypy/dist/pypy/module/recparser/hooksamples/tracer.py
==============================================================================
--- pypy/dist/pypy/module/recparser/hooksamples/tracer.py (original)
+++ pypy/dist/pypy/module/recparser/hooksamples/tracer.py Fri Mar 16 17:39:40 2007
@@ -5,7 +5,7 @@
XXX: crashes on everything else than simple assignment (AssAttr, etc.)
"""
-from parser import ASTPrintnl, ASTConst, ASTName, ASTAssign
+from parser import ASTPrintnl, ASTConst, ASTName, ASTAssign, ASTMutator
from parser import install_compiler_hook, source2ast
BEFORE_LOG_SOURCE = """if '%s' in locals() or '%s' in globals():
@@ -17,18 +17,7 @@
module = source2ast(source)
return module.node.nodes
-class Tracer:
- def visitModule(self, module):
- module.node = module.node.accept(self)
- return module
-
- def default(self, node):
- for child in node.getChildNodes():
- # let's cheat a bit
- child.parent = node
- child.accept(self)
- return node
-
+class Tracer(ASTMutator):
def visitAssName(self, assname):
assign = assname
while not isinstance(assign, ASTAssign):
@@ -40,14 +29,17 @@
stmt.insert_before(assign, before_stmts)
stmt.insert_after(assign, after_stmts)
return assname
-
- def __getattr__(self, attrname):
- if attrname.startswith('visit'):
- return self.default
- raise AttributeError('No such attribute: %s' % attrname)
-def _trace(ast, enc):
+def _trace(ast, enc, filename):
return ast.accept(Tracer())
install_compiler_hook(_trace)
+
+
+code = """
+a = 3
+b = 2
+a = 1
+"""
+exec code
More information about the Pypy-commit
mailing list