multiprocessing.Pool, its queue, and pre-emption

John Ladasky ladasky at my-deja.com
Sat Sep 17 04:34:18 EDT 2011


Hey, this pretty easy hack appears to work!

[code]

from multiprocessing.pool import Pool, RUN, MapResult, mapstar

class PriorityPool(Pool):

    def map_async_nowait(self, func, iterable, chunksize=None, \
        callback=None):
        """
        Same as map_async(), except uses put_nowait() and
        thus posts tasks to the head of the task queue
        rather than its tail.
        """
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) *
4)
            if extra:
                chunksize += 1

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), \
            callback)
        self._taskqueue.put_nowait((((result._job, i, mapstar, (x,),
{}) \
            for i, x in enumerate(task_batches)), None))
        return result

    def size(self):
        """
        This is not an essential function, but I use it in the
        demo to ensure that I initially create enough tasks to
        occupy every Process.
        """
        return len(self._pool)

##================================================================##

if __name__ == "__main__":

    from time import sleep

    def demo_task(args):
        num, time = args
        sleep(time)
        print num, time

    pool = PriorityPool()
    size = pool.size()
    print "\nConstructed a pool which contains", size, "Processes."
    print "Queueing", 2*size, "normal-priority tasks."
    normal = enumerate([3.0 + t for t in range(2*size)])
    pool.map_async(demo_task, normal, chunksize = 1)
    print "Queueing", size, "high-priority tasks."
    high = [(2*size + t, 0.2 + 0.1*t) for t in range(size)]
    pool.map_async_nowait(demo_task, high, chunksize = 1)
    sleep(30) # Give all tasks on the queue time to complete.
    print "Complete."

[/code]

Below is a typical output from my six-core CPU system.  The output
differs slightly from run to run -- that's multiprocessing for you,
it's asynchronous.

The tasks are given numbers which correspond to the order that they
are added to the queue.  The high-priority tasks are added last and
are thus numbered 12-17 (I place asterisks next to these in the
output, below).  Each task prints its number and its time when it
completes.  I expect the normal-priority tasks 0-5 to finish before
any high-priority tasks, and they always do.  Tasks 6 and 7 are then
interleaved among the high-priority tasks -- not quite what I expect,
but that may have something to do with my rather arbitrary choices of
sleep times.  But tasks 8-11 always get pushed to the back, and
complete last.

[output]

Constructed a pool which contains 6 Processes.
Queueing 12 normal-priority tasks.
Queueing 6 high-priority tasks.
0 3.0
1 4.0
2 5.0
3 6.0
4 7.0
5 8.0
6 9.0
12 0.2 *
13 0.3 *
14 0.4 *
15 0.5 *
7 10.0
16 0.6 *
17 0.7 *
8 11.0
9 12.0
10 13.0
11 14.0

[/output]

Please feel free to use this, though I would appreciate an
acknowledgment in your code if you do.  :^)



More information about the Python-list mailing list