problem with multiprocessing and defaultdict
wiso
gtu2003 at alice.it
Mon Jan 11 18:50:56 EST 2010
Robert Kern wrote:
> On 2010-01-11 17:15 PM, wiso wrote:
>> I'm using a class to read some data from files:
>>
>> import multiprocessing
>> from collections import defaultdict
>>
>> def SingleContainer():
>>     return list()
>>
>>
>> class Container(defaultdict):
>>     """
>>     This class stores odd lines in self["odd"] and even lines in
>>     self["even"]. It is stupid, but it's only an example.
>>     """
>>     def __init__(self, file_name):
>>         if type(file_name) != str:
>>             raise AttributeError, "%s is not a string" % file_name
>>         defaultdict.__init__(self, SingleContainer)
>>         self.file_name = file_name
>>         self.readen_lines = 0
>>
>>     def read(self):
>>         f = open(self.file_name)
>>         print "start reading file %s" % self.file_name
>>         for line in f:
>>             self.readen_lines += 1
>>             values = line.split()
>>             key = {0: "even", 1: "odd"}[self.readen_lines % 2]
>>             self[key].append(values)
>>         print "readen %d lines from file %s" % (self.readen_lines,
>>                                                 self.file_name)
>>
>> """
>> Now I want to read more than one file at a time
>> """
>>
>> def do(file_name):
>>     container = Container(file_name)
>>     container.read()
>>     return container
>>
>> if __name__ == "__main__":
>>     file_names = ["prova_200909.log", "prova_200910.log"]
>>     pool = multiprocessing.Pool(len(file_names))
>>     result = pool.map(do, file_names)
>>     pool.close()
>>     pool.join()
>>     print "Finish"
>>
>>
>>
>> but I got:
>> start reading file prova_200909.log
>> start reading file prova_200910.log
>> readen 142 lines from file prova_200909.log
>> readen 160 lines from file prova_200910.log
>> Exception in thread Thread-2:
>> Traceback (most recent call last):
>>   File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
>>     self.run()
>>   File "/usr/lib64/python2.6/threading.py", line 477, in run
>>     self.__target(*self.__args, **self.__kwargs)
>>   File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
>>     task = get()
>>   File "main2.py", line 11, in __init__
>>     raise AttributeError, "%s is not a string" % file_name
>> AttributeError: (AttributeError('<function SingleContainer at
>> 0x7f08b253d938> is not a string',), <class '__main__.Container'>,
>> (<function SingleContainer at 0x7f08b253d938>,))
>>
>>
>> The problem shows up when the pool shares objects, but why is it calling
>> Container.__init__ with the SingleContainer function as the argument?
>
> When you return the container from do() in the worker process, it must be
> pickled in order to be sent over the socket. You did not override the
> implementation of the .__reduce_ex__() method, so it used defaultdict's
> version which passes the factory function as the first argument to the
> constructor.
>
> I would recommend passing back the container.items() list instead of a
> Container instance as the easiest path forward.
>
Thank you very much. I changed

    return container

to

    return container.items()

and it works:
start reading file prova_200909.log
readen 142 lines from file prova_200909.log
start reading file prova_200910.log
readen 160 lines from file prova_200910.log
Finish
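For the record, a different fix (not from this thread; a sketch only) is to keep returning the Container instance but override __reduce__, so pickle rebuilds it from the file name and the stored items instead of from the factory function:

```python
import pickle
from collections import defaultdict

class Container(defaultdict):
    def __init__(self, file_name):
        defaultdict.__init__(self, list)
        self.file_name = file_name

    def __reduce__(self):
        # rebuild as Container(file_name) and replay the stored items,
        # instead of defaultdict's default Container(default_factory)
        return (self.__class__, (self.file_name,), None, None,
                iter(self.items()))

c = Container("prova_200909.log")
c["odd"].append(["some", "line"])

c2 = pickle.loads(pickle.dumps(c))
print(c2.file_name)   # prova_200909.log
print(c2["odd"])      # [['some', 'line']]
```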
The problem now is this:
start reading file r1_200909.log
start reading file r1_200910.log
readen 488832 lines from file r1_200910.log
readen 517247 lines from file r1_200909.log
With huge files (the real case) the program freezes. Is there a solution to
avoid pickling/serialization, ... for example something like this:
if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    childrens = [Container(f) for f in file_names]
    pool.map(lambda c: c.read(), childrens)
PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed