[Tutor] oo design/interaction quandary
Kent Johnson
kent37 at tds.net
Sat Dec 26 04:24:42 CET 2009
On Fri, Dec 25, 2009 at 9:03 PM, Brian Jones <bkjones at gmail.com> wrote:
> Hi all,
> I'm having a design issue that's really bothering me. The code I'm writing
> is fairly large by now, but I've written what I think is a decent example
> that illustrates my problem.
> My app launches threads that each consume messages from a queue, send them
> to a processor object, and then the processor needs to return the message to
> the main thread -- if the processing was successful -- and then the main
> thread puts the message into a different queue.
> Here's a shortened version of my code.
>
> class Processor(object):
> def __init__(self):
> self.qhost = 'localhost'
> self.qport = '5672'
> self.uname = 'guest'
> self.pwd = 'guest'
> self.ssl = false
> self.vhost = '/'
> self.exch_name = 'fooX'
> self.exch_type = 'direct'
> self.queue_name = 'fooQ'
> self.conn = amqp.Connection(userid=self.uname, password=self.pwd,
> host=self.qhost,
> virtual_host=self.vhost, ssl=self.ssl)
> self.chan = self.conn.channel()
> self.chan.exchange_declare(self.exch_name, type=self.exch_type)
> self.chan.queue_declare(self.qname)
> self.chan.queue_bind(self.qname, self.exch_name)
>
> def consume(self, callback):
> self.chan.basic_consume(self.qname, callback=callback)
> while True:
> self.chan.wait()
>
>
> class Munger(object):
> def munge(self,msg):
> if msg % 2 == 0:
> yield msg
Why is this a generator function? From what I understand looking at
channel.py in py-amqplib this should just be a callable that processes
the message.
If you want to notify the Sender that the message has been processed,
I would either make another Queue to pass messages back, or give the
Munger a callback function to pass successful messages to. The
difference between them is that a Queue is asynchronous and you would
have to process the output in another thread; a callback is
synchronous and will run in the same thread as the munger. Either of
these can be handled in a base class leaving subclasses to just
implement the message handling. For example, using a callback (a Queue
would be similar):
class MungerBase(object):
def __init__(self, callback):
self.callback = callback
def munge(self, msg):
if self.process(msg):
self.callback(msg)
A real munger looks like this:
class Munger(MungerBase):
def process(self, msg):
if msg % 2 == 0:
return True
return False
> class Sender(object):
> def run(self):
> p = Processor()
> m = Munger()
> for msg in p.consume(m.munge):
> """
> I know this doesn't work right now. This
> piece of the code should send 'msg' to another queue.
>
> """
> pass
This would look like
class Sender(object):
def run(self):
p = Processor()
m = Munger(self.handle_processed_message)
p.consume(m.munge)
def handle_processed_message(self, msg):
# send 'msg' to another queue or whatever
Kent
More information about the Tutor
mailing list