How to use threading and Queue modules

Dave Brueck dave at pythonapocrypha.com
Tue Mar 4 13:40:57 EST 2003


On Tue, 4 Mar 2003, Paul Moore wrote:

> I have a problem trying to use the threading and Queue modules. What
> I'm trying to do is to fire off a host of background threads, and let
> them do some work. In my main thread, I want to wait for the
> background workers to complete, and then pick up their results.
>
> The best way to handle the results seems to be via a Queue.Queue, so
> that I don't have to worry about locking issues.
>
> But now I hit a problem: How do I wait for the workers to all finish?
> If I just join all the workers in turn, I risk the queue filling up,
> resulting in deadlock. If I wait on the queue, I don't know when all
> the workers have finished.
>
> Maybe a queue isn't the right data structure here. Is there another,
> more suitable, data structure which can collect results from worker
> threads? Maybe all I want is a locked list. After all, the queue would
> be fine if it couldn't fill up. (There isn't enough data here for it
> to be a problem to keep it all in memory).
>
> I'm new to multi-threading, so I don't want something too hard :-)
>
> Basically, the only reason I'm using threads at all is to multiplex a
> lot of database conversations (50 or more) where IO and network times
> are the bottleneck - with threading, I can run all 50 in the time it
> takes the slowest to connect!

If there is a fixed number of results, one per thread, then you can just
get items off the queue until you have them all:

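import Queue
import threading

NUM_THREADS = 50  # e.g. one thread per database conversation

def ThreadFunc(q):
  ret = None
  try:
    pass  # do some work here, assigning the result to ret
  finally:
    q.put(ret)  # always put something, even if the work raised

queue = Queue.Queue(0) # maxsize 0 means no size limit, so put() never blocks
for i in range(NUM_THREADS):
  threading.Thread(target=ThreadFunc, args=(queue,)).start()

# Exactly one result per thread, so NUM_THREADS gets collect them all
for i in range(NUM_THREADS):
  result = queue.get()  # blocks until the next worker finishes
  # do something with result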

You'll process the results in the order in which the threads finish, and
the wait-till-all-done condition happens automatically: the second loop
can't end until every thread has put its result on the queue. Note that
the thread func puts an item into the queue no matter what, so that if an
exception occurs your main thread doesn't hang forever.

An alternate approach to waiting on a large number of threads is to have
the main thread acquire a semaphore (created with an initial count of
zero) n times, where n is the number of threads. Each worker would still
do a try..finally, but in the finally clause it would release the
semaphore to signal "I'm done".
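
A rough sketch of that semaphore variant, in case it helps. It is written
for Python 3 rather than the Python of this thread, and the names here
(worker, results, results_lock, done) and the squaring "work" are made up
for illustration. It also assumes results are collected in a plain list
guarded by a lock, along the lines of the "locked list" idea in the
question:

```python
import threading

NUM_THREADS = 5  # hypothetical worker count for this sketch

results = []                      # plain list, guarded by results_lock
results_lock = threading.Lock()
done = threading.Semaphore(0)     # count starts at zero: nobody has finished

def worker(n):
    try:
        value = n * n             # stand-in for the real work
        with results_lock:
            results.append(value)
    finally:
        done.release()            # signal "I'm done", even after an exception

for i in range(NUM_THREADS):
    threading.Thread(target=worker, args=(i,)).start()

# Each acquire blocks until one more worker has released the semaphore,
# so this loop finishes only after all NUM_THREADS workers are done.
for _ in range(NUM_THREADS):
    done.acquire()

print(sorted(results))
```

Unlike the queue version, the main thread here sees no results until every
worker has signalled, so this fits best when you only care about the
complete set.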

-Dave




