how to use priority queue with multiprocessing

Adam Tauno Williams awilliam at whitemice.org
Fri Jan 14 15:16:48 EST 2011


On Fri, 2011-01-14 at 10:57 -0800, John Nagle wrote: 
> On 1/13/2011 9:07 AM, Marco Hornung wrote:
> > I want to run several jobs on a server. The jobs are being sent by
> > users. However, all jobs have a different priority, and high-priority
> > jobs should be processed before any low-priority job gets touched.
> > Currently I just append all incoming jobs to the multiprocessing
> > worker pool as follows:
> >
> >     ### initialize worker pool
> >     pool            = PriorityPool(processes=worker_count)
> >     process_handles = []
> >
> >     ### distribute function execution over several processes
> >     for job_parameter in job_parameter_list:
> >         handle = pool.apply_async(process_function, [job_parameter,])
> >         process_handles.append(handle)
> >
> > This will only put the jobs in some kind of a list - and execute the
> > jobs in the order they come in. Is it possible to use a priority
> > queue for the process-pool?
> You'll probably have to track the available processes yourself,
> starting a new job when there's a process available.

Which is exactly what we do in OpenGroupware Coils' OIE.

There is a process [job] list which is sorted by priority and the next
available process is started when a worker is available.  We use
multiprocessing to create a *process*, rather than a thread, for each
job.

> One way to do this is to have a management thread for each
> process.  Each management thread starts a subprocess, gets
> a work item from the priority queue (blocking if necessary),
> gives it to the subprocess, waits for the subprocess to
> return a result, and goes back to get another work item.
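The management-thread pattern quoted above could be sketched roughly like
this (all names here are illustrative, not from any real code base): each
manager thread pulls the highest-priority job from a shared PriorityQueue,
runs it in a fresh subprocess, waits for the result, and loops.

```python
# Minimal sketch: one management thread per worker slot, each feeding
# subprocesses from a shared priority queue.  A (99, None) sentinel per
# thread shuts the managers down after the real jobs drain.
import multiprocessing
import queue
import threading

def process_function(job_parameter):
    # stand-in for the real per-job work
    return job_parameter * 2

def worker(job_parameter, result_queue):
    result_queue.put(process_function(job_parameter))

def manager(job_queue, results):
    while True:
        priority, job_parameter = job_queue.get()
        if job_parameter is None:          # sentinel: shut this manager down
            break
        result_queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker,
                                    args=(job_parameter, result_queue))
        p.start()
        results.append(result_queue.get()) # block until the subprocess returns
        p.join()

job_queue = queue.PriorityQueue()
for priority, param in [(2, 10), (1, 20), (3, 30)]:
    job_queue.put((priority, param))       # lowest priority number runs first

results = []
threads = [threading.Thread(target=manager, args=(job_queue, results))
           for _ in range(2)]
for t in threads:
    t.start()
for _ in threads:
    job_queue.put((99, None))              # one sentinel per manager thread
for t in threads:
    t.join()
```

Because the sentinels carry the worst priority, they are only handed out
after every real job has been dispatched.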

We have a manager process and an executor process.  These communicate
via AMQ, but you could use any mechanism.  The manager process controls
the process [job] list.  When a process needs to be started, a message is
sent to the executor, which creates a worker process if an opening is
available; otherwise it messages the manager to place the process in a
queued state.  When a worker process completes, it messages the executor,
which in turn messages the manager that a process slot may be available.
The manager then looks up the next available process and messages the
executor to start it; provided a worker slot is still available, the
executor will start the worker [otherwise the process goes back into a
queued state].
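The hand-off described above can be sketched in a few lines, with plain
Python objects and a heap standing in for the AMQ message bus and real
worker processes (all class and method names here are illustrative, not
taken from OIE):

```python
# Toy model of the manager/executor protocol: the manager owns the
# priority-sorted job list, the executor owns the worker slots.
import heapq

class Manager:
    def __init__(self):
        self.queued = []                    # heap of (priority, job)

    def submit(self, priority, job):
        heapq.heappush(self.queued, (priority, job))

    def next_job(self):
        # hand back the highest-priority queued job, or None
        return heapq.heappop(self.queued) if self.queued else None

class Executor:
    def __init__(self, manager, slots):
        self.manager = manager
        self.free_slots = slots
        self.started = []                   # jobs actually started, in order

    def request_start(self, priority, job):
        if self.free_slots:                 # an opening is available
            self.free_slots -= 1
            self.started.append(job)
        else:                               # message the manager: queue it
            self.manager.submit(priority, job)

    def worker_completed(self):
        self.free_slots += 1                # a slot may be available
        entry = self.manager.next_job()
        if entry is not None:
            priority, job = entry
            if self.free_slots:             # slot still free: start it
                self.free_slots -= 1
                self.started.append(job)
            else:                           # otherwise back to queued state
                self.manager.submit(priority, job)

manager = Manager()
executor = Executor(manager, slots=1)
executor.request_start(2, "report")    # starts immediately, fills the slot
executor.request_start(1, "invoice")   # no slot -> queued at priority 1
executor.request_start(3, "cleanup")   # no slot -> queued at priority 3
executor.worker_completed()            # frees the slot; "invoice" wins
```

After the completion message, the priority-1 job runs before the
priority-3 job, regardless of submission order.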

> This is straightforward, except for working out a way
> to cleanly shut the thing down.  One way to do that is
> to have a "shutdown" flag visible to all the threads.

Using a message bus helps a lot, and with multiprocessing you can just do
a join()/is_alive() to make sure a worker is still working.
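That liveness check is just the standard multiprocessing API; a minimal
sketch (the work function is a stand-in):

```python
# Poll a worker with is_alive() and reap it with join() once finished.
import multiprocessing
import time

def work():
    time.sleep(0.2)          # stand-in for a real job

p = multiprocessing.Process(target=work)
p.start()
while p.is_alive():          # worker is still working
    time.sleep(0.05)
p.join()                     # reap the finished worker
```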



