On 1/22/2009 3:17 PM, Jesse Noller wrote:
> Just some thoughts - right now I've sworn off writing new features until I zero out the multiprocessing bug queue, so these might be done in 2025.
I was actually thinking about writing it myself. :)
I think the OpenMP style of abstracting concurrency maps more naturally onto how people think about parallel work, and is therefore more 'pythonic' than the Java-inspired threading module. This is not just about speed on SMPs; it also applies to other concurrent tasks for which threads (or processes) can be used.
A skeleton of such a context manager would look roughly like this.
Anyway, I am stuck on what to exec in the static method Pool._threadproc in order to execute the correct code object. Somehow I have to exec (or eval) the code object in the frame, starting from f_lineno or f_lasti rather than from the beginning.
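To make the difficulty concrete, here is a minimal sketch of the frame inspection involved, assuming CPython (sys._getframe is an implementation detail) and Python 3 syntax. FrameProbe and caller are hypothetical names used only for illustration. It shows that __enter__ can capture the caller's code object, f_lasti and f_lineno; what it cannot show is a way to resume from that offset, because exec always runs a code object from its first instruction.

```python
import sys


class FrameProbe(object):
    """Hypothetical helper: records where the enclosing `with` block starts."""

    def __enter__(self):
        # f_back is the frame that is executing the `with` statement.
        frame = sys._getframe().f_back
        self.code = frame.f_code      # code object of the calling function
        self.lasti = frame.f_lasti    # offset of the last bytecode instruction run
        self.lineno = frame.f_lineno  # current source line in the caller
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False  # never swallow exceptions


def caller():
    probe = FrameProbe()
    with probe:
        pass
    return probe


probe = caller()
print(probe.code.co_name, probe.lineno, probe.lasti)
```

The captured code object covers the whole calling function, not just the body of the with block, which is why handing it to exec in a worker thread would rerun everything before the with statement as well.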
I have noticed the Queue bug. Very annoying indeed.
import thread
import sys
import os
from math import ceil, log


class Pool(object):

    def __init__(self, nthreads=None, masterpool=None, tid=0):
        self.nthreads = nthreads
        self.masterpool = masterpool
        self.tid = tid
        if nthreads is None:
            self.nthreads = self._num_processors()

    def __enter__(self):
        if self.masterpool is None:
            # inspect the frame that executes the 'with' statement
            frame = sys._getframe().f_back
            code = frame.f_code
            lasti = frame.f_lasti
            localdict = frame.f_locals
            globaldict = frame.f_globals

            # find name of self in frame
            for name in localdict:
                if localdict[name] is self:
                    selfname = name
                    break

            # spawn threads
            for tid in range(1, self.nthreads):
                pool = Pool(nthreads=self.nthreads, masterpool=self, tid=tid)
                _localdict = localdict.copy()
                _globaldict = globaldict.copy()
                _localdict[selfname] = pool
                thread.start_new_thread(self._threadproc,
                    (code, lasti, _globaldict, _localdict))

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.barrier()
        if self.masterpool is not None:
            thread.exit()
        # ++ more cleanup

    @staticmethod
    def _threadproc(code, lasti, globals, locals):
        # we must somehow execute the code object from
        # the last instruction (lasti)
        # Something like 'exec code in globals, locals'
        pass  # placeholder: this is the unsolved part

    @staticmethod
    def _num_processors():
        if os.name == 'nt':
            return int(os.getenv('NUMBER_OF_PROCESSORS'))
        elif sys.platform.startswith('linux'):
            # Linux ... I think this works
            # each cpu is separated by a blank line, and there
            # is a blank line at the end
            retv = 0
            with open('/proc/cpuinfo', 'rt') as cpuinfo:
                for line in cpuinfo:
                    # lines keep their trailing newline, so test the
                    # stripped line rather than len(line)
                    if not line.strip():
                        retv += 1
            return retv
        else:
            raise RuntimeError('unknown platform')

    def barrier(self):
        ''' dissemination barrier '''
        nt = self.nthreads
        if nt == 1:
            return
        tid = self.tid
        log2 = lambda x: log(x) / log(2)
        for k in range(int(ceil(log2(nt)))):
            # send event to thread (tid + 2**k) % nt
            # wait for event from thread (tid - 2**k) % nt
            pass  # signalling not implemented yet
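The barrier method above only outlines the signalling in comments. One way it could be filled in is sketched below, assuming Python 3 and the standard threading module rather than the low-level thread module used in the skeleton. DisseminationBarrier, wait and demo are hypothetical names, and the choice of one counting semaphore per (round, receiving thread) is my assumption, not part of the skeleton.

```python
import threading


class DisseminationBarrier(object):
    """Reusable barrier for a fixed team of `nthreads` threads (sketch)."""

    def __init__(self, nthreads):
        self.nthreads = nthreads
        # ceil(log2(nthreads)) rounds, computed without floating point
        self.rounds = (nthreads - 1).bit_length()
        # one counting semaphore per (round, receiving thread)
        self.flags = [[threading.Semaphore(0) for _ in range(nthreads)]
                      for _ in range(self.rounds)]

    def wait(self, tid):
        nt = self.nthreads
        for k in range(self.rounds):
            # send event to thread (tid + 2**k) % nt
            self.flags[k][(tid + 2 ** k) % nt].release()
            # wait for event from thread (tid - 2**k) % nt
            self.flags[k][tid].acquire()


def demo(nthreads=5, phases=3):
    """Run a small team through several phases separated by the barrier."""
    barrier = DisseminationBarrier(nthreads)
    record = []  # list.append is atomic in CPython

    def worker(tid):
        for phase in range(phases):
            record.append(phase)
            barrier.wait(tid)

    threads = [threading.Thread(target=worker, args=(tid,))
               for tid in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return record


result = demo()
print(result)
```

Each semaphore has exactly one sender and one receiver, so the counts stay matched when the barrier is reused across phases. In the demo no thread can record phase p+1 until every thread has recorded phase p, so the recorded phase numbers come out in non-decreasing order.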