[Tutor] oo design/interaction quandary

Kent Johnson kent37 at tds.net
Sat Dec 26 04:24:42 CET 2009


On Fri, Dec 25, 2009 at 9:03 PM, Brian Jones <bkjones at gmail.com> wrote:
> Hi all,
> I'm having a design issue that's really bothering me. The code I'm writing
> is fairly large by now, but I've written what I think is a decent example
> that illustrates my problem.
> My app launches threads that each consume messages from a queue, send them
> to a processor object, and then the processor needs to return the message to
> the main thread -- if the processing was successful -- and then the main
> thread puts the message into a different queue.
> Here's a shortened version of my code.
>
> class Processor(object):
>     def __init__(self):
>         self.qhost = 'localhost'
>         self.qport = '5672'
>         self.uname = 'guest'
>         self.pwd = 'guest'
>         self.ssl = False
>         self.vhost = '/'
>         self.exch_name = 'fooX'
>         self.exch_type = 'direct'
>         self.qname = 'fooQ'
>         self.conn = amqp.Connection(userid=self.uname, password=self.pwd,
>                                     host=self.qhost, virtual_host=self.vhost,
>                                     ssl=self.ssl)
>         self.chan = self.conn.channel()
>         self.chan.exchange_declare(self.exch_name, type=self.exch_type)
>         self.chan.queue_declare(self.qname)
>         self.chan.queue_bind(self.qname, self.exch_name)
>
>     def consume(self, callback):
>         self.chan.basic_consume(self.qname, callback=callback)
>         while True:
>             self.chan.wait()
>
>
> class Munger(object):
>     def munge(self,msg):
>         if msg % 2 == 0:
>             yield msg

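Why is this a generator function? From what I understand looking at
channel.py in py-amqplib, this should just be a callable that processes
the message. Calling a generator function only creates a generator
object; its body does not run until something iterates over the result,
so as a callback it would never actually process a message.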
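
To illustrate the point about generator functions, here is a small sketch.
deliver() is a hypothetical stand-in for a library that calls a consumer
callback and ignores its return value; it is not py-amqplib's actual code,
and the function names are made up for the example.

```python
calls = []  # records which function bodies actually ran


def munge_generator(msg):
    """Generator version, like the original Munger.munge."""
    calls.append(("generator", msg))
    if msg % 2 == 0:
        yield msg


def munge_plain(msg):
    """Plain callable version."""
    calls.append(("plain", msg))
    return msg % 2 == 0


def deliver(callback, msg):
    """Hypothetical stand-in for a library invoking a consumer callback."""
    callback(msg)  # the return value is discarded


deliver(munge_generator, 4)  # only builds a generator object; body never runs
deliver(munge_plain, 4)      # body runs immediately

print(calls)
```

Only the plain function leaves a record in calls, because nothing ever
iterates over the generator object that munge_generator returns.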

If you want to notify the Sender that the message has been processed,
I would either make another Queue to pass messages back, or give the
Munger a callback function to pass successful messages to. The
difference between them is that a Queue is asynchronous and you would
have to process the output in another thread; a callback is
synchronous and will run in the same thread as the munger. Either of
these can be handled in a base class leaving subclasses to just
implement the message handling. For example, using a callback (a Queue
would be similar):

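class MungerBase(object):
  def __init__(self, callback):
    # callback is called with each successfully processed message
    self.callback = callback

  def munge(self, msg):
    if self.process(msg):
      self.callback(msg)

  def process(self, msg):
    # subclasses override this and return True on success
    raise NotImplementedError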

A real munger looks like this:

class Munger(MungerBase):
  def process(self, msg):
    # True for even messages, False otherwise
    return msg % 2 == 0
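
For comparison, the Queue variant mentioned above might look roughly like
this. It is only a sketch: it assumes Python 3 (the module is named Queue
in Python 2), and QueueMungerBase, EvenMunger, drain and the None sentinel
are hypothetical names chosen for the example.

```python
import queue
import threading


class QueueMungerBase(object):
    """Hypothetical Queue-based counterpart of a callback-based base class."""

    def __init__(self, out_queue):
        self.out_queue = out_queue

    def munge(self, msg):
        # Successful messages are queued instead of passed to a callback.
        if self.process(msg):
            self.out_queue.put(msg)

    def process(self, msg):
        raise NotImplementedError


class EvenMunger(QueueMungerBase):
    def process(self, msg):
        return msg % 2 == 0


processed = queue.Queue()
results = []


def drain():
    """Runs in its own thread and collects processed messages."""
    while True:
        msg = processed.get()
        if msg is None:  # sentinel: no more messages
            break
        results.append(msg)


worker = threading.Thread(target=drain)
worker.start()

m = EvenMunger(processed)
for n in range(5):
    m.munge(n)

processed.put(None)  # tell the worker to stop
worker.join()
print(results)
```

The munger never waits for the consumer here; that decoupling is the
asynchronous behaviour described above, at the cost of an extra thread.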


> class Sender(object):
>     def run(self):
>         p = Processor()
>         m = Munger()
>         for msg in p.consume(m.munge):
>             """
>             I know this doesn't work right now. This
>             piece of the code should send 'msg' to another queue.
>
>             """
>             pass

This would look like:

class Sender(object):
  def run(self):
    p = Processor()
    m = Munger(self.handle_processed_message)
    p.consume(m.munge)

  def handle_processed_message(self, msg):
    # send 'msg' to another queue or whatever
    pass
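
To try the callback wiring without a broker, something like the following
sketch should work. FakeProcessor is a hypothetical stand-in that feeds a
fixed list of messages to the callback instead of consuming from AMQP, and
the sent list is an assumed placeholder for the outgoing queue.

```python
class FakeProcessor(object):
    """Hypothetical stand-in for Processor; no AMQP connection involved."""

    def __init__(self, messages):
        self.messages = messages

    def consume(self, callback):
        # Hand each message to the callback, as the real consumer would.
        for msg in self.messages:
            callback(msg)


class MungerBase(object):
    def __init__(self, callback):
        self.callback = callback

    def munge(self, msg):
        if self.process(msg):
            self.callback(msg)


class Munger(MungerBase):
    def process(self, msg):
        return msg % 2 == 0


class Sender(object):
    def __init__(self):
        self.sent = []  # placeholder for the outgoing queue

    def run(self):
        p = FakeProcessor([1, 2, 3, 4])
        m = Munger(self.handle_processed_message)
        p.consume(m.munge)

    def handle_processed_message(self, msg):
        # Runs synchronously, in the same thread as the munger.
        self.sent.append(msg)


s = Sender()
s.run()
print(s.sent)
```

Only the even messages reach handle_processed_message, and they arrive in
the order they were consumed.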

Kent

