safely returning objects from a Process to the parent through a Queue()

MRAB python at mrabarnett.plus.com
Fri Nov 6 20:53:59 CET 2009


Verweij, Arjen wrote:
> Hi,
> 
> I'm trying to parallelize a loop using Process and Queue. My code looks similar to the example in http://docs.python.org/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing.managers
> 
> It looks like this:
> 
> def pp(q, t, s, a, h, p):
>     doStuff()
>     c.generate()
>     q.put(c, True) #stuff a in a queue
> 
> main:
> 
> for a in aa:
>     processes = []
>     q = Queue()
>     for b in bb:
>         try:
>             if bla > 24:
>                 print "too old"
>                 continue
>         except:
>             print "etc" + t #file not found
>             pass
>         p = Process(target=pp, args=(q, t, s, a, h, p,)) #trailing comma, do not touch m'kay
>         p.start()
>         processes.append(p)
>     #end for b in bb
>     for p in processes:
>         # p.join()                    # this deadlocks in combination with the Queue() at some point
>         ss = q.get()
>         bigcontainer.add(ss)
>     bigcontainer.generate()
>     world.add(bigcontainer)
> #end for a in aa
> world.generate()
> 
> So I have some XML that requires translation into HTML. I take a sublist, traverse it, spawn a process for every XML file in that list, and generate the HTML inside that process. Then I would very much like to have the results back in the original main() so they can be used. Is there a way to guarantee that the a in aa loop will not start the next iteration too early? In other words, I'm worried that q will be depleted by some unknown factor while a subprocess from b in bb still has to write to the Queue(), and that the loop will continue and somehow leak or destroy data.
> 
> Before I found the example in the webpage that pointed out the deadlock I couldn't get the program to finish, but finishing without all the data would be equally bad.
> 
Here is the answer I gave to an earlier question about multiprocessing:

"""I think it's down to the difference between multithreading and
multiprocessing.

When multithreading, the threads share the same address space, so items
can be passed between the threads directly.

However, when multiprocessing, the processes don't share the same
address space, so items need to be passed from process to process via a
pipe. Unfortunately, the pipe has a limited capacity, so if a process
doesn't read from one end then the pipe will eventually fill up and the
sender will block. Also, a process won't terminate until it has finished
writing to the pipe, and it can't be joined until it has terminated.

You can therefore get into a deadlock where:

* Process A won't read from the queue until it has joined process B.
* The join won't succeed until process B has terminated.
* Process B won't terminate until it has finished writing to the queue.
* Process B can't finish writing to the queue because it's full.
* The queue is full because process A isn't reading from it.
"""

I suggest you get all the results from the queue and then join all the
processes:

     for p in processes:
         ss = q.get()
         bigcontainer.add(ss)
     for p in processes:
         p.join()



