problem with multiprocessing and defaultdict

wiso gtu2003 at alice.it
Mon Jan 11 18:50:56 EST 2010


Robert Kern wrote:

> On 2010-01-11 17:15 PM, wiso wrote:
>> I'm using a class to read some data from files:
>>
>> import multiprocessing
>> from collections import defaultdict
>>
>> def SingleContainer():
>>      return list()
>>
>>
>> class Container(defaultdict):
>>      """
>>      this class store odd line in self["odd"] and even line in
>>      self["even"]. It is stupid, but it's only an example.
>>      """
>>      def __init__(self,file_name):
>>          if type(file_name) != str:
>>              raise AttributeError, "%s is not a string" % file_name
>>          defaultdict.__init__(self,SingleContainer)
>>          self.file_name = file_name
>>          self.readen_lines = 0
>>      def read(self):
>>          f = open(self.file_name)
>>          print "start reading file %s" % self.file_name
>>          for line in f:
>>              self.readen_lines += 1
>>              values = line.split()
>>              key = {0: "even", 1: "odd"}[self.readen_lines %2]
>>              self[key].append(values)
>>          print "readen %d lines from file %s" % (self.readen_lines,
>> self.file_name)
>>
>> """
>> Now I want to read more than one file per times
>> """
>>
>> def do(file_name):
>>      container = Container(file_name)
>>      container.read()
>>      return container
>>
>> if __name__ == "__main__":
>>      file_names = ["prova_200909.log", "prova_200910.log"]
>>      pool = multiprocessing.Pool(len(file_names))
>>      result = pool.map(do,file_names)
>>      pool.close()
>>      pool.join()
>>      print "Finish"
>>
>>
>>
>> but I got:
>> start reading file prova_200909.log
>> start reading file prova_200910.log
>> readen 142 lines from file prova_200909.log
>> readen 160 lines from file prova_200910.log
>> Exception in thread Thread-2:
>> Traceback (most recent call last):
>>    File "/usr/lib64/python2.6/threading.py", line 522, in
>>    __bootstrap_inner
>>      self.run()
>>    File "/usr/lib64/python2.6/threading.py", line 477, in run
>>      self.__target(*self.__args, **self.__kwargs)
>>    File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in
>> _handle_results
>>      task = get()
>>    File "main2.py", line 11, in __init__
>>      raise AttributeError, "%s is not a string" % file_name
>> AttributeError: (AttributeError('<function SingleContainer at
>> 0x7f08b253d938>  is not a string',),<class '__main__.Container'>,
>> (<function SingleContainer at 0x7f08b253d938>,))
>>
>>
>> the problem is when pool share objects, but why is it calling
>> Container.__init__ with a Container parameter?
> 
> When you return the container from do() in the worker process, it must be
> pickled in order to be sent over the socket. You did not override the
> implementation of the .__reduce_ex__() method, so it used defaultdict's
> version which passes the factory function as the first argument to the
> constructor.
> 
> I would recommend passing back the container.items() list instead of a
> Container instance as the easiest path forward.
> 

Thank you very much. I changed from
 return container
to
 return container.items()
and it works:

start reading file prova_200909.log
readen 142 lines from file prova_200909.log
start reading file prova_200910.log
readen 160 lines from file prova_200910.log
Finish
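
For the archive, here is a minimal sketch of what Robert describes. It assumes a cut-down Container (no read() method) and calls the inherited __reduce__() directly instead of going through a Pool. That is the method pickle ends up using here, as far as I understand it. The file name is only a label and the file is never opened.

```python
from collections import defaultdict


def SingleContainer():
    return list()


class Container(defaultdict):
    # Cut-down version of the class above: same constructor, no read().
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError("%s is not a string" % file_name)
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name


container = Container("prova_200909.log")
container["odd"].append(["a", "b"])

# defaultdict's __reduce__ tells pickle how to rebuild the object:
# call reduced[0](*reduced[1]) and then re-insert the items.
reduced = container.__reduce__()
rebuild, args = reduced[0], reduced[1]
# rebuild is the Container class, args is (SingleContainer,)

# This is the call made on the receiving side. The factory function
# lands in the file_name parameter, so the type check raises.
try:
    rebuild(*args)
    failed = False
except AttributeError:
    failed = True

# A plain dict (or list(container.items())) carries the same data
# without involving Container.__init__ at all.
plain = dict(container)
```

So the constructor is not called with a Container, but with the default factory, because defaultdict assumes that subclasses keep the factory as their first constructor argument.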

The problem now is this:
start reading file r1_200909.log
start reading file r1_200910.log
readen 488832 lines from file r1_200910.log
readen 517247 lines from file r1_200909.log

With huge files (the real case) the program freezes. Is there a way to avoid 
the pickling/serialization? For example, something like this:

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    childrens = [Container(f) for f in file_names]
    pool.map(lambda c: c.read(), childrens)

This fails with:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
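
A possible direction, sketched under assumptions: pool.map has to pickle the function it is given, and a lambda has no importable name, so a module-level function is needed instead. Anything returned from the worker is still pickled on the way back, so the sketch below returns only a small summary rather than the full lists. The names count_lines and read_all are hypothetical, and whether a per-file summary is enough depends on what the parent really needs.

```python
import multiprocessing


def count_lines(file_name):
    """Worker: read one file and return only a small summary.

    Defined at module level so that pickle can find it by name.
    """
    counts = {"even": 0, "odd": 0}
    with open(file_name) as f:
        # Same numbering as Container.read(): the first line is "odd".
        for line_number, _line in enumerate(f, 1):
            counts["odd" if line_number % 2 else "even"] += 1
    return file_name, counts


def read_all(file_names):
    """Run count_lines over the files, one worker process per file."""
    pool = multiprocessing.Pool(len(file_names))
    try:
        return pool.map(count_lines, file_names)
    finally:
        pool.close()
        pool.join()
```

read_all(["r1_200909.log", "r1_200910.log"]) would be called under the usual if __name__ == "__main__": guard. Note that even if c.read() could be sent to the pool, each worker would fill its own copy of the Container, and the objects in the parent process would stay empty.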



