Adding a Par construct to Python?

Steven D'Aprano steven at REMOVE.THIS.cybersource.com.au
Tue May 19 05:24:29 EDT 2009


On Mon, 18 May 2009 02:27:06 -0700, jeremy wrote:

> Let me clarify what I think par, pmap, pfilter and preduce would mean
> and how they would be implemented. 
[...]

Just for fun, I've implemented a parallel-map function, and done a couple 
of tests. Comments, criticism and improvements welcome!



import threading
import Queue
import random
import time

def f(arg, delay=0.5):
    """Simulate a slow function: sleep *delay* seconds, then return 3*arg - 2.

    The sleep stands in for real work.  Because the function only waits,
    several threads running it can overlap their waits, which is what the
    pmap timing demonstration relies on.

    Args:
        arg: number to transform.
        delay: seconds to sleep before returning (default 0.5, the original
            hard-coded value); pass 0 to skip the wait, e.g. in tests.

    Returns:
        3*arg - 2
    """
    time.sleep(delay)
    return 3*arg - 2


class PMapThread(threading.Thread):
    """Worker thread that drains a shared queue of pmap jobs.

    Each queue item is a 4-tuple ``(target, where, func, arg)``; the worker
    stores ``func(arg)`` into ``target[where]``.  The worker exits as soon as
    ``get_nowait()`` finds the queue empty, so the queue must be completely
    filled before the thread is started.

    If ``func(arg)`` raises, the exception propagates out of ``run()`` and
    ends this worker; other workers sharing the queue keep draining it.
    """

    def __init__(self, clients):
        super(PMapThread, self).__init__()
        # Queue of (target, where, func, arg) jobs, shared with other workers.
        self._clients = clients

    def run(self):
        while True:
            try:
                job = self._clients.get_nowait()
            except Queue.Empty:
                # The queue is pre-filled, so "empty" means all work is taken.
                return
            target, where, func, arg = job
            target[where] = func(arg)


class VerbosePMapThread(PMapThread):
    """PMapThread that reports its creation, start and finish on stdout.

    It subclasses PMapThread (not plain threading.Thread) so that it actually
    processes jobs from the shared queue it is given.
    """

    def __init__(self, clients):
        super(VerbosePMapThread, self).__init__(clients)
        print("Thread %s created at %s" % (self.name, time.ctime()))

    def start(self):
        super(VerbosePMapThread, self).start()
        print("Thread %s starting at %s" % (self.name, time.ctime()))

    def run(self):
        try:
            super(VerbosePMapThread, self).run()
        finally:
            # Report completion even if the mapped function raised.
            print("Thread %s finished at %s" % (self.name, time.ctime()))


def pmap(func, seq, verbose=False, numthreads=4):
    """Apply *func* to every item of *seq* using a pool of worker threads.

    Equivalent in result to ``[func(x) for x in seq]``, but up to
    *numthreads* calls run concurrently.

    NOTE: under CPython's GIL this only speeds things up when *func* spends
    its time waiting (sleeping, blocking I/O) rather than running Python code.

    Args:
        func: one-argument callable applied to each item.
        seq: any iterable of arguments; it is materialized into a list first.
        verbose: if true, use VerbosePMapThread workers, print progress
            messages, and print the partial results list about every
            0.25 seconds while waiting.
        numthreads: number of worker threads to start (must be >= 1).

    Returns:
        A list holding func(item) for each item, in the original order.  If
        *func* raises for an item, that exception ends the worker that hit
        it.  That item, and any items left unprocessed because every worker
        died, stay None.

    Raises:
        ValueError: if numthreads is less than 1.
    """
    if numthreads < 1:
        raise ValueError("numthreads must be at least 1, got %r" % (numthreads,))
    items = list(seq)
    results = [None] * len(items)
    if verbose:
        print("Initiating threads")
        thread_class = VerbosePMapThread
    else:
        thread_class = PMapThread
    # Fill the queue completely before any worker starts: workers exit as
    # soon as get_nowait() finds the queue empty.
    datapool = Queue.Queue(len(items))
    for i, item in enumerate(items):
        datapool.put((results, i, func, item))
    threads = [thread_class(datapool) for _ in range(numthreads)]
    if verbose:
        print("All threads created.")
    for t in threads:
        t.start()
    # Block until all threads are done.  join() sleeps rather than spinning,
    # so the waiting thread does not compete with the workers for the CPU.
    for t in threads:
        if verbose:
            while t.is_alive():
                t.join(0.25)
                print(results)
        else:
            t.join()
    return results


And here are the timing results:

>>> from timeit import Timer
>>> setup = "from __main__ import pmap, f; data = range(50)"
>>> min(Timer('map(f, data)', setup).repeat(repeat=5, number=3))
74.999755859375
>>> min(Timer('pmap(f, data)', setup).repeat(repeat=5, number=3))
20.490942001342773



-- 
Steven



More information about the Python-list mailing list