alternative to JoinableQueue, please
Filipe Fernandes
ffernand.list at gmail.com
Fri Jun 26 21:19:58 EDT 2009
Raymond Hettinger wrote:
> [Filipe Fernandes]
>> The reasons for using JoinableQueue I think are obvious. I want to
>> block the main processing using queue.join() until the tasks that have
>> been placed on the queue have been finished by the worker processes.
>>
>> I can't be the only one experiencing this (besides Brian)... are there
>> others who ran into this? Are there workarounds (besides a
>> home-brewed one)?
>
> Before Queue.task_done() and Queue.join() were added, other
> idioms were used.
>
> One technique is to use a second queue to track incomplete tasks.
>
> # adding work
> unfinished_tasks.put(None)
> q.put(task)
>
>
> # doing work
> t = q.get()
> f(t)
> unfinished_tasks.get()
>
>
> # waiting for unfinished tasks to complete
> while unfinished_tasks.qsize():
>     sleep(0.1)
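A minimal runnable sketch of Raymond's second-queue idiom using multiprocessing.Queue. The work function f(), the worker()/run() helpers, the two-process worker count and the None shutdown sentinel are hypothetical additions for illustration, and the sketch assumes a POSIX system where the "fork" start method exists. Note that multiprocessing.Queue.qsize() may raise NotImplementedError on platforms such as macOS, where sem_getvalue() is not implemented.

```python
import multiprocessing as mp
import time


def f(t):
    # Hypothetical stand-in for the real work function.
    return t * t


def worker(q, unfinished_tasks, results):
    # "doing work": consume tasks until a None sentinel arrives.
    while True:
        t = q.get()
        if t is None:
            break
        results.put(f(t))
        unfinished_tasks.get()  # one fewer unfinished task


def run(tasks, nworkers=2):
    # Assumption: a POSIX platform where the "fork" start method is available.
    ctx = mp.get_context("fork")
    q, unfinished_tasks, results = ctx.Queue(), ctx.Queue(), ctx.Queue()
    procs = [ctx.Process(target=worker, args=(q, unfinished_tasks, results))
             for _ in range(nworkers)]
    for p in procs:
        p.start()

    # "adding work": count the task before making it visible to workers.
    for t in tasks:
        unfinished_tasks.put(None)
        q.put(t)

    # "waiting for unfinished tasks to complete": poll the counter queue.
    while unfinished_tasks.qsize():
        time.sleep(0.1)

    # Drain the results before joining the workers, then stop them.
    out = sorted(results.get() for _ in tasks)
    for _ in procs:
        q.put(None)
    for p in procs:
        p.join()
    return out


print(run([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

Results are drained before the workers are joined because a process that has put items on a multiprocessing.Queue may not exit until those items are consumed.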
Thanks, Raymond... yours is by far the simplest, and it should have been an
obvious solution. I didn't want to stray too far from what I had, and this
fits the bill.
In case others are curious...
I had looked at using the example in
http://docs.python.org/library/multiprocessing.html#using-a-remote-manager
to share the traditional Queue from the Queue module, which provides the
join() and task_done() methods. But creating a server process/thread just
for that is overkill.
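For comparison, a sketch of the manager-based route: SyncManager.Queue() returns a proxy to an ordinary queue.Queue that lives in a separate server process, so join() and task_done() work across processes, at the cost of that extra process. The worker()/run() helpers and the "+ 100" work are hypothetical, and the "fork" start method is assumed.

```python
import multiprocessing as mp


def worker(q, results):
    # Each get() is paired with exactly one task_done(), sentinel included.
    while True:
        t = q.get()
        try:
            if t is None:
                return
            results.put(t + 100)  # hypothetical stand-in for real work
        finally:
            q.task_done()


def run(tasks, nworkers=2):
    ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method
    # The manager starts a server process owning plain queue.Queue objects;
    # q and results are proxies that forward method calls to that process.
    with ctx.Manager() as manager:
        q = manager.Queue()
        results = manager.Queue()
        procs = [ctx.Process(target=worker, args=(q, results))
                 for _ in range(nworkers)]
        for p in procs:
            p.start()
        for t in tasks:
            q.put(t)
        q.join()  # blocks until task_done() was called for every put()
        out = sorted(results.get() for _ in tasks)
        for _ in procs:
            q.put(None)
        for p in procs:
            p.join()
    return out


print(run([1, 2, 3]))  # [101, 102, 103]
```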
I also looked at using process pools asynchronously and waiting for the
result iterators to be exhausted to determine whether the jobs had completed.
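A sketch of the pool approach, assuming the jobs can be expressed as one function mapped over the inputs: the iterator returned by Pool.imap_unordered() is only exhausted once every submitted job has produced its result. The names f() and run() are hypothetical, and the "fork" start method is assumed.

```python
import multiprocessing as mp


def f(t):
    # Hypothetical stand-in for the real work function.
    return t * 10


def run(tasks, nworkers=2):
    ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method
    with ctx.Pool(processes=nworkers) as pool:
        # Consuming the whole iterator means every job has completed;
        # results arrive in completion order, so sort them for a stable answer.
        return sorted(pool.imap_unordered(f, tasks))


print(run([1, 2, 3]))  # [10, 20, 30]
```

This only fits when each job returns a result to the caller; it does not give the put()/task_done() decoupling that a JoinableQueue offers.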
But your solution is, embarrassingly, the simplest to implement, and it is
easy enough to create a composite class containing the two queues to
simulate the real one (I have not tried the following, since I'm not in the
office right now):
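from multiprocessing import Queue
from queue import Empty   # on Python 2 this lives in module Queue
from time import sleep

class JoinableQueue(object):
    def __init__(self, *args, **kwargs):
        # holds one None per task that was put() but not yet task_done()
        self.__tasks = Queue()
        self.__queue = Queue(*args, **kwargs)

    def put(self, *args, **kwargs):
        self.__tasks.put(None)
        self.__queue.put(*args, **kwargs)

    def get(self, *args, **kwargs):
        return self.__queue.get(*args, **kwargs)

    def task_done(self):
        try:
            # a short timeout tolerates the delay before the feeder thread
            # flushes the matching put(); 1.0 s is an arbitrary choice
            self.__tasks.get(timeout=1.0)
        except Empty:
            raise ValueError('task_done() called too many times')

    def join(self):
        # qsize() counts a put() as soon as it returns, whereas empty()
        # can report True until the feeder thread has flushed the item
        while self.__tasks.qsize():
            sleep(0.1)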
[Add methods to simulate as required....]
filipe
ps: Thanks, Raymond, for the quick reply... and I feel rather apologetic for
having bothered the list with this :S
More information about the Python-list
mailing list