aspect-oriented demo using metaclasses

Mark McEahern marklists at mceahern.com
Sun Jun 30 13:00:32 EDT 2002


Here's the latest version of my attempt to implement aspect-oriented
programming (AOP) in Python with metaclasses.

This version fixes the problem where the wrapped method's self variable was
pointing to the field descriptor instance rather than the class instance.
I've also separated the functionality into three files:

  aspect.py - the framework and the Observer interface definition
  trace.py - the simple trace Observer
  test.py - not yet a PyUnit test, just a demo for now

***
# aspect.py
#! /usr/bin/env python

__doc__ = \
"""
TODO:

  [x] Separate into appropriate modules.
  [ ] Should the framework raise errors?
  [ ] Add unit tests.
  [ ] Define scenarios for join points, point cutting, etc.:
        Register for all events, all classes, all methods.
        Register for some events, some classes, some methods.  Etc.
        What's most common?
  [ ] Explicitly specify default metaclass for framework classes.  Fixed by
      separating aspect into separate module?
  [ ] The data in the trace file is not valid xml because there is no
      top-level document.  That may be ok, but document it.

"""

class VirtualClassError(Exception):
    """Raised when the abstract Observer interface is instantiated directly."""

class Observer:
    """Base interface for objects that observe Aspected classes.

    Subclasses override whichever ``on_*`` hooks they are interested in; the
    implementations here do nothing.  Observer itself is abstract and refuses
    to be instantiated directly.
    """

    def __init__(self, *args, **kwargs):
        # Only the bare interface is rejected; subclasses construct freely.
        if self.__class__ is not Observer:
            return
        raise VirtualClassError(self.__class__.__name__)

    def on_before(self, class_name, method_name, *args, **kwargs):
        """Hook for the 'on_before' event; does nothing by default."""
        return None

    def on_after(self, class_name, method_name, *args, **kwargs):
        """Hook for the 'on_after' event; does nothing by default."""
        return None

    def on_error(self, class_name, method_name, *args, **kwargs):
        """Hook for the 'on_error' event; does nothing by default."""
        return None

def make_event(event_name, method_name):
    """Build a function that broadcasts one event to the registered observers.

    The returned function takes an instance as its first argument.  It reads
    ``self.observers[event_name]`` and, for each observer in that list, calls
    the observer's ``event_name`` handler with the instance's class name,
    ``method_name`` and whatever positional/keyword arguments were passed.
    """
    def event(self, *args, **kwargs):
        owner_name = self.__class__.__name__
        for observer in self.observers[event_name]:
            handler = getattr(observer, event_name)
            handler(owner_name, method_name, *args, **kwargs)
    return event

def make_wrapped_method(cls, original_name, method):
    """Return a replacement for ``method`` that fires observer events.

    Three event functions are installed on ``cls`` under the names
    ``on_before_<original_name>``, ``on_error_<original_name>`` and
    ``on_after_<original_name>``.  The returned wrapper fires the 'before'
    event, calls ``method``, and then fires either the 'error' event (before
    re-raising the exception) or the 'after' event (before returning the
    method's result).
    """
    hooks = {}
    for event_name in ("on_before", "on_error", "on_after"):
        attr_name = "%s_%s" % (event_name, original_name)
        setattr(cls, attr_name, make_event(event_name, original_name))
        # Read the hook back off the class so the wrapper uses it exactly as
        # the class exposes it.
        hooks[event_name] = getattr(cls, attr_name)

    notify_before = hooks["on_before"]
    notify_error = hooks["on_error"]
    notify_after = hooks["on_after"]

    def wrapped(self, *args, **kwargs):
        notify_before(self, *args, **kwargs)
        try:
            result = method(self, *args, **kwargs)
        except:
            # Bare except on purpose: observers hear about any exception,
            # which is then re-raised unchanged.
            notify_error(self, *args, **kwargs)
            raise
        notify_after(self, *args, **kwargs)
        return result
    return wrapped

class Aspected(type):
    """Metaclass that wraps a class's plain methods with observer events.

    Every class created by this metaclass gets its own ``observers`` dict
    with an empty list for each of 'on_before', 'on_error' and 'on_after'.
    Each plain function in the class body whose name does not start with
    "__" is stored again under ``__<name>`` and replaced, under its original
    name, by the wrapper built by make_wrapped_method().
    """

    def __init__(cls, name, bases, dict):
        # ``dict`` shadows the builtin, but the parameter name is kept so the
        # signature is unchanged.
        super(Aspected, cls).__init__(name, bases, dict)
        # A fresh registry per class, so observers are not shared between
        # classes.
        cls.observers = {'on_before': [],
                         'on_error': [],
                         'on_after': []}
        # Skip "private" methods for now.
        skip_prefix = "__"
        function_type = type(lambda x: x)
        for k, v in dict.items():
            if k.startswith(skip_prefix):
                continue
            # TODO: This test will skip staticmethods and classmethods.
            if not isinstance(v, function_type):
                continue
            # Keep the original reachable as "__<name>" and fetch it back
            # through the class before wrapping it.
            new_name = "__%s" % k
            setattr(cls, new_name, v)
            new_method = getattr(cls, new_name)
            setattr(cls, k, make_wrapped_method(cls, k, new_method))

***

***
# trace.py
# -----------------------------------------------------------------------------
# Standard library imports
# -----------------------------------------------------------------------------
import sys

# -----------------------------------------------------------------------------
# Third party imports
# -----------------------------------------------------------------------------
import mx.DateTime

# -----------------------------------------------------------------------------
# Sibling imports
# -----------------------------------------------------------------------------
import aspect

# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Trace(aspect.Observer):
    """Observer that logs every event as an XML-like ``<event>`` fragment.

    Fragments are appended to ``filename`` when one was given; otherwise they
    are written to ``sys.stdout``.
    """

    def __init__(self, filename=None):
        """Remember where to log; a false ``filename`` means stdout."""
        self.filename = filename

    def write(self, prefix, data, class_name, method_name, *args, **kwargs):
        """Format one event and emit it.

        prefix      -- event name, placed in the ``name`` attribute
        data        -- text for the ``<data>`` element; false values become ""
        class_name, method_name, args, kwargs -- describe the observed call

        Values are interpolated as-is, without any XML escaping.
        """
        # TODO: Wrap import mx in try:except ImportError and use time?
        now = mx.DateTime.now()
        if not data:
            data = ""
        # The template is filled from locals(), so the local names above must
        # match its %(...)s keys.
        s = """<event name='%(prefix)s' datetime='%(now)s'>
<call>%(class_name)s.%(method_name)s(%(args)s, %(kwargs)s)</call>
<data>%(data)s</data>
</event>
""" % locals()
        if not self.filename:
            sys.stdout.write(s)
            return
        # Append so successive events accumulate; try/finally guarantees the
        # handle is closed even if the write fails.
        f = open(self.filename, "a")
        try:
            f.write(s)
        finally:
            f.close()

    def on_before(self, class_name, method_name, *args, **kwargs):
        """Log an 'on_before' event with no data."""
        self.write("on_before", None, class_name, method_name,
                   *args, **kwargs)

    def on_after(self, class_name, method_name, *args, **kwargs):
        """Log an 'on_after' event with no data."""
        self.write("on_after", None, class_name, method_name,
                   *args, **kwargs)

    def on_error(self, class_name, method_name, *args, **kwargs):
        """Log an 'on_error' event with the current traceback as data."""
        self.write("on_error", get_traceback(), class_name, method_name,
                   *args, **kwargs)

# -----------------------------------------------------------------------------
# Helper methods
# -----------------------------------------------------------------------------
def get_traceback():
    """get_traceback() --> string

    Return the traceback of the exception currently being handled, formatted
    as a string exactly as traceback.print_exc() would print it.
    """
    import traceback

    # format_exc() is the stdlib equivalent of print_exc() into a string
    # buffer, so no StringIO is needed.
    return traceback.format_exc()
***

***
# test.py
#! /usr/bin/env python

# -----------------------------------------------------------------------------
# Sibling imports
# -----------------------------------------------------------------------------
import aspect
import trace

# In Python 2, a module-level __metaclass__ becomes the metaclass of every
# class defined in this module that has no base classes and no __metaclass__
# of its own, so MyClass below is built by aspect.Aspected.
__metaclass__ = aspect.Aspected

# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class MyClass:
    """Demo class used to exercise the Aspected metaclass.

    bar, raise_error and return_something are plain functions in the class
    body.  static_method and class_method are rebound to staticmethod /
    classmethod objects, which the metaclass's function-type test skips.
    """

    def bar(self, *args, **kwargs):
        """Print the arguments received."""
        print("real bar(%s, %s)" % (args, kwargs))

    def raise_error(self, *args, **kwargs):
        """Print the arguments received, then fail with ZeroDivisionError."""
        print("real raise_error(%s, %s)" % (args, kwargs))
        1 / 0

    def return_something(self, *args, **kwargs):
        """Print the arguments received and return ``self.something``."""
        print("real return_something(%s, %s)" % (args, kwargs))
        return self.something

    def static_method(*args, **kwargs):
        """Print a fixed message; takes no instance or class."""
        print("this is a staticmethod")
    static_method = staticmethod(static_method)

    def class_method(cls, *args, **kwargs):
        """Print a fixed message followed by the class's name."""
        print("this is a classmethod")
        print(cls.__name__)
    class_method = classmethod(class_method)

# -----------------------------------------------------------------------------
# Demo
# -----------------------------------------------------------------------------
def main():
    filename = "junk.txt"
    file_trace = trace.Trace(filename)
    stdout_trace = trace.Trace()
    MyClass.observers["on_before"] += [file_trace, stdout_trace]
    MyClass.observers["on_after"] += [file_trace, stdout_trace]
    MyClass.observers["on_error"] += [file_trace, stdout_trace]

    # Exercise the class to test the Aspected framework.
    f = MyClass()
    f.bar("a", "b", foo="c")
    try:
        f.raise_error("a", "b", foo="c")
    except Exception, e:
        print "An error occurred in main(): %s" % e
    f.something = "foo"
    x = f.return_something(1, 2, 3)
    print x
    MyClass.static_method()
    MyClass.class_method()

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
***


-






More information about the Python-list mailing list