Suggestion for a new thread API for Python (OpenMP-inspired)

I have lately been using OpenMP to write parallel C and Fortran code, and I must admit I am impressed. OpenMP is a much better abstraction for writing concurrent code than using Win32/POSIX threads directly (or, similarly, threading.Thread in Java or Python). What matters most is that code can be written as sequential code, tested, and then parallelised using compiler pragmas. This is much easier than writing code intended to be parallel from the start. Not only is the abstraction easier to apply, it also leads to fewer problems with deadlocks, race conditions, livelocks, etc.

I was thinking something similar could be created for Python, e.g. on top of the existing thread or threading modules, and possibly multiprocessing. I believe a context manager could be used for this purpose. What I have in mind is an API that would look approximately like this (OpenMP pragma for C on top, proposed Python equivalent below):

    #pragma omp parallel
        with pymp.Pool() as pool:

    #pragma omp for
        for item in pool.parallel(<iterable>):

    #pragma omp for schedule(guided)
        for item in pool.parallel(<iterable>, sched='guided'):

    #pragma omp parallel for
        with pymp.Pool() as pool:
            for item in pool.parallel(<iterable>):

    #pragma omp barrier
        pool.barrier()

    #pragma omp section
        pool.section(fun, *args, **kwargs)

    #pragma omp parallel sections
        with pymp.Pool() as pool:
            pool.section(fun1, *args, **kwargs)
            pool.section(fun2, *args, **kwargs)

    #pragma omp master
        if pool.master:

    #pragma omp critical
    #pragma omp atomic
        with pool.lock:

    #pragma omp single
        with pool.single():

    #pragma omp ordered
        with pool.ordered():

This is all trivial to program, except for the context manager on top. It somehow has to get access to the code block below it, spawn multiple threads, and execute that block in each of the threads. I am not sure how to grab the next executable block as a Python object (so I could pass it to eval or exec), so a little help would be appreciated :)

Regards,
Sturla Molden
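To make the 'for' mapping a bit more concrete: under a static schedule, pool.parallel() could simply hand each thread every nthreads-th iteration. A minimal, purely hypothetical illustration follows (the _Worker class below just stands in for a pymp.Pool worker that knows its own thread id and the thread count; nothing here exists as a real API):

    class _Worker(object):
        def __init__(self, tid, nthreads):
            self.tid = tid
            self.nthreads = nthreads

        def parallel(self, iterable):
            # static scheduling: thread 'tid' takes iterations
            # tid, tid + nthreads, tid + 2*nthreads, ...
            items = list(iterable)
            for i in range(self.tid, len(items), self.nthreads):
                yield items[i]

    # e.g. with 4 threads, the worker with tid == 1 would see
    # items 1, 5, 9, ... of the iterable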

On Thu, Jan 22, 2009 at 5:53 AM, Sturla Molden <sturla@molden.no> wrote:
Hi Sturla,

Interesting that you bring this up - while I'm not in the know about OpenMP, I have been sketching out some improvements to threading and multiprocessing that follow some of this thinking. The first batch would be adding context managers where appropriate to the multiprocessing module (e.g. pool and managers/etc). The second is adding a library of decorators where you could say:

    @multiprocessing.Pool(5, 'apply')
    def myfunc():
        ...

Or something along those lines. In the case of getting the function object back, decorators make more sense for some of the use cases.

Now, for threading - I personally feel that there are some improvements to be made by adding a series of abstractions and utilities on top of threading, a la multiprocessing.pool and the like. See java.util.concurrent for some good examples of threading abstractions. The same decorators and context managers could be used for any threading abstractions added.

Just some thoughts - right now I've sworn off writing new features until I zero out the multiprocessing bug queue, so these might be done in 2025.

-jesse
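For illustration, such a decorator might look roughly like the sketch below. The name 'pooled' is made up, and it uses the thread-backed pool from multiprocessing.dummy so the decorated function does not have to be picklable; a process-backed variant would need more care, since rebinding the module-level name to the wrapper confuses pickling of the original function. This is not an existing API, only a sketch of the idea:

    import functools
    import multiprocessing.dummy

    def pooled(nworkers):
        ''' map the decorated function over an iterable, in parallel '''
        def decorator(func):
            @functools.wraps(func)
            def wrapper(iterable):
                pool = multiprocessing.dummy.Pool(nworkers)  # thread pool
                try:
                    return pool.map(func, iterable)
                finally:
                    pool.close()
                    pool.join()
            return wrapper
        return decorator

    @pooled(5)
    def myfunc(x):
        return x * x

    # myfunc(range(10)) now farms the original function out to 5 worker
    # threads and returns the results as a list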

On 1/22/2009 3:17 PM, Jesse Noller wrote:
I was actually thinking about writing it myself. :) I think the OpenMP style of abstracting concurrency fits the mind better, and is therefore more 'pythonic' than the Java-inspired threading module. Not just for the sake of speed on SMPs, but also for other concurrent tasks for which threads (or processes) can be used.

A skeleton of such a context manager would look roughly like the code below. Anyway, I am stuck on what to exec in the static method Pool._threadproc in order to execute the correct code object. I somehow have to exec (or eval) the code object in the frame from f_lineno or f_lasti.

I have noticed the Queue bug. Very annoying indeed.

    import thread
    import sys
    import os
    from math import ceil, log

    class Pool(object):

        def __init__(self, nthreads=None, masterpool=None, tid=0):
            self.nthreads = nthreads
            self.masterpool = masterpool
            self.tid = tid
            if nthreads is None:
                self.nthreads = self._num_processors()

        def __enter__(self):
            if self.masterpool is None:
                frame = sys._getframe().f_back
                code = frame.f_code
                lasti = frame.f_lasti
                localdict = frame.f_locals
                globaldict = frame.f_globals
                # find the name bound to self in the calling frame
                for name in localdict:
                    if localdict[name] is self:
                        selfname = name
                        break
                # spawn worker threads
                for tid in range(1, self.nthreads):
                    pool = Pool(nthreads=self.nthreads, masterpool=self, tid=tid)
                    _localdict = localdict.copy()
                    _globaldict = globaldict.copy()
                    _localdict[selfname] = pool
                    thread.start_new_thread(self._threadproc,
                                            (code, lasti, _globaldict, _localdict))
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.barrier()
            if self.masterpool is not None:
                thread.exit()
            # ++ more cleanup

        @staticmethod
        def _threadproc(code, lasti, globals, locals):
            # we must somehow execute the code object from
            # the last instruction (lasti)
            # Something like 'exec code in globals, locals'
            pass

        @staticmethod
        def _num_processors():
            if os.name == 'nt':
                return int(os.getenv('NUMBER_OF_PROCESSORS'))
            elif sys.platform == 'linux2':
                # Linux: count the 'processor' entries in /proc/cpuinfo
                retv = 0
                with open('/proc/cpuinfo', 'rt') as cpuinfo:
                    for line in cpuinfo:
                        if line.startswith('processor'):
                            retv += 1
                return retv
            else:
                raise RuntimeError('unknown platform')

        def barrier(self):
            ''' dissemination barrier '''
            nt = self.nthreads
            if nt == 1:
                return
            tid = self.tid
            for k in range(int(ceil(log(nt, 2)))):
                # send event to thread (tid + 2**k) % nt
                # wait for event from thread (tid - 2**k) % nt
                pass

Regards,
S.M.
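The barrier itself is straightforward to fill in with threading.Event objects shared between the threads. Here is a single-use sketch of the dissemination scheme described in the comments above (the class name is made up and it is not tied to the Pool skeleton):

    import threading
    from math import ceil, log

    class DisseminationBarrier(object):
        ''' single-use dissemination barrier for nthreads threads '''

        def __init__(self, nthreads):
            self.nthreads = nthreads
            self.nrounds = int(ceil(log(nthreads, 2))) if nthreads > 1 else 0
            # events[k][t] is set when some thread signals thread t in round k
            self.events = [[threading.Event() for t in range(nthreads)]
                           for k in range(self.nrounds)]

        def wait(self, tid):
            nt = self.nthreads
            if nt == 1:
                return
            for k in range(self.nrounds):
                # send event to thread (tid + 2**k) % nt
                self.events[k][(tid + 2**k) % nt].set()
                # wait for event from thread (tid - 2**k) % nt,
                # which is the thread that sets events[k][tid]
                self.events[k][tid].wait()

    # each of the nthreads threads calls barrier.wait(tid) with its own id;
    # a reusable barrier would additionally have to reset the events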

On 1/22/2009 3:17 PM, Jesse Noller wrote:
Here is a toy example of what I have in mind. Say you want to compute the DFT of some signal (real apps would use an FFT in C for this, but never mind). In Python, using an O(n**2) algorithm, this would look something like this:

    from math import cos, sin, pi

    def real_dft(x):
        ''' DFT for a real valued sequence x '''
        r = []
        N = len(x)
        M = N//2 + 1 if N % 2 else N//2
        for n in range(M):
            s = 0j
            for k in range(N):
                tmp = 2*pi*k*n/N
                s += x[k] * (cos(tmp) - 1j*sin(tmp))
            r.append(s)
        return r

Then one could 'magically' transform this algorithm into a parallel one simply by inserting directives from the 'pymp' module:

    from math import cos, sin, pi
    from pymp import Pool

    def real_dft(x):
        ''' DFT for a real valued sequence x, parallelized '''
        r = []
        N = len(x)
        M = N//2 + 1 if N % 2 else N//2
        with Pool() as pool:
            for n in pool.parallel(range(M)):
                s = 0j
                for k in range(N):
                    tmp = 2*pi*k*n/N
                    s += x[k] * (cos(tmp) - 1j*sin(tmp))
                with pool.ordered():
                    r.append(s)
        return r

The idea is that 'parallelizing' a sequential algorithm like this is much easier than writing a parallel one from scratch using the abstractions in threading or multiprocessing.

Sturla Molden
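For comparison, roughly the same parallelization can already be written with the existing multiprocessing.Pool, at the cost of hoisting the loop body into a module-level helper (the name _dft_bin is made up for this sketch). Since pool.map returns its results in order, nothing like the 'ordered' construct is needed:

    from math import cos, sin, pi
    import multiprocessing

    def _dft_bin(args):
        # one output bin of the DFT; args is the tuple (n, x)
        n, x = args
        N = len(x)
        s = 0j
        for k in range(N):
            tmp = 2*pi*k*n/N
            s += x[k] * (cos(tmp) - 1j*sin(tmp))
        return s

    def real_dft_mp(x):
        ''' DFT for a real valued sequence x, using a process pool '''
        N = len(x)
        M = N//2 + 1 if N % 2 else N//2
        pool = multiprocessing.Pool()   # one worker per processor
        try:
            return pool.map(_dft_bin, [(n, x) for n in range(M)])
        finally:
            pool.close()
            pool.join()

    # on Windows this must be invoked from code guarded by
    # if __name__ == '__main__':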

On Thu, Jan 22, 2009 at 10:26 AM, Sturla Molden <sturla@molden.no> wrote:
Interesting - this is a slightly more extreme series of changes than I was thinking of. A good way to approach this would be not a patch against python-core (unless you find bugs), but rather a separate package hosted outside of core and posted to PyPI. I think it has merit - but it would require more use/eyeballs on it than just a few of us.

-jesse
