Asynchronous Messaging
wink
wink at saville.com
Wed Sep 26 11:18:32 EDT 2007
On Sep 26, 4:47 am, wink <w... at saville.com> wrote:
To make it easier to comment on the code, I'm including
"mproc.py" file below. Fredrik, was commenting about using
Queue and in fact I do. Queue is quite nice and is also
thread safe, which is a requirement for this implementation.
But its performance is poor if the number of items on a
Queue becomes large because it is implemented using a list.
One of the things I was thinking of was doing another implementation
of Queue based on deque.
"""Message Processor Module.
This module allows programmers to create components
which communicate asynchronously via messages. In
addition, the receiving component will only handle
one message at a time. This allows the programmer to
create multi-threaded programs with less shared memory
and thus fewer mutexes and semaphores.
The basic communication is via an instance of the
Msg class which is sent to mproc's using the send
method. An Mproc is an active component which has a
thread and executes asynchronously from all other
Mproc's by using one MprocDriver for each Mproc. It
is also possible for several mproc's to share one
MprocDriver by using BaseMproc.
Each mproc must override the _handler method. When a
message arrives for an mproc it is placed in a Queue
and the driver calls the _handler method passing the
Msg as a parameter. The driver uses the Queue to
serialize the message processing, thus the _handler
method will be invoked with one message at a time.
Thus the _handler method does not generally need to
use mutexes or semaphores. But because each message's
processing must be completed before the next message
will be started, it is important that the message be
processed as quickly as possible.
Add more documentation."""
import copy
import threading
import Queue
import traceback
class Msg:
    """A message exchanged between message processors (mprocs).

    Every field starts out as None and is filled in by the sender.
    ``dstMpId`` is the index into ``MprocDriver._mpList`` that ``send``
    uses to locate the destination; the other fields (``dstCnId``,
    ``srcMpId``, ``srcCnId``, ``mid``, ``cmd``, ``tag``, ``status``,
    ``data``) are not interpreted by this class.
    """

    def __init__(self):
        """Initializer: create a message with every field unset."""
        self.dstMpId = None
        self.dstCnId = None
        self.srcMpId = None
        self.srcCnId = None
        self.mid = None
        self.cmd = None
        self.tag = None
        self.status = None
        self.data = None

    def dup(self):
        """Duplicate the message.

        Returns a deep copy, so the copy shares no mutable state with
        the original.
        """
        return copy.deepcopy(self)

    def send(self):
        """Send a message.

        Puts this message on the queue of the mproc registered at index
        ``dstMpId`` in ``MprocDriver._mpList``.

        Returns True if the message was started on its way, False if it
        could not be queued (for example ``dstMpId`` is None or out of
        range, or the slot holds None because the mproc was removed).
        """
        try:
            MprocDriver._mpList[self.dstMpId]._msgQueue.put(self)
        except Exception:
            # Sending stays best-effort, but unlike a bare ``except:``
            # this lets KeyboardInterrupt/SystemExit propagate.
            return False
        return True
class BaseMproc:
    """Message Processor.

    A message processor requires a handler method and another
    driver which passes messages to it. This mproc driver has
    one overriding requirement: it must only pass one message at
    a time to the handler method. This eliminates any need for
    the programmer to worry about multi-threading issues while
    processing messages.

    This does put a burden on the handler routine: it must process
    messages quickly and in a non-blocking fashion so that the
    mproc may remain lively.

    The name of a BaseMproc must be unique; addMproc raises NameError
    if the name is already used.
    """

    def __init__(self, name, mprocDriver):
        """Initializer.

        name        -- unique name of this mproc
        mprocDriver -- driver that will deliver messages to this mproc
        """
        self.name = name
        addMproc(self, mprocDriver)

    def close(self):
        """Close the mproc by removing it from the mproc database.

        Errors during removal are reported with a traceback rather than
        raised, and the mproc is then unregistered locally so it ends
        up detached either way.
        """
        try:
            rmvMproc(self)
        except Exception:
            traceback.print_exc()
            self._unreg()

    def _handler(self, msg=None):
        """Override this routine.

        The driver calls ``_handler(msg)``, so the base signature accepts
        the message too; otherwise a subclass that forgot to override
        it would fail with a TypeError instead of this explicit error.
        """
        raise NotImplementedError("BaseMproc._handler needs to be overridden")

    def _reg(self, mprocDriver, id):
        """Register the mproc driver for this mproc.

        mprocDriver -- the driver; its message queue is shared with this mproc
        id          -- identifier assigned to this mproc by the caller
        """
        self._mprocDriver = mprocDriver
        self._msgQueue = mprocDriver._msgQueue
        self.id = id

    def _unreg(self):
        """Unregister the mproc driver for this mproc."""
        self._mprocDriver = None
        self._msgQueue = None
        self.id = None
class Mproc(BaseMproc):
    """Active Message Processor.

    An active message processor is a BaseMproc, but it always creates
    its own MprocDriver instance as its driver.
    """

    def __init__(self, name):
        """Initializer.

        Creates a dedicated driver named "ActiveMprocDriver_" + name and
        registers this mproc with it.
        """
        BaseMproc.__init__(self, name,
                           MprocDriver("ActiveMprocDriver_" + name))

    def close(self):
        """Close the active mproc and then its private driver.

        Errors are reported with a traceback rather than raised, and the
        mproc is then unregistered locally.
        """
        try:
            # Grab the driver first: BaseMproc.close() clears
            # self._mprocDriver as part of unregistering.
            this_mprocDriver = self._mprocDriver
            BaseMproc.close(self)
            this_mprocDriver.close()
        except Exception:
            # print(...) with a single string behaves the same as a
            # statement and as a function call.
            print("Mproc.close: excption")
            traceback.print_exc()
            self._unreg()
class MprocDriver(threading.Thread, BaseMproc):
    """Message processor driver.

    A thread that takes messages from its queue one at a time and passes
    each to the ``_handler`` of the mproc found at
    ``MprocDriver._mpList[msg.dstMpId]``. The driver is itself an mproc,
    so it can receive the shutdown message sent by ``close``.
    """

    # Database of all mprocs, shared by every driver:
    # _mpList is indexed by mproc id (removed entries become None),
    # _mpDict maps mproc name -> mproc.
    _mpList = []
    _mpDict = {}

    def __init__(self, name):
        """Initializer: set up the queue, register ourselves, start the thread."""
        self._thisMpdDict = {}
        self._running = True
        self._msgQueue = Queue.Queue()
        threading.Thread.__init__(self)
        BaseMproc.__init__(self, name, self)
        self.start()

    def _regMproc(self, mproc, id):
        """Attach mproc to this driver under the given id."""
        self._thisMpdDict[mproc.name] = mproc
        mproc._reg(self, id)

    def _unregMproc(self, mproc):
        """Detach mproc from this driver."""
        del self._thisMpdDict[mproc.name]
        mproc._unreg()

    def _handler(self, msg):
        """Handle messages addressed to the driver itself.

        The only message acted on is the shutdown request (mid == -1,
        cmd == 0): it stops the run loop and acknowledges by putting 0
        on msg.completeQ.
        """
        if msg.mid == -1 and msg.cmd == 0:
            self._running = False
            msg.completeQ.put(0)

    def close(self):
        """Close the mproc driver.

        Removes every other mproc attached to this driver, asks the
        driver thread to stop, waits for its acknowledgement, and
        finally removes the driver itself.

        NOTE(review): this waits for the driver thread to acknowledge,
        so calling it from a handler running on that same thread looks
        like it would never return -- confirm before doing so.
        """
        # Iterate over a snapshot: rmvMproc() deletes entries from
        # _thisMpdDict, and a live dict view must not change size while
        # it is being iterated.
        for mproc in list(self._thisMpdDict.values()):
            if mproc.name != self.name:
                rmvMproc(mproc)
        completeQ = Queue.Queue()
        msg = Msg()
        msg.dstMpId = self.id
        msg.mid = -1
        msg.cmd = 0
        msg.completeQ = completeQ
        # Wait for the acknowledgement only if the shutdown request was
        # actually queued; otherwise nobody will ever answer.
        if msg.send():
            completeQ.get()
        # Remove ourself
        BaseMproc.close(self)
        del self._thisMpdDict

    def run(self):
        """Thread body: dispatch queued messages until told to stop."""
        while self._running:
            # Reset per iteration so the diagnostics below never see an
            # unbound name or a value left over from the previous message.
            msg = None
            mproc = None
            try:
                msg = self._msgQueue.get()
                mproc = MprocDriver._mpList[msg.dstMpId]
                mproc._handler(msg)
            except Exception:
                if msg is None:
                    print("run: no message")
                elif mproc is None:
                    print("run: no mproc")
                elif not callable(getattr(mproc, "_handler", None)):
                    print("run: mproc._handler is not callable")
                else:
                    print("run: mproc._handler caused an exception")
                    traceback.print_exc()
class PsMgr:
    """Publish Subscribe Manager.

    Allow mprocs to subscribe to any message having
    a specified mid/cmd.

    The public entry points simply delegate to the ``_publish``,
    ``_subscribe`` and ``_unsubscribe`` routines. Earlier code rebound
    the public names at run time, but the "restore" step re-assigned the
    already rebound attributes to themselves and ``midDict`` was never
    emptied, so the rebinding could not be undone. Publishing with no
    subscribers and unsubscribing an unknown mproc are still no-ops.

    Maybe this should be in a separate module and instead
    of using class variables?
    """

    # mid -> {cmd -> [subscribed mprocs]}
    midDict = {}

    @classmethod
    def publish(cls, msg):
        """Send the message to all subscribers of msg.mid/msg.cmd."""
        cls._publish(msg)

    @classmethod
    def subscribe(cls, mproc, mid, cmd):
        """Subscribe the mproc to messages with mid/cmd."""
        cls._subscribe(mproc, mid, cmd)

    @classmethod
    def unsubscribe(cls, mproc, mid, cmd):
        """Unsubscribe the mproc from messages with mid/cmd."""
        cls._unsubscribe(mproc, mid, cmd)

    @classmethod
    def _publish(cls, msg):
        """The actual publish routine.

        Each subscriber receives its own duplicate of msg with dstMpId
        set to the subscriber's id. Does nothing when nobody subscribed
        to msg.mid/msg.cmd.
        """
        try:
            subscribers = cls.midDict[msg.mid][msg.cmd]
        except KeyError:
            return
        for mproc in subscribers:
            msgNew = msg.dup()
            msgNew.dstMpId = mproc.id
            msgNew.send()

    @classmethod
    def _subscribe(cls, mproc, mid, cmd):
        """The actual subscribe routine."""
        cls.midDict.setdefault(mid, {}).setdefault(cmd, []).append(mproc)

    @classmethod
    def _unsubscribe(cls, mproc, mid, cmd):
        """The actual unsubscribe routine.

        Removes every occurrence of mproc from the mid/cmd subscriber
        list and drops entries that become empty, so midDict does not
        accumulate dead keys.
        """
        cmdDict = cls.midDict.get(mid)
        if cmdDict is None:
            return
        subscribers = cmdDict.get(cmd)
        if subscribers is None:
            return
        # Update in place so other references to the list see the change.
        subscribers[:] = [mp for mp in subscribers if not (mp == mproc)]
        if not subscribers:
            del cmdDict[cmd]
        if not cmdDict:
            del cls.midDict[mid]
def lookupMproc(name, onNotFound=None):
    """Look up a message processor by name.

    name       -- name the mproc was registered under
    onNotFound -- value to return when no mproc has that name

    Returns the registered mproc, or onNotFound.
    """
    try:
        return MprocDriver._mpDict[name]
    except (KeyError, TypeError):
        # KeyError: unknown name. TypeError: unhashable name, which can
        # never be registered. Anything else is a real error and is no
        # longer hidden.
        return onNotFound
def addMproc(mproc, mprocDriver):
    """Add a new message processor to the database.

    mproc       -- the mproc to add; mproc.name must not be in use
    mprocDriver -- driver the mproc is registered with

    Raises NameError if an mproc with the same name already exists.
    """
    if lookupMproc(mproc.name) is not None:
        raise NameError("%s BaseMproc already exists" % mproc.name)
    registry = MprocDriver._mpList
    registry.append(mproc)
    MprocDriver._mpDict[mproc.name] = mproc
    # The position in the list is the mproc's id; Msg.send() uses it to
    # find the destination.
    mprocId = registry.index(mproc)
    mprocDriver._regMproc(mproc, mprocId)
def rmvMproc(mproc):
    """Remove message processor from the database.

    The mproc's slot in the id list is set to None rather than deleted,
    so the positions of the other entries do not change. The name entry
    is then dropped and the mproc is detached from its driver.
    """
    database = MprocDriver
    database._mpList[mproc.id] = None
    del database._mpDict[mproc.name]
    mproc._mprocDriver._unregMproc(mproc)
More information about the Python-list
mailing list