multiprocessing problems
Nils Ruettershoff
nils at ccsg.de
Wed Jan 20 18:30:21 EST 2010
Hi Doxa,
DoxaLogos wrote:
[...]
> I found out my problems. One thing I did was followed the test queue
> example in the documentation, but the biggest problem turned out to be
> a pool instantiated globally in my script was causing most of the
> endless process spawn, even with the "if __name__ == "__main__":"
> block.
>
Problems that solve themselves are the best problems ;)
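By the way, the usual way to avoid that endless spawn is to create the
Pool only inside the "if __name__ == '__main__':" block, so the freshly
spawned interpreters, which re-import your module, never build pools
(and children) of their own. A minimal sketch, with process_file as a
placeholder for whatever you do per file:

    import multiprocessing

    def process_file(filename):
        # placeholder for the real per-file work
        return len(filename)

    if __name__ == '__main__':
        # create the pool here, *not* at module level; otherwise every
        # spawned child would create a pool of its own on import
        pool = multiprocessing.Pool(processes=4)
        results = pool.map(process_file, ['a.txt', 'b.txt'])
        pool.close()
        pool.join()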
One tip: currently your algorithm has some overhead, because for each
batch you start four additional Python interpreters, process the files,
close all the newly spawned interpreters, and then start four
interpreters again for the next files.
For this kind of job I prefer to start the processes once and feed them
with data via a queue. That reduces the overhead and increases runtime
performance.
This could look like this:
(not directly executable due to some pseudo functions -> untested)
import multiprocessing
import Queue

class Worker(multiprocessing.Process):

    def __init__(self, feeder_q, queue_filled):
        multiprocessing.Process.__init__(self)
        self.feeder_q = feeder_q
        self.queue_filled = queue_filled

    def run(self):
        serve = True
        # serve until all work has been enqueued and processed
        while serve:
            try:
                # scan the queue for work; blocks the process for up
                # to 5 seconds. If no item has arrived by then,
                # Queue.Empty is raised.
                text = self.feeder_q.get(True, timeout=5)
                if text:
                    do_stuff(text)  # pseudo function: handle one file
                # very important! tell the queue that the fetched work
                # has been finished, otherwise feeder_q.join() would
                # block forever
                self.feeder_q.task_done()
            except Queue.Empty:
                # as soon as the queue is empty and all work has been
                # enqueued, the process can terminate itself
                if self.queue_filled.is_set() and self.feeder_q.empty():
                    serve = False
        return

if __name__ == '__main__':
    number_of_processes = 4
    queue_filled = multiprocessing.Event()
    feeder_q = multiprocessing.JoinableQueue()
    process_list = []
    # get the file names which need to be processed
    all_files = get_all_files()  # pseudo function: collect file names
    # start the processes once
    for i in xrange(number_of_processes):
        process = Worker(feeder_q, queue_filled)
        process.start()
        process_list.append(process)
    # start feeding
    for filename in all_files:
        feeder_q.put(filename)
    # inform the processes that all work has been ordered
    queue_filled.set()
    # wait until the queue is empty
    feeder_q.join()
    # wait until all processes have finished their jobs
    for process in process_list:
        process.join()
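An alternative shutdown scheme, just as a sketch and equally untested
since it reuses the same pseudo functions, is to drop the Event and
instead put one sentinel per worker on the queue; each worker exits as
soon as it sees the sentinel, so no timeout is needed:

    import multiprocessing

    SENTINEL = None

    def worker(q):
        # take items until the sentinel arrives, then stop
        while True:
            item = q.get()
            if item is SENTINEL:
                q.task_done()
                break
            do_stuff(item)  # pseudo function, as above
            q.task_done()

    if __name__ == '__main__':
        q = multiprocessing.JoinableQueue()
        workers = [multiprocessing.Process(target=worker, args=(q,))
                   for i in xrange(4)]
        for p in workers:
            p.start()
        for filename in get_all_files():  # pseudo function, as above
            q.put(filename)
        # one sentinel per worker so each of them can shut down
        for p in workers:
            q.put(SENTINEL)
        q.join()
        for p in workers:
            p.join()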
Cheers,
Nils