multiprocessing.Pool, its queue, and pre-emption
ladasky at my-deja.com
Sat Sep 17 10:34:18 CEST 2011
Hey, this pretty easy hack appears to work!
from multiprocessing.pool import Pool, RUN, MapResult, mapstar

class PriorityPool(Pool):
    def map_async_nowait(self, func, iterable, chunksize=None,
                         callback=None):
        """
        Same as map_async(), except uses put_nowait() and
        thus posts tasks to the head of the task queue
        rather than its tail.
        """
        # The body mirrors Pool.map_async() from Python 2.7 and relies on
        # private Pool internals, which differ in other Python versions.
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)
        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable),
                           callback)
        self._taskqueue.put_nowait((((result._job, i, mapstar, (x,), {})
                                     for i, x in enumerate(task_batches)),
                                    None))
        return result

    def size(self):
        """
        This is not an essential function, but I use it in the
        demo to ensure that I initially create enough tasks to
        occupy every Process.
        """
        return len(self._pool)
from time import sleep

def demo_task(args):
    # Defined at module level so worker processes can look it up by name.
    num, time = args
    sleep(time)  # Stand-in for real work: just sleep for `time` seconds.
    print num, time

if __name__ == "__main__":
    pool = PriorityPool()
    size = pool.size()
    print "\nConstructed a pool which contains", size, "Processes."
    print "Queueing", 2*size, "normal-priority tasks."
    normal = enumerate([3.0 + t for t in range(2*size)])
    pool.map_async(demo_task, normal, chunksize = 1)
    print "Queueing", size, "high-priority tasks."
    high = [(2*size + t, 0.2 + 0.1*t) for t in range(size)]
    pool.map_async_nowait(demo_task, high, chunksize = 1)
    sleep(30) # Give all tasks on the queue time to complete.
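For readers adapting this: the standard `Queue` documentation describes `put_nowait(item)` as equivalent to `put(item, False)`, that is, a put that raises `Full` instead of blocking, and it still appends at the tail. (In the CPython 2.7 sources, `Pool._taskqueue` is an ordinary `Queue.Queue`.) The minimal check below, written to run under Python 2 or 3, shows where an item posted with `put_nowait()` lands in a plain FIFO queue; the item names and the `drain_order` helper are illustrative only.

```python
# Check where put_nowait() places an item in a standard FIFO queue.
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


def drain_order():
    """Illustrative helper: return the order in which queued items come out."""
    q = queue.Queue()
    for name in ["normal-0", "normal-1", "normal-2"]:
        q.put(name)              # ordinary (blocking) put
    q.put_nowait("urgent")       # non-blocking put; still appended at the tail
    out = []
    while not q.empty():
        out.append(q.get_nowait())
    return out


if __name__ == "__main__":
    print(drain_order())  # ['normal-0', 'normal-1', 'normal-2', 'urgent']
```

Since `put_nowait()` by itself does not move an item ahead of work already waiting, any reordering seen in the demo may stem from something else, such as the chosen task durations.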
Below is a typical output from my six-core CPU system. The output
differs slightly from run to run; that's multiprocessing for you.
The tasks are given numbers which correspond to the order in which they
are added to the queue. The high-priority tasks are added last and
are thus numbered 12-17 (I place asterisks next to these in the
output, below). Each task prints its number and its sleep time when it
completes. I expect the normal-priority tasks 0-5 to finish before
any high-priority tasks, and they always do. Tasks 6 and 7 are then
interleaved among the high-priority tasks. That is not quite what I
expect, but it may have something to do with my rather arbitrary
choices of sleep times. But tasks 8-11 always get pushed to the back.
Constructed a pool which contains 6 Processes.
Queueing 12 normal-priority tasks.
Queueing 6 high-priority tasks.
12 0.2 *
13 0.3 *
14 0.4 *
15 0.5 *
16 0.6 *
17 0.7 *
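To gauge how much of this ordering the sleep times alone can explain, here is a small model of the same workload assuming a plain FIFO queue with no priority at all: six workers, each taking the next task in queue order as soon as it is free, and zero scheduling or startup overhead (both are assumptions; real runs add jitter). Durations are in tenths of a second so the arithmetic is exact, and ties in finish time are broken by task number. The function `fifo_completion_order` is a hypothetical helper made up for this sketch.

```python
import heapq


def fifo_completion_order(durations, workers):
    """Hypothetical helper: simulate strict FIFO dispatch to a fixed pool.

    Assumes zero overhead. durations[i] is the run time of task i; tasks are
    handed out in index order to whichever worker becomes free first.
    Returns task numbers sorted by finish time (ties broken by task number).
    """
    # Heap of (time the worker becomes free, worker id).
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    finishes = []
    for task, length in enumerate(durations):
        start, w = heapq.heappop(free_at)
        end = start + length
        finishes.append((end, task))
        heapq.heappush(free_at, (end, w))
    return [task for end, task in sorted(finishes)]


if __name__ == "__main__":
    size = 6                                          # six-core machine
    normal = [30 + 10 * t for t in range(2 * size)]   # 3.0 s, 4.0 s, ... in tenths
    high = [2 + t for t in range(size)]               # 0.2 s, 0.3 s, ... in tenths
    print(fifo_completion_order(normal + high, size))
    # [0, 1, 2, 3, 4, 5, 6, 12, 13, 14, 15, 7, 16, 17, 8, 9, 10, 11]
```

Under these assumptions the model already reproduces the pattern described above: 0-5 finish first, 6 and 7 mix in with 12-17, and 8-11 come last. Because the demo reports completion order rather than start order, its output may not by itself distinguish the hack from plain FIFO dispatch.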
Please feel free to use this, though I would appreciate an
acknowledgment in your code if you do. :^)