alternative to JoinableQueues, please

Filipe Fernandes ffernand.list at gmail.com
Fri Jun 26 21:19:58 EDT 2009


Raymond Hettinger wrote:
 > [Filipe Fernandes]
 >> The reasons for using JoinableQueue I think are obvious.  I want to
 >> block the main processing using queue.join() until the tasks that have
 >> been placed on the queue have been finished by the worker processes.
 >>
 >> I can't be the only one experiencing this (besides Brian)... are there
 >> others who ran into this?  Are there workarounds (besides a
 >> home-brewed one)?
 >
 > Before Queue.task_done() and Queue.join() were added, other
 > idioms were used.
 >
 > One technique is to use a second queue to track incomplete tasks.
 >
 > # adding work
 > unfinished_tasks.put(None)
 > q.put(task)
 >
 >
 > # doing work
 > t = q.get()
 > f(t)
 > unfinished_tasks.get()
 >
 >
 > # waiting for unfinished tasks to complete
 > while unfinished_tasks.qsize():
 >     sleep(0.1)
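A minimal runnable sketch of Raymond's idiom with worker processes, assuming Python 3 and a multiprocessing.Queue for all three queues. square(), add_work(), worker(), wait_for_all() and the None stop marker are hypothetical names chosen for illustration. One caveat from the multiprocessing docs: qsize() may raise NotImplementedError on platforms such as macOS.

```python
import multiprocessing as mp
import time

SENTINEL = None  # hypothetical "stop" marker for the workers


def square(x):
    # Stand-in for the real work function f(t).
    return x * x


def add_work(q, unfinished, task):
    # Count the task as unfinished *before* a worker can see it, so the
    # counter cannot reach zero while the task is still pending.
    unfinished.put(None)
    q.put(task)


def worker(q, unfinished, results):
    while True:
        task = q.get()
        if task is SENTINEL:
            break
        results.put(square(task))
        unfinished.get()  # this task is finished: remove one marker


def wait_for_all(unfinished, poll=0.1):
    # Poll until no markers remain (plays the role of JoinableQueue.join()).
    # qsize() may raise NotImplementedError on platforms such as macOS.
    while unfinished.qsize():
        time.sleep(poll)


if __name__ == "__main__":
    q, unfinished, results = mp.Queue(), mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(q, unfinished, results))
             for _ in range(4)]
    for p in procs:
        p.start()
    for n in range(10):
        add_work(q, unfinished, n)
    wait_for_all(unfinished)
    # Drain results before joining the workers, so no process is left
    # waiting to flush buffered data into a pipe.
    print(sorted(results.get() for _ in range(10)))
    for _ in procs:
        q.put(SENTINEL)  # one stop marker per worker
    for p in procs:
        p.join()
```

Because each worker puts its result before removing a marker, every result is already queued (or being flushed) by the time wait_for_all() returns.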

Thanks, Raymond... yours is by far the simplest, and it should have been an
obvious solution.  I didn't want to stray too far from what I had, and this
fits the bill.

In case others are curious...

I had looked at using the example in
http://docs.python.org/library/multiprocessing.html#using-a-remote-manager
to share the traditional Queue from the Queue module, which provides the
join() and task_done() methods. But creating a server process/thread just
for that is rather overkill.
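For comparison, a sketch of the manager route, assuming Python 3: multiprocessing.Manager().Queue() returns a proxy to a queue.Queue that lives in the manager's server process, so task_done() and join() come from queue.Queue itself, at the cost of that extra process. worker(), square-by-hand and the None stop marker are illustrative choices, not part of the linked example.

```python
import multiprocessing as mp


def worker(q, results):
    # Works with any object exposing get()/task_done(), e.g. a manager
    # proxy of queue.Queue.
    while True:
        task = q.get()
        if task is None:      # hypothetical stop marker
            q.task_done()
            break
        results.put(task * task)
        q.task_done()


if __name__ == "__main__":
    with mp.Manager() as manager:
        q = manager.Queue()        # proxy to a queue.Queue in the manager
        results = manager.Queue()
        procs = [mp.Process(target=worker, args=(q, results))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for n in range(10):
            q.put(n)
        q.join()                   # returns once task_done() ran 10 times
        print(sorted(results.get() for _ in range(10)))
        for _ in procs:
            q.put(None)
        for p in procs:
            p.join()
```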

I also looked at using process pools asynchronously and waiting on the
returned iterators to determine whether the jobs had completed.
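A sketch of the pool route, assuming Python 3; square() and run_all() are hypothetical helpers. Pool.imap_unordered() returns an iterator, and exhausting it means every submitted job has finished.

```python
import multiprocessing as mp


def square(x):
    # Stand-in for the real job.
    return x * x


def run_all(pool, func, items):
    # Exhausting the iterator blocks until every job has completed;
    # results arrive in completion order, not submission order.
    return list(pool.imap_unordered(func, items))


if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        print(sorted(run_all(pool, square, range(10))))
```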

But your solution is <embarrassingly> the simplest to implement. It is also
easy enough to create a composite class containing the two queues to
simulate the real one (although I have not tried the following, since I'm
not in the office right now).


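from multiprocessing import Queue
from queue import Empty      # Python 3 name; on Python 2.6: from Queue import Empty
from time import sleep


class JoinableQueue(object):
    """Mimic JoinableQueue with a second queue counting unfinished tasks."""

    def __init__(self, *args, **kwargs):
        self._tasks = Queue()                  # one marker per unfinished task
        self._queue = Queue(*args, **kwargs)   # the real work queue

    def put(self, *args, **kwargs):
        self._tasks.put(None)                  # count the task before exposing it
        self._queue.put(*args, **kwargs)

    def get(self, *args, **kwargs):
        return self._queue.get(*args, **kwargs)

    def task_done(self):
        # A short timeout rather than get(False): a marker just put by
        # another process may not have reached the underlying pipe yet.
        try:
            self._tasks.get(timeout=1.0)
        except Empty:
            raise ValueError('task_done() called too many times') from None

    def join(self):
        # qsize() reflects put() immediately, unlike empty(); it may raise
        # NotImplementedError on platforms such as macOS.
        while self._tasks.qsize():
            sleep(0.1)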

[Add other methods as required to complete the simulation...]

filipe

ps: Thanks, Raymond, for the quick reply... and I feel rather apologetic for
having bothered the list with this :S




More information about the Python-list mailing list