Multiprocessing problem
larudwer
larudwer at freenet.de
Wed Mar 3 12:49:36 EST 2010
Hello Matt
I think the problem is here:
for n in xrange(100000):
    outqueue.put(str(n))          <-- fill the queue with 100000 elements
    try:
        r = inqueue.get_nowait()  <-- queue is still empty because the
                                      processes need some time to start
        results.append(r)
    except Empty:
        pass                      <-- causing 100000 passes
....

print "-"
for task in tasks:
    outqueue.put(None)            <-- put even more data in the queue
...

# In the meantime the processes start to run and try to put data
# into the output queue. However, this queue might fill up and block
# all processes that try to write data into the already full queue.

print "joining"
for task in tasks:
    task.join()                   <-- can never succeed because the processes
                                      are waiting for someone to read the result queue
print "joined"
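The underlying reason is documented behaviour of multiprocessing.Queue: a
process that has put data on a queue will not terminate until its feeder
thread has flushed that data to the underlying pipe, so join()ing it can
deadlock unless someone reads the queue first. A minimal sketch of the effect
(my own example, not part of your code):

from multiprocessing import Process, Queue

def child(q):
    # Put something much larger than the pipe buffer; the child cannot exit
    # until this data has been flushed through the pipe.
    q.put("x" * 10000000)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=child, args=(q,))
    p.start()
    # Calling p.join() here would block forever, because the child's feeder
    # thread is still waiting for the pipe to be read.
    data = q.get()           # drain the queue first ...
    p.join()                 # ... then join() returns promptly
    print "joined, received %d bytes" % len(data)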
This example works:
from Queue import Empty, Full
from multiprocessing import Queue, Process
from base64 import b64encode
import time, random


class Worker(Process):

    def __init__(self, inqueue, outqueue):
        Process.__init__(self)
        self.inqueue = inqueue
        self.outqueue = outqueue

    def run(self):
        inqueue = self.inqueue
        outqueue = self.outqueue
        c = 0
        while True:
            arg = inqueue.get()
            if arg is None: break
            c += 1
            b = b64encode(arg)
            outqueue.put(b)
        # Clean-up code goes here
        outqueue.put(c)


class Supervisor(object):

    def __init__(self):
        pass

    def go(self):
        outqueue = Queue()
        inqueue = Queue()
        tasks = [Worker(outqueue, inqueue) for _ in xrange(4)]
        for task in tasks:
            task.start()

        results = []
        print "*"
        for n in xrange(100000):
            outqueue.put(str(n))
        print "-"
        for task in tasks:
            outqueue.put(None)

        print "emptying queue"
        try:
            while True:
                r = inqueue.get_nowait()
                results.append(r)
        except Empty:
            pass
        print "done"
        print len(results)

        print "joining"
        for task in tasks:
            task.join()
        print "joined"


if __name__ == "__main__":
    s = Supervisor()
    s.go()
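A variation of the same idea (my sketch, not part of the example above): since
the number of expected results is known in advance (100000 encoded strings plus
one counter per worker), the "emptying queue" part could use blocking get()
calls instead of get_nowait(), which also avoids the case where Empty is raised
before the workers have finished. Assuming the same inqueue and tasks as in the
example:

expected = 100000 + len(tasks)   # one result per input plus one counter per worker
results = [inqueue.get() for _ in xrange(expected)]
for task in tasks:
    task.join()                  # safe: the result queue has been drained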