[Python-ideas] Suggestion for a new thread API for Python (OpenMP inspired)

Sturla Molden sturla at molden.no
Thu Jan 22 16:03:06 CET 2009


On 1/22/2009 3:17 PM, Jesse Noller wrote:

> Just some thoughts - right now I've sworn off writing new features
> until I zero out the multiprocessing bug queue, so these might be done
> in 2025.

I was actually thinking about writing it myself. :)

I think the OpenMP style of abstracting concurrency fits better with the 
mind, and therefore is more 'pythonic' than the Java inspired threading 
module. Not just for the sake of speed on SMPs, but also for other 
concurrent tasks for which threads (or processes) can be used.

A skeleton of such a context manager would look roughly like this.

Anyway I am stuck with what to exec in the static method 
Pool._threadproc, in order to execute the correct code object. I somehow 
have to exec (or eval) the code object in the frame from f_lineno or 
f_lasti.

I have noticed the Queue bug. Very annoying indeed.


import thread
import sys
import os
from math import ceil, log


class Pool(object):

     def __init__(self, nthreads=None, masterpool=None, tid=0):
         self.nthreads = nthreads
         self.masterpool = masterpool
         self.tid = tid
         if nthreads is None:
             self.nthreads = self._num_processors()


     def __enter__(self):
         if self.masterpool is None:
             frame = sys._getframe().f_back
             code = frame.f_code
             lasti = frame.f_lasti
             localdict = frame.f_locals
             globaldict = frame.f_globals

             # find name of self in frame
             for name in localdict:
                 if localdict[name] is self:
                     selfname = name
                     break

             # spawn threads
             for tid in range(1,self.nthreads):
                 pool = Pool(nthreads=self.nthreads,
			masterpool=self, tid=tid)
                 _localdict = localdict.copy()
                 _globaldict = globaldict.copy()
                 _localdict[selfname] = pool
                 thread.start_new_thread(self._threadproc,
                         (code,lasti,_globaldict,_localdict))


     def __exit__(self, exc_type, exc_val, exc_tb):
         self.barrier()
         if self.masterpool is not None:
             thread.exit()
         # ++ more cleanup


     @staticmethod
     def _threadproc(code, lasti, globals, locals):
         # we must somehow execute the code object from
         # the last instruction (lasti)
         # Something like 'exec code in globals, locals'


     @staticmethod
     def _num_processors():
         if os.name == 'nt':
             return int(os.getenv('NUMBER_OF_PROCESSORS'))
         elif sys.platform == 'linux2':
             # Linux ... I think this works
             # each cpu is separated by a blank line, and there
             # is a blank line at the end
             retv = 0
             with open('/proc/cpuinfo','rt') as cpuinfo:
                 for line in cpuinfo:
                     if not len(line): retv += 1
             return retv
         else:
             raise RuntimeError, 'unknown platform'


     def barrier(self):
         ''' dissemination barrier '''
         nt = self.nthreads
         if nt == 1: return
         tid = self.tid
         log2 = lambda x :
         for k in range(int(ceil(log(nt)/log(2)))):
             # send event to thread (tid + 2**k) % nt
             # wait for event from thread (tid - 2**k) % nt


Regards,
S.M.




More information about the Python-ideas mailing list