how to use priority queue with multiprocessing

John Nagle nagle at animats.com
Fri Jan 14 13:57:59 EST 2011


On 1/13/2011 9:07 AM, Marco Hornung wrote:
> Hey,
>
> ------------------------------------------------------------------------------------------
>
>
question
> ------------------------------------------------------------------------------------------
>
>
How can I use a priority queue to schedule jobs within the 
"multiprocessing pool" module?
>
> ------------------------------------------------------------------------------------------
>
>
my scenario
> ------------------------------------------------------------------------------------------
>
>
I want to run several jobs on a server. The jobs are being sent by 
users. However, all jobs have a different priority, and high-priority 
jobs should be processed before any low-priority job gets touched.
> Currently I just append all incoming jobs to the multiprocessing
> worker pool as follows: ### initialize worker pool pool			=
> PriorityPool(processes=worker_count) process_handles = []
>
> ### distribute function execution over several processes for
> job_parameter in job_parameter_list: handle =
> pool.apply_async(process_function, [job_parameter,])
> process_handles.append(handle)
>
> This will only put the jobs in some kind of a list - and execute the
> jobs in the order they come in. Is it possible to use a priority
> queue for the process-pool?
>

    You''ll probably have to track the available processes yourself,
starting a new job when there's a process available.

    One way to do this is to have a management thread for each
process.  Each management thread starts a subprocess, gets
a work item from the priority queue (blocking if necessary),
gives it to the subprocess, waits for the subprocess to
return a result, and goes back to get another work item.

    This is straightforward, except for working out a way
to cleanly shut the thing down.  One way to do that is
to have a "shutdown" flag visible to all the threads.
That's checked before getting a new task.  If it's set,
the thread terminates its subprocess and returns.
Set the terminate flag in a signal handler for control-C.

    (I have something that manages multiple processes
using a priority queue, where the queue is implemented
using MySQL.  This allows me to put a whole cluster to
work.)

				John Nagle




More information about the Python-list mailing list