How to wait for multiple queues (Queue.py)?

Andreas Ames andreas.ames at tenovis.com
Wed Nov 20 06:50:07 EST 2002


Hi,

as I prefer to use queues for interthread communication, I need
something like a 'mainloop' for my threads, in the sense that every
thread can have multiple input queues and I want to be able to block
the consumer thread until input is available in one (or more) of
them.  This mechanism should be roughly comparable to 'select()' or
'MsgWaitForMultipleObjects()' (on w32).

I've googled a bit but couldn't find a solution.  The only things I
can come up with are the following:

1) Polling:

   Iterate over all relevant queues and issue a nonblocking 'get' on
   each one, possibly yielding after each iteration to be a bit more
   polite.

   Pro:

        - straightforward

   Con:

        - performance penalty (wasting cpu time)

        - implicitly imposes a priority on the queues; could even
          lead to the point where only the inputs of a single queue
          (say the first one in the loop) are handled while all others
          are ignored (if that single queue is *very* busy).  To avoid
          this I'd need some 'scheduling'/prioritization for the
          relevant queues, which could easily get massive (depending
          on the number of queues).

         - very ugly ;-B

   Conclusion:  This is lousy.
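For concreteness, a minimal sketch of the polling idea might look
like this (using the modern 'queue' module; in the Queue.py of the
day the module and exception were spelled 'Queue' and 'Queue.Empty',
and 'poll_queues' is just a made-up helper name):

```python
import queue

def poll_queues(queues):
    """Try a nonblocking get on each queue in turn.

    Returns (index, item) from the first non-empty queue,
    or None if all queues are currently empty.
    """
    for i, q in enumerate(queues):
        try:
            return i, q.get_nowait()
        except queue.Empty:
            continue          # this queue is empty, try the next one
    return None

# usage sketch: the first queue in the list always wins the race,
# which is exactly the implicit-priority problem described above
q1, q2 = queue.Queue(), queue.Queue()
q2.put("hello")
print(poll_queues([q1, q2]))  # -> (1, 'hello')
```

A real consumer would wrap this in a loop with a short sleep between
iterations, which is where the wasted CPU time comes from.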

2) Synchronization (take I):

   I associate some sort of synchronization item with a Queue-derived
   class.  Every 'put' increases a counter and every 'get' decreases
   it.  If the counter reaches 0 after a 'get', the synchronization
   item becomes unsignaled; if the counter is 1 after a 'put', it
   becomes signaled.  Several queues can share the same synch item.
   The consumer blocks on the unsignaled synch item and awakes when
   it gets signaled.  The simplest synch item I can come up with for
   this is a Semaphore: every (successful) DerivedQueue.put does a
   Semaphore.release and every (successful) DerivedQueue.get does a
   Semaphore.acquire.  The consumer does a Semaphore.acquire and has
   to issue a Semaphore.release immediately afterwards (to keep the
   semaphore counter reflecting the correct number of items within
   all related queues, and to avoid a deadlock on the next
   DerivedQueue.get anyway ;-)

   Pro:

        - consumer doesn't waste cpu cycles

   Con:

        - performance hit on 'get' and 'put'

        - consumer still has to test all related queues to find out
          which one signaled the synch item (some 'scheduling'
          algorithm needed as with polling)

        - when the consumer awakens from a synch item wait, there is
          no guarantee that it actually gets something from one of the
          queues due to multithreading (this might be acceptable for
          me)

        - two threads can share no queues at all or only the very same
          set of queues unless I let DerivedQueue accept several synch
          items (which makes another lock necessary within the
          DerivedQueue to maintain the list of synch items)

        - quite ugly

   Conclusion:  anything but optimal

3) Synchronization (take II):

   I implement a QueueContainer which accepts derived Queues
   (similar to the above).  The 'get' and 'put' methods call back the
   container.  The consumer uses QueueContainer.get to get items out
   of one of its queues.  The DerivedQueue is able to call back
   different containers.  Furthermore, DerivedQueue must expose an
   external lock so that 'qsize' can be used reliably when a new
   container is attached.  The problem of finding out which queue
   issued a signal can be solved by maintaining a FIFO record within
   the container: DerivedQueue.put appends the queue's identity and
   QueueContainer.get removes the oldest entry.  Thus all events are
   handled in the order they were delivered to the input queues.

   Pro:

	- solves most of the problems from above

	- simple interface for consumers

   Con:

	- performance hit

	- what are the semantics of QueueContainer.get when several
	  threads are blocking on it?

	- deadlocks lurking everywhere

	- not all too nice either

   Conclusion:  somewhat full featured but also somewhat bloated :-B
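One possible sketch of such a container, using a Condition plus a
deque as the FIFO record (the names QueueContainer/ContainedQueue
and the single-container restriction are my simplifying assumptions;
the multi-container and qsize machinery is left out):

```python
import collections
import queue
import threading

class QueueContainer:
    """Remembers which member queue received each item, in arrival order."""
    def __init__(self):
        self._cond = threading.Condition()
        self._order = collections.deque()   # queue objects, oldest put first

    def _notify_put(self, q):
        # called back by ContainedQueue.put after a successful put
        with self._cond:
            self._order.append(q)
            self._cond.notify()             # wake one blocked consumer

    def get(self):
        """Block until any member queue has an item; return (queue, item)."""
        with self._cond:
            while not self._order:
                self._cond.wait()
            q = self._order.popleft()       # oldest undelivered event
        return q, q.get()

class ContainedQueue(queue.Queue):
    """A Queue that reports every successful put to its container."""
    def __init__(self, container):
        super().__init__()
        self._container = container

    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        self._container._notify_put(self)

# usage sketch: items come back in overall arrival order
container = QueueContainer()
a, b = ContainedQueue(container), ContainedQueue(container)
b.put("first")
a.put("second")
print(container.get()[1])                   # -> first
```

This also suggests an answer to the multiple-consumer question: each
waiter that gets past the Condition pops a distinct deque entry, so
each delivered item goes to exactly one consumer.  Whether the
deadlock worries go away is another matter.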

I would very much appreciate any suggestions for a better
(performance-wise, more 'pythonic', whatever) implementation -- or
have I overlooked an already existing solution to my problem?


TIA,

andreas



