help with multiprocessing pool

Craig Yoshioka craigyk at me.com
Thu Jan 27 18:44:31 EST 2011


I did eventually get the original code to run from the command line but not the interpreter, so the new example does have a similar problem.

Of course it's not as simple as saying I can't run an imported parallelized function from the interpreter because I can, as long as the parallelized function is being invoked directly.
But if I caused the parallelized function to be invoked indirectly, for example, by importing a class that uses the parallelized function to set a class variable, it'll hang the interpreter.
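
To make the direct/indirect distinction concrete, here is a rough sketch. `square` and `squares` come from the quoted test.py below; the `Table` class, its `compute` hook, the `getValues` name and the pool size of 4 are hypothetical, invented for illustration. The commented-out line is the kind of class-level assignment that runs the pool during import. The classmethod defers the call until first use. Whether deferring avoids the hang in every case is an assumption and has not been verified in this thread.

```python
from multiprocessing import Pool

def square(x):
    return x * x

def squares(numbers):
    # Parallelized function, as in the quoted test.py (pool size is arbitrary).
    pool = Pool(4)
    try:
        return pool.map(square, numbers)
    finally:
        pool.close()
        pool.join()

class Table(object):
    # Indirect invocation: uncommenting this would start the pool while the
    # module is still being imported, which is the case reported to hang.
    # values = squares(range(3))

    _values = None
    compute = staticmethod(squares)  # hypothetical hook, swappable for a serial function

    @classmethod
    def getValues(cls):
        # Deferred invocation: the pool only starts on the first explicit call.
        if cls._values is None:
            cls._values = cls.compute(range(3))
        return cls._values
```

With this layout, Table.getValues() would be called explicitly once the import has finished, rather than as a side effect of it.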

For now I've added the following to every module that uses parallelized functions:

import __main__

if hasattr(__main__,'__file__'):
    __SHOULD_MULTITHREAD__ = True
else:
    __SHOULD_MULTITHREAD__ = False

and the parallelized functions check this flag to determine whether to run serially or not.
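
A minimal sketch of how a parallelized function might consult such a flag. `square` and `squares` are borrowed from the quoted example further down; the flag name `SHOULD_PARALLELIZE`, the `parallel` override and the pool size of 4 are assumptions made for illustration, not the actual code.

```python
import __main__
from multiprocessing import Pool

# True when Python was started with a script file, False in the
# interactive interpreter (where __main__ has no __file__ attribute).
SHOULD_PARALLELIZE = hasattr(__main__, '__file__')

def square(x):
    return x * x

def squares(numbers, parallel=None):
    """Square each number, using a Pool only when the flag allows it."""
    if parallel is None:
        parallel = SHOULD_PARALLELIZE  # hypothetical default: follow the module flag
    if not parallel:
        # Serial fallback: slower, but safe to trigger during an import.
        return [square(n) for n in numbers]
    pool = Pool(4)
    try:
        return pool.map(square, numbers)
    finally:
        pool.close()
        pool.join()
```

Passing parallel=False forces the serial path regardless of how the interpreter was started, which is handy when experimenting interactively.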

This at least lets me import my classes into the interpreter so I can 'play' with them, although they initialize much slower.

I'm not sure why Pool needs the __main__ module, except maybe someone sticks centralized process tracking information in there. Sigh, add it to the fuel for my love/hate relationship with Python.





On Jan 27, 2011, at 10:38 AM, Philip Semanchuk wrote:

> 
> On Jan 27, 2011, at 1:12 PM, Craig Yoshioka wrote:
> 
>> The code will be multi-platform.  The OSXisms are there as an example, though I am developing on an OS X machine.
>> 
>> I've distilled my problem down to a simpler case, so hopefully that'll help troubleshoot.  
>> 
>> I have 2 files:
>> 
>> test.py:
>> --------------------------------------------------------------
>> from multiprocessing import Pool
>> 
>> def square(x):
>>   return x*x
>> 
>> def squares(numbers):
>>    pool = Pool(12)
>>    return pool.map(square,numbers)
>> 
>> 
>> test2.py:
>> --------------------------------------------------------------
>> from test import squares
>> 
>> maxvalues = squares(range(3))
>> print maxvalues
>> 
>> 
>> 
>> Now if I import squares into the interactive interpreter:
>> 
>> from test import squares
>> print squares(range(3))
>> 
>> I get the correct result, but if I try to import maxvalues from test2, the interactive interpreter hangs.
>> If I run the script from bash, though, it seems to run fine.
> 
> The short, complete example is much more useful, but it sounds like it demonstrates a different problem than you first described. Your first posting said that your code worked in the interpreter but failed when run from the command line. This code has the opposite problem. Correct?
> 
>> I think it might have something to do with this note in the docs, though I am not sure how to use this information to fix my problem:
>> 
>> Note: Functionality within this package requires that the __main__ method be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples will not work in the interactive interpreter.
> 
> I suspect this is the problem with the demo above. Your original code ran fine in the interpreter, though, correct?
> 
> bye
> Philip
> 
> 
>> 
>> On Jan 27, 2011, at 6:39 AM, Philip Semanchuk wrote:
>> 
>>> 
>>> On Jan 25, 2011, at 8:19 PM, Craig Yoshioka wrote:
>>> 
>>>> Hi all, 
>>>> 
>>>> I could really use some help with a problem I'm having.
>>> 
>>> 
>>> Hiya Craig,
>>> I don't know if I can help, but it's really difficult to do without a full working example. 
>>> 
>>> Also, your code has several OS X-isms in it so I guess that's the platform you're on. But in case you're on Windows, note that that platform requires some extra care when using multiprocessing:
>>> http://docs.python.org/library/multiprocessing.html#windows
>>> 
>>> 
>>> Good luck
>>> Philip
>>> 
>>> 
>>>> I wrote a function that can take a pattern of actions and apply it to the filesystem.
>>>> It takes a list of starting paths, and a pattern like this:
>>>> 
>>>> pattern = {
>>>>     InGlob('Test/**'): {
>>>>         MatchRemove('DS_Store'): [],
>>>>         NoMatchAdd('(alphaID_)|(DS_Store)', 'warnings'): [],
>>>>         MatchAdd('alphaID_', 'alpha_found'): [],
>>>>         InDir('alphaID_'): {
>>>>             NoMatchAdd('(betaID_)|(DS_Store)', 'warnings'): [],
>>>>             InDir('betaID_'): {
>>>>                 NoMatchAdd('(gammaID_)|(DS_Store)', 'warnings'): [],
>>>>                 MatchAdd('gammaID_', 'gamma_found'): [] }}}}
>>>> 
>>>> so if you run evalFSPattern(['Volumes/**'],pattern) it'll return a dictionary where:
>>>> 
>>>> dict['gamma_found'] = [list of paths that matched] (e.g. '/Volumes/HD1/Test/alphaID_3382/betaID_38824/gammaID_848384')
>>>> dict['warnings'] = [list of paths that failed to match] (e.g. '/Volumes/HD1/Test/alphaID_3382/gammaID_47383')
>>>> 
>>>> Since some of these volumes are on network shares I also wanted to parallelize this so that it would not block on IO.  I started the parallelization by using multiprocessing.Pool and got it to work if I ran the fsparser from the interpreter.  It ran in *much* less time and produced correct output that matched the non-parallelized version.  The problem begins if I then try to use the parallelized function from within the code.
>>>> 
>>>> For example I wrote a class whose instances are created around valid FS paths, that are cached to reduce expensive FS lookups.
>>>> 
>>>> class Experiment(object):
>>>> 
>>>>     SlidePaths = None
>>>> 
>>>>     @classmethod
>>>>     def getSlidePaths(cls):
>>>>         # Cache the result to avoid repeating the expensive FS lookup.
>>>>         if cls.SlidePaths is None:
>>>>             cls.SlidePaths = fsparser(['/Volumes/**'], pattern)
>>>>         return cls.SlidePaths
>>>> 
>>>>     @classmethod
>>>>     def lookupPathWithGammaID(cls, id):
>>>>         paths = cls.getSlidePaths()
>>>>         ...
>>>>         return paths[selected]
>>>> 
>>>>     @classmethod
>>>>     def fromGammaID(cls, id):
>>>>         path = cls.lookupPathWithGammaID(id)
>>>>         return cls(path)
>>>> 
>>>>     def __init__(self, path):
>>>>         self.Path = path
>>>>         ...
>>>> 
>>>>     ...
>>>> 
>>>> If I do the following from the interpreter it works:
>>>> 
>>>> >>> from experiment import Experiment
>>>> >>> expt = Experiment.fromGammaID(10102)
>>>> 
>>>> but if I write a script called test.py:
>>>> 
>>>> from experiment import Experiment
>>>> expt1 = Experiment.fromGammaID(10102)
>>>> expt2 = Experiment.fromGammaID(10103)
>>>> comparison = expt1.compareTo(expt2)
>>>> 
>>>> it fails whether I try to import it or run it from the bash prompt:
>>>> 
>>>> >>> from test import comparison (hangs forever)
>>>> $ python test.py (hangs forever)
>>>> 
>>>> I would really like some help trying to figure this out...  I thought it should work easily since all the spawned processes don't share data or state (their results are merged in the main thread).  The classes used in the pattern are also simple python objects (use python primitives).
>>>> 
>>>> 
>>>> These are the main functions:
>>>> 
>>>> def mapAction(pool, paths, action):
>>>>     # Apply the action to every path in parallel and merge the results.
>>>>     merge = {'next': []}
>>>>     for result in pool.map(action, paths):
>>>>         if result is None:
>>>>             continue
>>>>         merge = mergeDicts(merge, result)
>>>>     return merge
>>>> 
>>>> 
>>>> def mergeDicts(d1, d2):
>>>>     # Merge d2 into d1 in place, concatenating lists that share a key.
>>>>     for key in d2:
>>>>         if key not in d1:
>>>>             d1[key] = d2[key]
>>>>         else:
>>>>             d1[key] += d2[key]
>>>>     return d1
>>>> 
>>>> 
>>>> def evalFSPattern(paths, pattern):
>>>>     pool = Pool(10)
>>>>     results = {}
>>>>     for action in pattern:
>>>>         tomerge1 = mapAction(pool, paths, action)
>>>>         # Recurse into the sub-pattern with the paths this action passed on.
>>>>         tomerge2 = evalFSPattern(tomerge1['next'], pattern[action])
>>>>         del tomerge1['next']
>>>>         results = mergeDicts(results, tomerge1)
>>>>         results = mergeDicts(results, tomerge2)
>>>>     return results
>>>> 
>>>> The classes used in the pattern (InGlob, NoMatchAdd, etc.) are callable classes that take a single parameter (a path) and return a dict result or None, which makes them trivial to adapt to Pool.map.
>>>> 
>>>> Note if I change the mapAction function to:
>>>> 
>>>> def mapAction(pool, paths, action):
>>>>     # Serial version: the pool argument is accepted but unused.
>>>>     merge = {'next': []}
>>>>     for path in paths:
>>>>         result = action(path)
>>>>         if result is None:
>>>>             continue
>>>>         merge = mergeDicts(merge, result)
>>>>     return merge
>>>> 
>>>> everything works just fine.
>>>> 
>>>> 
>>>> Thanks.
>>>> 
>>>> 
>>>> -- 
>>>> http://mail.python.org/mailman/listinfo/python-list
>>> 
>> 
> 



