Q: multiprocessing.Queue size limitations or bug...

Michael knurxs at gmx.de
Thu Aug 27 09:49:21 EDT 2009


On Aug 27, 8:56 am, ryles <ryle... at gmail.com> wrote:
> On Aug 26, 4:56 am, Michael Riedel <mrie... at inova-semiconductors.de>
> wrote:
>
>
>
> > Sorry for not being more specific, but I'm not absolutely certain whether
> > I encountered a bug or did something wrong:
>
> > The (stupid) code below either stalls forever at 'p0.join()' or runs to
> > completion, depending on the value of TROUBLE_MAKER.
>
> > Any help, thoughts, comments?
>
> > Thank you for your time.
>
> > Michael
>
> > # ----------------------------------------------------------------------
>
> > from multiprocessing import Process, Queue
>
> > # bit vector size
> > BVS=8
>
> > # upper bound of the innermost loop; larger values mean more data per worker
> > TROUBLE_MAKER=12  # for greater values p0.join() never returns...
>
> > def evaluate(q, id, start=0, stop=2**BVS):
>
> >     cmin = {0: []}
>
> >     for mask0 in range(start, stop):
> >         for mask1 in range(0, 2**BVS):
> >             for mask2 in range(mask1, TROUBLE_MAKER):
> >                 cmin[0].append((mask0, mask1, mask2))
>
> >     print 'process %d finished (dict layout: %d/%d)...' % (id,
> > len(cmin), len(cmin[0]))
> >     q.put(cmin.copy())
> >     q.close()
>
> > if __name__ == '__main__':
>
> >     q0 = Queue()
> >     q1 = Queue()
> >     q2 = Queue()
> >     q3 = Queue()
>
> >     part = 2**BVS/4
> >     p0 = Process(target=evaluate, args=(q0, 0, 0*part, 1*part),
> > name='worker_0')
> >     p1 = Process(target=evaluate, args=(q1, 1, 1*part, 2*part),
> > name='worker_1')
> >     p2 = Process(target=evaluate, args=(q2, 2, 2*part, 3*part),
> > name='worker_2')
> >     p3 = Process(target=evaluate, args=(q3, 3, 3*part, 4*part),
> > name='worker_3')
> >     p0.start()
> >     print 'process 0 started...'
> >     p1.start()
> >     print 'process 1 started...'
> >     p2.start()
> >     print 'process 2 started...'
> >     p3.start()
> >     print 'process 3 started...'
> >     # main process stalls at p0.join() for bigger TROUBLE_MAKER
> >     p0.join()
> >     p1.join()
> >     p2.join()
> >     p3.join()
> >     res0 = q0.get()
> >     res1 = q1.get()
> >     res2 = q2.get()
> >     res3 = q3.get()
> >     print 'results fetched...'
>
> > # ----------------------------------------------------------------------
>
> There is a warning related to this in the documentation:
>
> http://docs.python.org/library/multiprocessing.html#pipes-and-queues
>
> Basically, you should reverse the order of the get() and join() calls.
>
> multiprocessing does a pretty nice job of abstracting away the low-
> level details of IPC, but there are still some gotchas. As you've
> noticed, your program will deadlock when there is a large enough
> amount of data being put into the queue. This is related to a hidden
> thread that exists inside each of your child processes. The thread is
> responsible for taking your queue items from an internal buffer and
> then writing them into a pipe that your parent process will read from
> when get() is called. The pipe mechanism is what allows the two
> processes to pass information, and is supported directly by the
> Operating System. However, the pipe has a limited capacity, and when
> it is full, the writer thread is stuck waiting for the reader to read
> enough from the pipe so that it can finish its write. The problem is
> that your parent process (reader) is not actually calling get() to
> drain the pipe. Instead it's stuck in join() waiting for the child
> process to exit, which cannot happen until the writer thread has
> finished flushing its buffer into the pipe.
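
To make the capacity limit concrete, here is a small sketch (not part of the original exchange) that fills an OS pipe nobody reads from and counts how many bytes go in before a further write would block. It assumes a POSIX system and Python 3.5 or later for os.set_blocking(); pipe_capacity is a hypothetical helper name, and the number it reports depends on the operating system.

```python
import os


def pipe_capacity(chunk=1024):
    """Count how many bytes fit into an OS pipe that is never read from."""
    read_fd, write_fd = os.pipe()
    # Non-blocking mode: a full pipe raises BlockingIOError instead of
    # blocking forever, which is what a blocking writer would do.
    os.set_blocking(write_fd, False)
    written = 0
    try:
        while True:
            written += os.write(write_fd, b"x" * chunk)
    except BlockingIOError:
        pass  # the pipe is full
    finally:
        os.close(read_fd)
        os.close(write_fd)
    return written


if __name__ == "__main__":
    print("pipe accepted", pipe_capacity(), "bytes before a write would block")
```

A blocking writer, such as the queue's hidden thread, simply waits at that point until the reader consumes some data.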

I see. I really appreciate your valuable feedback.
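
For the archive, a minimal sketch of the reordering suggested above: call get() on every queue first, and join() the workers only afterwards. It is written for Python 3 rather than the Python 2 of the original post, and it replaces the nested mask loops with a simplified worker. The names run_workers and n_items are hypothetical, and the explicit "fork" start method is an assumption that restricts the sketch to POSIX systems.

```python
import multiprocessing as mp

# Assumption: the "fork" start method (POSIX only), which is how
# multiprocessing started workers on Unix when this thread was written.
ctx = mp.get_context("fork")


def evaluate(q, worker_id, n_items):
    # Build a result that is large enough to exceed the pipe's capacity.
    payload = [(worker_id, i, i) for i in range(n_items)]
    q.put(payload)
    q.close()


def run_workers(n_workers=4, n_items=100000):
    queues = [ctx.Queue() for _ in range(n_workers)]
    procs = [ctx.Process(target=evaluate, args=(q, i, n_items))
             for i, q in enumerate(queues)]
    for p in procs:
        p.start()
    # Drain every queue first, so each worker's writer thread can finish ...
    results = [q.get() for q in queues]
    # ... and only then wait for the workers to exit.
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    results = run_workers()
    print("results fetched:", [len(r) for r in results])
```

With the two steps swapped back (join() before get()), the same sketch would be expected to hang for large n_items, for the reason described in the quoted reply.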


