aspect-oriented demo using metaclasses
Mark McEahern
marklists at mceahern.com
Sun Jun 30 13:00:32 EDT 2002
Here's the latest version of my attempt to implement AOP in Python with
metaclasses:
This version fixes the problem where the wrapped method's self variable was
pointing to the field descriptor instance rather than the class instance.
I've also separated the functionality into three files:
aspect.py - the framework and the Observer interface definition
trace.py - the simple trace Observer
test.py - not yet a PyUnit test, just a demo for now
***
# aspect.py
#! /usr/bin/env python
__doc__ = \
"""
TODO:
[x] Separate into appropriate modules.
[ ] Should the framework raise errors?
[ ] Add unit tests.
[ ] Define scenarios for join points, point cutting, etc.:
Register for all events, all classes, all methods.
Register for some events, some classes, some methods. Etc.
What's most common?
[ ] Explicitly specify default metaclass for framework classes. Fixed by
separating aspect into separate module?
[ ] The data in the trace file is not valid xml because there is no
top-level
document. That may be ok, but document it.
"""
class VirtualClassError(Exception):pass
class Observer:
"""Interface for observers of Aspected classes."""
def __init__(self, *args, **kwargs):
if self.__class__ is Observer:
raise VirtualClassError(self.__class__.__name__)
def on_before(self, class_name, method_name, *args, **kwargs):
pass
def on_after(self, class_name, method_name, *args, **kwargs):
pass
def on_error(self, class_name, method_name, *args, **kwargs):
pass
def make_event(event_name, method_name):
def event(self, *args, **kwargs):
class_name = self.__class__.__name__
for o in self.observers[event_name]:
e = getattr(o, event_name)
e(class_name, method_name, *args, **kwargs)
return event
def make_wrapped_method(cls, original_name, method):
# TODO: Refactor.
on_before_event = make_event("on_before", original_name)
on_after_event = make_event("on_after", original_name)
on_error_event = make_event("on_error", original_name)
on_before_event_name = "on_before_%s" % original_name
on_error_event_name = "on_error_%s" % original_name
on_after_event_name = "on_after_%s" % original_name
setattr(cls, on_before_event_name, on_before_event)
setattr(cls, on_error_event_name, on_error_event)
setattr(cls, on_after_event_name, on_after_event)
on_before_event = getattr(cls, on_before_event_name)
on_error_event = getattr(cls, on_error_event_name)
on_after_event = getattr(cls, on_after_event_name)
def wrapped(self, *args, **kwargs):
on_before_event(self, *args, **kwargs)
try:
ret = method(self, *args, **kwargs)
except:
on_error_event(self, *args, **kwargs)
raise
on_after_event(self, *args, **kwargs)
return ret
return wrapped
class Aspected(type):
def __init__(cls, name, bases, dict):
super(Aspected, cls).__init__(name, bases, dict)
# Skip "private" methods for now.
skip_prefix = "__"
observers_dict = {'on_before': [],
'on_error': [],
'on_after': []}
setattr(cls, "observers", observers_dict)
for k, v in dict.items():
print type(v)
if not k.startswith(skip_prefix):
# TODO: This test will skip staticmethods and classmethods.
if type(v) == type(lambda x: x):
new_name = "__%s" % k
setattr(cls, new_name, v)
new_method = getattr(cls, new_name)
setattr(cls, k, make_wrapped_method(cls, k, new_method))
***
***
# trace.py
# --------------------------------------------------------------------------
---
# Standard library imports
# --------------------------------------------------------------------------
---
import sys
# --------------------------------------------------------------------------
---
# Third party imports
# --------------------------------------------------------------------------
---
import mx.DateTime
# --------------------------------------------------------------------------
---
# Sibling imports
# --------------------------------------------------------------------------
---
import aspect
# --------------------------------------------------------------------------
---
# Classes
# --------------------------------------------------------------------------
---
class Trace(aspect.Observer):
def __init__(self, filename=None):
self.filename = filename
def write(self, prefix, data, class_name, method_name, *args, **kwargs):
# TODO: Wrap import mx in try:except ImportError and use time?
now = mx.DateTime.now()
if not data:
data = ""
s = """<event name='%(prefix)s' datetime='%(now)s'>
<call>%(class_name)s.%(method_name)s(%(args)s, %(kwargs)s)</call>
<data>%(data)s</data>
</event>
""" % locals()
if not self.filename:
f = sys.stdout
else:
f = file(self.filename, "a")
f.write(s)
if self.filename:
f.close()
def on_before(self, class_name, method_name, *args, **kwargs):
prefix = "on_before"
data = None
self.write(prefix, data, class_name, method_name, *args, **kwargs)
def on_after(self, class_name, method_name, *args, **kwargs):
prefix = "on_after"
data = None
self.write(prefix, data, class_name, method_name, *args, **kwargs)
def on_error(self, class_name, method_name, *args, **kwargs):
prefix = "on_error"
data = get_traceback()
self.write(prefix, data, class_name, method_name, *args, **kwargs)
# --------------------------------------------------------------------------
---
# Helper methods
# --------------------------------------------------------------------------
---
def get_traceback():
"""get_traceback() --> string
Return the traceback exception information as a string.
"""
import traceback
from StringIO import StringIO
f = StringIO()
traceback.print_exc(limit=None, file=f)
s = f.getvalue()
return s
***
***
# test.py
#! /usr/bin/env python
# --------------------------------------------------------------------------
---
# Sibling imports
# --------------------------------------------------------------------------
---
import aspect
import trace
__metaclass__ = aspect.Aspected
# --------------------------------------------------------------------------
---
# Classes
# --------------------------------------------------------------------------
---
class MyClass:
def bar(self, *args, **kwargs):
print "real bar(%s, %s)" % (args, kwargs)
def raise_error(self, *args, **kwargs):
print "real raise_error(%s, %s)" % (args, kwargs)
1/0
def return_something(self, *args, **kwargs):
print "real return_something(%s, %s)" % (args, kwargs)
return self.something
def static_method(*args, **kwargs):
print "this is a staticmethod"
def class_method(cls, *args, **kwargs):
print "this is a classmethod"
print cls.__name__
static_method = staticmethod(static_method)
class_method = classmethod(class_method)
# --------------------------------------------------------------------------
---
# Demo
# --------------------------------------------------------------------------
---
def main():
filename = "junk.txt"
file_trace = trace.Trace(filename)
stdout_trace = trace.Trace()
MyClass.observers["on_before"] += [file_trace, stdout_trace]
MyClass.observers["on_after"] += [file_trace, stdout_trace]
MyClass.observers["on_error"] += [file_trace, stdout_trace]
# Exercise the class to test the Aspected framework.
f = MyClass()
f.bar("a", "b", foo="c")
try:
f.raise_error("a", "b", foo="c")
except Exception, e:
print "An error occurred in main(): %s" % e
f.something = "foo"
x = f.return_something(1, 2, 3)
print x
MyClass.static_method()
MyClass.class_method()
if __name__ == "__main__":
main()
***
-
More information about the Python-list
mailing list