Asynchronous Messaging

wink wink at saville.com
Wed Sep 26 11:18:32 EDT 2007


On Sep 26, 4:47 am, wink <w... at saville.com> wrote:
To make it easier to comment on the code, I'm including the
"mproc.py" file below. Fredrik was commenting about using
Queue, and in fact I do. Queue is quite nice and is also
thread-safe, which is a requirement for this implementation.
But its performance is poor if the number of items on a
Queue becomes large because it is implemented using a list.

One of the things I was thinking of was writing another implementation
of Queue based on deque.


"""Message Processor Module.

This module allows programmers to create components
which communicate asynchronously via messages. In
addition, the receiving component will only handle
one message at a time. This allows the programmer to
create multi-threaded programs with less shared memory
and thus fewer mutexes and semaphores.

The basic communication is via an instance of the
Msg class which is sent to mproc's using the send
method. An Mproc is an active component which has a
thread and executes asynchronously from all other
Mproc's by using one MprocDriver for each Mproc. It
is also possible for several mproc's to share one
MprocDriver by using BaseMproc.

Each mproc must override the _handler method. When a
message arrives for an mproc it is placed in a Queue
and the driver calls the _handler method passing the
Msg as a parameter. The driver uses the Queue to
serialize the message processing, thus the _handler
method will be invoked with one message at a time.
Thus the _handler method does not generally need to
use mutex's or semaphore's. But because each message's
processing must be completed before the next message
will be started it is important that the message be
processed as quickly as possible.

Add more documentation."""

import copy
import threading
import Queue
import traceback

class Msg:
    """A message passed between message processors (mprocs).

    All fields start out as None and are filled in by the sender:

    dstMpId -- id of the destination mproc; send() uses it as an index
               into MprocDriver._mpList
    mid     -- message id
    cmd     -- command
    dstCnId, srcMpId, srcCnId, tag, status, data -- carried along but
               not interpreted by this class (presumably destination
               and source identifiers plus payload; see the senders)
    """

    def __init__(self):
        """Create a message with every field set to None."""
        self.dstMpId = None
        self.dstCnId = None
        self.srcMpId = None
        self.srcCnId = None
        self.mid = None
        self.cmd = None
        self.tag = None
        self.status = None
        self.data = None

    def dup(self):
        """Return a deep copy of this message.

        The copy shares no mutable state with the original, so it can be
        handed to another mproc safely."""
        return copy.deepcopy(self)

    def send(self):
        """Put this message on the destination mproc's message queue.

        The destination is found by using self.dstMpId as an index into
        MprocDriver._mpList.

        Returns True if the message was queued and False if it could
        not be (for example dstMpId is None or out of range, or the
        destination mproc has been removed)."""
        try:
            MprocDriver._mpList[self.dstMpId]._msgQueue.put(self)
        except Exception:
            # Best effort: any lookup or queueing failure is reported as
            # False.  Unlike a bare "except:", KeyboardInterrupt and
            # SystemExit are allowed to propagate.
            return False
        return True

class BaseMproc:
    """Message Processor.

    A message processor requires a handler method and a driver which
    passes messages to it. The mproc driver has one overriding
    requirement: it must only pass one message at a time to the handler
    method. This eliminates any need for the programmer to worry about
    multi-threading issues while processing messages.

    This does put a burden on the handler routine: it must process
    messages quickly and in a non-blocking fashion so that the mproc
    may remain lively.

    The name of a BaseMproc must be unique; addMproc raises NameError
    if the name is already used."""

    def __init__(self, name, mprocDriver):
        """Register this mproc under name with the given driver.

        name        -- unique name of the mproc
        mprocDriver -- driver that will deliver messages to this mproc

        Raises NameError (from addMproc) if name is already in use."""
        self.name = name
        addMproc(self, mprocDriver)

    def close(self):
        """Close the mproc by removing it from the registry.

        If removal fails, the traceback is printed and the mproc is
        still detached from its driver, so close() never raises an
        ordinary exception."""
        try:
            rmvMproc(self)
        except Exception:
            traceback.print_exc()
            self._unreg()

    def _handler(self, msg=None):
        """Handle one message; subclasses must override this.

        msg -- the Msg being delivered. The driver always passes it, so
               the base signature accepts it too; otherwise a missing
               override would fail with a TypeError instead of the
               explanatory error below."""
        raise NotImplementedError("BaseMproc._handler needs to be overridden")

    def _reg(self, mprocDriver, id):
        """Attach this mproc to mprocDriver under the given id.

        The mproc shares the driver's message queue."""
        self._mprocDriver = mprocDriver
        self._msgQueue = mprocDriver._msgQueue
        self.id = id

    def _unreg(self):
        """Detach this mproc from its driver and clear its id."""
        self._mprocDriver = None
        self._msgQueue = None
        self.id = None

class Mproc(BaseMproc):
    """Active Message Processor.

    An active message processor is a BaseMproc that always creates its
    own MprocDriver instance (named "ActiveMprocDriver_" + name) as its
    driver."""

    def __init__(self, name):
        """Create a dedicated driver and register this mproc with it.

        name -- unique name of the mproc

        Raises NameError if either the mproc name or the derived driver
        name is already registered."""
        driver = MprocDriver("ActiveMprocDriver_" + name)
        try:
            BaseMproc.__init__(self, name, driver)
        except Exception:
            # The driver thread is already running; shut it down so a
            # failed registration does not leak a thread.
            driver.close()
            raise

    def close(self):
        """Close the active mproc and then its private driver.

        Failures are printed rather than raised, and the mproc is
        detached from its driver in that case."""
        try:
            # Remember the driver first: BaseMproc.close() clears
            # self._mprocDriver.
            this_mprocDriver = self._mprocDriver
            BaseMproc.close(self)
            this_mprocDriver.close()
        except Exception:
            print("Mproc.close: excption")
            traceback.print_exc()
            self._unreg()

class MprocDriver(threading.Thread, BaseMproc):
    """Message processor driver.

    A thread that owns a message queue and delivers each queued Msg to
    the _handler of the mproc whose id equals msg.dstMpId. The driver
    is itself an mproc: its own _handler processes the stop request
    sent by close()."""

    # Registry shared by all drivers: _mpList is indexed by mproc id
    # (removed entries are set to None by rmvMproc), _mpDict maps
    # mproc name -> mproc.
    _mpList = []
    _mpDict = {}

    def __init__(self, name):
        """Create the driver, register it as an mproc and start its thread.

        name -- unique name of the driver

        Raises NameError (from addMproc) if name is already in use."""

        self._thisMpdDict = {}  # name -> mproc served by this driver
        self._running = True
        self._msgQueue = Queue.Queue()

        threading.Thread.__init__(self)
        BaseMproc.__init__(self, name, self)

        self.start()

    def _regMproc(self, mproc, id):
        """Record mproc as served by this driver and attach it under id."""
        self._thisMpdDict[mproc.name] = mproc
        mproc._reg(self, id)

    def _unregMproc(self, mproc):
        """Forget mproc and detach it from this driver.

        Raises KeyError if mproc is not served by this driver."""
        del self._thisMpdDict[mproc.name]
        mproc._unreg()

    def _handler(self, msg):
        """Handle the stop request (mid == -1, cmd == 0).

        Clears the running flag so run() exits, then acknowledges by
        putting 0 on msg.completeQ. Other messages are ignored."""
        if (msg.mid == -1) and (msg.cmd == 0):
            self._running = False
            msg.completeQ.put(0)

    def close(self):
        """Close the mproc driver.

        Removes every other mproc served by this driver, asks the
        driver thread to stop, waits for its acknowledgement, and
        finally removes the driver itself from the registry.

        NOTE(review): the wait blocks until the driver thread has
        handled the stop request, so calling close() from a handler
        running on this driver's own thread would appear to block
        forever."""

        # Iterate over a copy: rmvMproc() deletes entries from
        # _thisMpdDict while we loop.
        for mproc in list(self._thisMpdDict.values()):
            if mproc.name != self.name:
                rmvMproc(mproc)

        completeQ = Queue.Queue()
        msg = Msg()
        msg.dstMpId = self.id
        msg.mid = -1
        msg.cmd = 0
        msg.completeQ = completeQ
        if msg.send():
            completeQ.get()
        else:
            # The stop request could not be queued, so nobody will ever
            # acknowledge it; do not wait.  Clear the flag so the loop
            # ends once it next checks it.
            self._running = False

        # Remove ourself
        BaseMproc.close(self)
        del self._thisMpdDict

    def run(self):
        """Thread body: deliver queued messages one at a time.

        Errors while delivering a message are printed and the loop
        carries on with the next message."""
        while self._running:
            # Reset for every message so the diagnostics below never see
            # an unbound name or a stale value from an earlier message.
            msg = None
            mproc = None
            try:
                msg = self._msgQueue.get()
                mproc = MprocDriver._mpList[msg.dstMpId]
                mproc._handler(msg)
            except Exception:
                if msg is None:
                    print("run: no message")
                elif mproc is None:
                    print("run: no mproc")
                elif not callable(mproc._handler):
                    print("run: mproc._handler is not callable")
                else:
                    print("run: mproc._handler caused an exception")
                    traceback.print_exc()

class PsMgr:
    """Publish Subscribe Manager.

    Allow mprocs to subscribe to any message having
    a specified mid/cmd.

    Maybe this should be in a separate module instead
    of using class variables?
    """

    # mid -> {cmd -> [subscribed mprocs]}
    midDict = {}

    @classmethod
    def publish(cls, msg):
        """Send a copy of msg to every subscriber of (msg.mid, msg.cmd).

        Does nothing when there are no subscribers."""
        cls._publish(msg)

    @classmethod
    def subscribe(cls, mproc, mid, cmd):
        """Subscribe the mproc to messages with mid/cmd."""
        cls._subscribe(mproc, mid, cmd)

    @classmethod
    def unsubscribe(cls, mproc, mid, cmd):
        """Unsubscribe the mproc from messages with mid/cmd.

        Does nothing if the mproc is not subscribed."""
        cls._unsubscribe(mproc, mid, cmd)

    @classmethod
    def _publish(cls, msg):
        """The actual publish routine.

        Each subscriber receives its own duplicate of msg (msg.dup())
        with dstMpId set to the subscriber's id."""
        try:
            subscribers = cls.midDict[msg.mid][msg.cmd]
        except KeyError:
            # Nobody subscribed to this mid/cmd.
            return
        for mproc in subscribers:
            msgNew = msg.dup()
            msgNew.dstMpId = mproc.id
            msgNew.send()

    @classmethod
    def _subscribe(cls, mproc, mid, cmd):
        """The actual subscribe routine.

        Subscribing the same mproc twice records it twice, so it then
        receives two copies of each published message."""
        cls.midDict.setdefault(mid, {}).setdefault(cmd, []).append(mproc)

    @classmethod
    def _unsubscribe(cls, mproc, mid, cmd):
        """The actual unsubscribe routine.

        Removes every subscription of mproc for mid/cmd and drops
        entries that become empty, so midDict only ever holds live
        subscriptions."""
        cmdDict = cls.midDict.get(mid)
        if cmdDict is None:
            return
        subscribers = cmdDict.get(cmd)
        if subscribers is None:
            return

        # Update in place so the list object stays the one held in cmdDict.
        subscribers[:] = [mp for mp in subscribers if not (mp == mproc)]

        if not subscribers:
            del cmdDict[cmd]
            if not cmdDict:
                del cls.midDict[mid]

def lookupMproc(name, onNotFound=None):
    """Look up a message processor by name.

    name       -- name the mproc was registered under
    onNotFound -- value to return when no such mproc exists

    Returns the registered mproc, or onNotFound if there is none."""
    try:
        return MprocDriver._mpDict[name]
    except (KeyError, TypeError):
        # KeyError: no mproc of that name.  TypeError: the name is
        # unhashable and so can never have been registered.  Anything
        # else is a real error and is allowed to propagate.
        return onNotFound

def addMproc(mproc, mprocDriver):
    """Add a new message processor to the database.

    mproc       -- the mproc to register; mproc.name must be unique
    mprocDriver -- driver that will serve the mproc

    The mproc's id is its index in MprocDriver._mpList.

    Raises NameError if an mproc with the same name already exists.
    If the driver fails to register the mproc, the database entries
    are undone and the error is re-raised."""
    if lookupMproc(mproc.name) is not None:
        raise NameError("%s BaseMproc already exists" % mproc.name)

    MprocDriver._mpList.append(mproc)
    MprocDriver._mpDict[mproc.name] = mproc

    mpId = MprocDriver._mpList.index(mproc)
    try:
        mprocDriver._regMproc(mproc, mpId)
    except Exception:
        # Roll back so a failed registration does not leave a
        # half-registered mproc blocking its name.  The list slot is
        # blanked rather than deleted to keep existing ids stable.
        MprocDriver._mpList[mpId] = None
        del MprocDriver._mpDict[mproc.name]
        raise

def rmvMproc(mproc):
    """Remove a message processor from the database.

    The mproc's slot in the id-indexed list is blanked (not deleted, so
    other ids keep their positions), its name entry is dropped, and its
    driver is told to unregister it."""
    registry = MprocDriver
    registry._mpList[mproc.id] = None
    del registry._mpDict[mproc.name]
    owner = mproc._mprocDriver
    owner._unregMproc(mproc)





More information about the Python-list mailing list