multiprocessing.Pool, its queue, and pre-emption
John Ladasky
ladasky at my-deja.com
Sat Sep 17 04:34:18 EDT 2011
Hey, this pretty easy hack appears to work!
[code]
from multiprocessing.pool import Pool, RUN, MapResult, mapstar
class PriorityPool(Pool):

    def map_async_nowait(self, func, iterable, chunksize=None,
                         callback=None):
        """
        Same as map_async(), except uses put_nowait() with the
        intent of posting tasks to the head of the task queue
        rather than its tail.  (On its own, Queue.put_nowait()
        is just a non-blocking put().)
        """
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)
        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable),
                           callback)
        self._taskqueue.put_nowait((((result._job, i, mapstar, (x,), {})
                                     for i, x in enumerate(task_batches)),
                                    None))
        return result

    def size(self):
        """
        This is not an essential function, but I use it in the
        demo to ensure that I initially create enough tasks to
        occupy every Process.
        """
        return len(self._pool)

##================================================================##
if __name__ == "__main__":

    from time import sleep

    def demo_task(args):
        num, time = args
        sleep(time)
        print num, time

    pool = PriorityPool()
    size = pool.size()
    print "\nConstructed a pool which contains", size, "Processes."
    print "Queueing", 2*size, "normal-priority tasks."
    normal = enumerate([3.0 + t for t in range(2*size)])
    pool.map_async(demo_task, normal, chunksize = 1)
    print "Queueing", size, "high-priority tasks."
    high = [(2*size + t, 0.2 + 0.1*t) for t in range(size)]
    pool.map_async_nowait(demo_task, high, chunksize = 1)
    sleep(30) # Give all tasks on the queue time to complete.
    print "Complete."
[/code]
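For reference, here is what put_nowait() does on a plain Queue, outside of any Pool. This is only a minimal sketch: drain_order is a hypothetical helper name, and the try/except import is there so that the snippet runs on both Python 2 and Python 3. It assumes that the pool's _taskqueue behaves like an ordinary FIFO Queue, which appears to be the case in Python 2.7 but is an internal detail that may change.

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


def drain_order(use_nowait):
    """Put two items on a FIFO Queue and return them in get() order."""
    q = queue.Queue()
    q.put("first")
    if use_nowait:
        # put_nowait(item) is documented as equivalent to put(item, False):
        # it refuses to block on a full queue, nothing more.
        q.put_nowait("second")
    else:
        q.put("second")
    return [q.get(), q.get()]


print(drain_order(use_nowait=False))
print(drain_order(use_nowait=True))
```

In this snippet both calls return the items in insertion order, so any reordering of tasks inside the Pool would have to come from somewhere other than put_nowait() itself.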
Below is typical output from my six-core CPU system. The output
differs slightly from run to run -- that's multiprocessing for you;
it's asynchronous.
The tasks are numbered in the order in which they are added to the
queue. The high-priority tasks are added last and are thus numbered
12-17 (I place asterisks next to these in the output, below). Each
task prints its number and its sleep time when it completes, so the
output is in order of completion, not in order of start. I expect the
normal-priority tasks 0-5 to finish before any high-priority tasks,
and they always do. Tasks 6 and 7 are then interleaved among the
high-priority tasks -- not quite what I expect, but that may have
something to do with my rather arbitrary choices of sleep times. But
tasks 8-11 always get pushed to the back, and complete last.
[output]
Constructed a pool which contains 6 Processes.
Queueing 12 normal-priority tasks.
Queueing 6 high-priority tasks.
0 3.0
1 4.0
2 5.0
3 6.0
4 7.0
5 8.0
6 9.0
12 0.2 *
13 0.3 *
14 0.4 *
15 0.5 *
7 10.0
16 0.6 *
17 0.7 *
8 11.0
9 12.0
10 13.0
11 14.0
[/output]
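As a rough cross-check on the ordering above, the sketch below predicts the completion order the demo would produce if tasks were handed out strictly first-in, first-out. It is not a model of the pool's internals: it assumes idealized workers, zero dispatch overhead, and six processes. fifo_completion_order is a hypothetical helper, and durations are in tenths of a second so that the arithmetic stays in integers.

```python
import heapq


def fifo_completion_order(tasks, workers):
    """tasks: list of (number, duration) pairs in queue order.

    Returns the task numbers in the order in which they would finish
    if each free worker always took the next task from the front.
    """
    free_at = [0] * workers          # time at which each worker is free
    heapq.heapify(free_at)
    finished = []
    for num, duration in tasks:
        start = heapq.heappop(free_at)   # earliest free worker takes it
        end = start + duration
        heapq.heappush(free_at, end)
        finished.append((end, num))
    finished.sort()                  # ties are broken by task number
    return [num for end, num in finished]


size = 6                                              # assumed pool size
normal = [(t, 30 + 10 * t) for t in range(2 * size)]  # 3.0 s, 4.0 s, ...
high = [(2 * size + t, 2 + t) for t in range(size)]   # 0.2 s, 0.3 s, ...
order = fifo_completion_order(normal + high, size)
print(order)
```

Under these assumptions the predicted order is 0-6, 12-15, 7, 16, 17, 8-11, the same as the sample output (tasks 7 and 16 tie at 14.0 s in the idealized model and are separated here by task number). So this particular choice of sleep times may not, by itself, distinguish head-of-queue insertion from ordinary FIFO queueing. Printing a line when each task starts, as well as when it finishes, would make the dispatch order directly visible.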
Please feel free to use this, though I would appreciate an
acknowledgment in your code if you do. :^)