using multiple processors for particle filtering
I am using a particle filter to estimate the trajectory of a camera based on a sequence of images taken by the camera. The code is slow, but I have 8 processors in my desktop machine. I'd like to use them to get results 8 times faster. I've been looking at the following sections of http://docs.python.org/library: "16.6. multiprocessing" and "16.2. threading". I've also read some discussion from 2006 on scipy-user@scipy.org about seeds for random numbers in threads. I don't have any experience with multiprocessing and would appreciate advice.

Here is a bit of code that I want to modify:

    for i in xrange(len(self.particles)):
        self.particles[i] = self.particles[i].random_fork()

Each particle is a class instance that represents a possible camera state (position, orientation, and velocities). particle.random_fork() is a method that moves the position and orientation based on the current velocities and then uses numpy.random.standard_normal((N,)) to perturb the velocities. I handle the correlation structure of the noise with matrices that are members of each particle, and I do some of the calculations in C++.

I would like to do something like:

    for i in xrange(len(self.particles)):
        nv = numpy.random.standard_normal((N,))
        launch_on_any_available_processor(
            self.particles[i] = self.particles[i].random_fork(nv))
    wait_for_completions()

But I don't see a command like "launch_on_any_available_processor". I would be grateful for any advice.

--
Andy Fraser
ISR-2 (MS:B244)                    afraser@lanl.gov
Los Alamos National Laboratory     505 665 9448
Los Alamos, NM 87545
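For readers skimming the thread: the closest stdlib analogue of the hypothetical launch_on_any_available_processor / wait_for_completions pair is multiprocessing.Pool.apply_async. A minimal sketch with a toy stand-in for random_fork (none of the names below come from Andy's actual code):

    import multiprocessing

    def random_fork(particle, nv):
        # Stand-in for particle.random_fork(nv); lives at module level
        # so that multiprocessing can pickle it for the workers.
        return particle + nv

    if __name__ == '__main__':
        particles = list(range(1000))
        pool = multiprocessing.Pool()      # one worker per core by default
        results = [pool.apply_async(random_fork, (p, 0.5)) for p in particles]
        pool.close()
        pool.join()                        # the "wait_for_completions()" step
        particles = [r.get() for r in results]

As the replies below explain, submitting each tiny task individually like this carries heavy per-task overhead; it illustrates the primitive, not a recommended design.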
I would like to do something like:
    for i in xrange(len(self.particles)):
        nv = numpy.random.standard_normal((N,))
        launch_on_any_available_processor(
            self.particles[i] = self.particles[i].random_fork(nv))
    wait_for_completions()
But I don't see a command like "launch_on_any_available_processor". I would be grateful for any advice.
Look more in depth at the multiprocessing library -- it's likely to be what you want... more or less. However, it might be a bit "less" than "more" because, from the above, what it looks like you want to do is to launch many (millions?) of very lightweight tasks ("random_fork") on different processors. If you naively start up a fresh python process for each "random_fork" task, the startup costs will dominate (hugely so). So you'll likely need to re-jigger the task you want to dispatch to each processor so that the chunks are larger:

    processes = []
    for i in range(num_processors):
        processes.append(start_worker_process(
            num_particles=total_particles // num_processors))
    wait_for_processes_to_end()
    self.particles = numpy.concatenate([p.particles for p in processes])

Though even this might be too granular a task if you have numerous time-steps for which you need to generate particles (e.g. if the above would be within another large loop). Then you'd need to write the worker processes as sort of "particle servers":

    processes = []
    for i in range(num_processors):
        processes.append(start_worker())
    for task in huge_task_list:
        sub_tasks = divide_task(task, num_processors)
        for process, sub_task in zip(processes, sub_tasks):
            process.start(sub_task)
        wait_for_processes_to_complete_task()
        self.results = assemble_results(processes)

This is of course pretty naive still, but it'll be a start, and the architectures I've (roughly) outlined here fit better with the multiprocessing paradigm. You'll need to do some internet reading and look at various examples first to get the details of how to actually implement this stuff. I don't know anything off the top of my head, but perhaps others can chime in?

Zach
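A concrete (if simplified) version of the first pattern above, as a runnable sketch using multiprocessing.Pool. The particle state is reduced to a plain velocity array, and each worker gets its own seed, which also sidesteps the duplicated-random-stream problem Andy mentions; all names here are invented for the example:

    import multiprocessing
    import numpy

    def fork_chunk(args):
        # Perturb one chunk of velocities; the real code would call
        # particle.random_fork() for each particle in the chunk.
        velocities, seed = args
        rng = numpy.random.RandomState(seed)   # per-worker random stream
        return velocities + rng.standard_normal(velocities.shape)

    if __name__ == '__main__':
        num_processors = 8
        velocities = numpy.zeros((100000, 6))  # stand-in particle state
        chunks = numpy.array_split(velocities, num_processors)
        pool = multiprocessing.Pool(num_processors)
        results = pool.map(fork_chunk, zip(chunks, range(num_processors)))
        velocities = numpy.concatenate(results)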
On Tue, May 25, 2010 at 5:39 PM, Andy Fraser <afraser@lanl.gov> wrote:
I am using a particle filter to estimate the trajectory of a camera based on a sequence of images taken by the camera. The code is slow, but I have 8 processors in my desktop machine. I'd like to use them to get results 8 times faster. I've been looking at the following sections of http://docs.python.org/library: "16.6. multiprocessing" and "16.2. threading". I've also read some discussion from 2006 on scipy-user@scipy.org about seeds for random numbers in threads. I don't have any experience with multiprocessing and would appreciate advice.
Here is a bit of code that I want to modify:
    for i in xrange(len(self.particles)):
        self.particles[i] = self.particles[i].random_fork()
If the updates are independent and don't have to be done sequentially, you can use the multiprocessing.Pool interface, which I've found very convenient for this sort of thing. Ideally, if particles[i] is a class instance, then random_fork could modify itself in place instead of returning a modified copy of the instance... then you could do something like

    def update_particle(self, i):
        nv = numpy.random.standard_normal((N,))
        self.particles[i].random_fork(nv)

    p = multiprocessing.Pool(8)
    p.map(self.update_particle, range(len(self.particles)))

This will distribute each update_particle call to a different process, using all cores (provided the processing is independent).

I'm not sure if random is multiprocessor safe for use like this, so that would need checking, but I hope this helps a bit...

cheers

Robin
Each particle is a class instance that represents a possible camera state (position, orientation, and velocities). particle.random_fork() is a method that moves the position and orientation based on the current velocities and then uses numpy.random.standard_normal((N,)) to perturb the velocities. I handle the correlation structure of the noise with matrices that are members of each particle, and I do some of the calculations in C++.
I would like to do something like:
    for i in xrange(len(self.particles)):
        nv = numpy.random.standard_normal((N,))
        launch_on_any_available_processor(
            self.particles[i] = self.particles[i].random_fork(nv))
    wait_for_completions()
But I don't see a command like "launch_on_any_available_processor". I would be grateful for any advice.
--
Andy Fraser
ISR-2 (MS:B244)                    afraser@lanl.gov
Los Alamos National Laboratory     505 665 9448
Los Alamos, NM 87545
On Tue, May 25, 2010 at 10:16 PM, Robin <robince@gmail.com> wrote:
If the updates are independent and don't have to be done sequentially you can use the multiprocessing.Pool interface which I've found very convenient for this sort of thing.
Ideally, if particles[i] is a class instance, then random_fork could modify itself in place instead of returning a modified copy of the instance... then you could do something like
    def update_particle(self, i):
        nv = numpy.random.standard_normal((N,))
        self.particles[i].random_fork(nv)
    p = multiprocessing.Pool(8)
    p.map(self.update_particle, range(len(self.particles)))
Sorry - I just thought that it probably doesn't make sense to use map in this case, since your processing function isn't returning anything... you can check Pool.apply_async (which returns control immediately and lets the work continue in the background) and Pool.apply (the blocking version, which is probably what you want).

Cheers

Robin
this will distribute each update_particle call to a different process using all cores (providing the processing is independent).
I'm not sure if random is multiprocessor safe for use like this so that would need checking but I hope this helps a bit...
On Tue, May 25, 2010 at 10:19 PM, Robin <robince@gmail.com> wrote:
Sorry - I just thought that it probably doesn't make sense to use map in this case, since your processing function isn't returning anything... you can check Pool.apply_async (which returns control immediately and lets the work continue in the background) and Pool.apply (the blocking version, which is probably what you want).
I'm being a bit silly I think - this won't work properly, of course, because the particle instances will be changed in the subprocesses and not propagated back... but hopefully the pointer to using Pool is useful. If you can split the task into an independent function then it's really handy... Maybe something like

    self.updated_particles = p.map(update_particle, self.particles)

where update_particle takes a single particle instance, does the random number generation, and returns the updated particle.

Cheers

Robin
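Filled out, that corrected shape might look like the sketch below -- assuming the particle instances pickle cleanly, and with the noise drawn in the parent process so the random streams stay under control (N and the method names follow the thread; everything else is invented):

    import multiprocessing
    import numpy

    N = 6  # dimension of the velocity noise (assumed)

    def update_particle(args):
        # Module-level so multiprocessing can pickle it for the workers;
        # returns a new particle instead of mutating shared state.
        particle, noise = args
        return particle.random_fork(noise)

    def step(pf):
        # One filter step: draw all noise in the parent, then update
        # every particle in parallel.
        noise = numpy.random.standard_normal((len(pf.particles), N))
        pool = multiprocessing.Pool(8)
        pf.particles = pool.map(update_particle, zip(pf.particles, noise))
        pool.close()
        pool.join()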
Thanks for the replies and pointers. I got multiprocessing.Pool to work, but it eats up memory and time. I append two implementation segments below. The multiprocessing version is about 33 times _slower_ than the single processor version. Unless I use a small number of processors, memory fills up and I kill the job to make the computer usable again. The following segments of code are inside a loop that steps over 115 lines of pixels.

    def func(job):
        return job[0].random_fork(job[1])

    . . . . . .

    #Multiprocessing version:
    noise = numpy.random.standard_normal((N_particles, noise_df))
    jobs = zip(self.particles, noise)
    self.particles = self.pool.map(func, jobs, self.chunk_size)
    return (m, v)

    . . . . . .

    #Single processing version
    noise = numpy.random.standard_normal((N_particles, noise_df))
    jobs = zip(self.particles, noise)
    self.particles = map(func, jobs)
    return (m, v)

--
Andy Fraser
ISR-2 (MS:B244)                    afraser@lanl.gov
Los Alamos National Laboratory     505 665 9448
Los Alamos, NM 87545
Thanks for the replies and pointers. I got multiprocessing.Pool to work, but it eats up memory and time. I append two implementation segments below. The multiprocessing version is about 33 times _slower_ than the single processor version. Unless I use a small number of processors, memory fills up and I kill the job to make the computer usable again. The following segments of code are inside a loop that steps over 115 lines of pixels.
Several problems here:

(1) I am sorry I didn't mention this earlier, but looking over your original email, it appears that your single-process code might be very inefficient: it seems to perturb each particle individually in a for-loop rather than working on an array of all the particles. Perhaps you should try to fix that before adding multiprocessing? Basically, you should hopefully be able to write random_fork to work on a number of particles at once using numpy broadcasting, etc. This way, the for-loop that steps through the elements is implemented in compiled C rather than interpreted python. Check out various numpy tutorials for details, but here's the general gist:

    points = numpy.arange(6000).reshape((3000,2))  # 3000 x,y points
    perturbations = numpy.random.normal(size=(3000,2))

    def perturb_bad(points, perturbations):
        for point, perturbation in zip(points, perturbations):
            point += perturbation

    def perturb_good(points, perturbations):
        points += perturbations

    timeit perturb_bad(points, perturbations)
    # 10 loops, best of 3: 18.7 milliseconds per loop

    timeit perturb_good(points, perturbations)
    # 10000 loops, best of 3: 161 microseconds per loop

Compare this orders-of-magnitude gain to the at-best-8-fold gain you'd get from multiprocessing the bad code. Also note that "map" is basically just an interpreted for-loop under the hood:

    import operator
    timeit map(operator.add, points, perturbations)
    # 10 loops, best of 3: 18.7 milliseconds per loop

The moral here is to avoid looping constructs in python when working with sets of numbers, and instead use numpy operations that operate on lots of numbers with one python command.

(2) From the slowdowns you report, it looks like overhead costs are completely dominating. For each job, the code and data need to be serialized (pickled, I think, is how the multiprocessing library handles it), written to a pipe, unpickled, and executed, and the results need to be pickled, sent back, and unpickled. Perhaps using memmap to share state might be better? Or you can make sure that the function parameters and results can be very rapidly pickled and unpickled (single numpy arrays, e.g., not lists-of-sub-arrays or something).

Still, tune the single-processor code first. Perhaps you can send more detailed code samples and folks on the list can offer some advice about how to make it numpy-friendly and fast.

Zach

On May 27, 2010, at 5:37 PM, Andy Fraser wrote:
Thanks for the replies and pointers. I got multiprocessing.Pool to work, but it eats up memory and time. I append two implementation segments below. The multiprocessing version is about 33 times _slower_ than the single processor version. Unless I use a small number of processors, memory fills up and I kill the job to make the computer usable again. The following segments of code are inside a loop that steps over 115 lines of pixels.
    def func(job):
        return job[0].random_fork(job[1])
. . . . . .
#Multiprocessing version:
    noise = numpy.random.standard_normal((N_particles, noise_df))
    jobs = zip(self.particles, noise)
    self.particles = self.pool.map(func, jobs, self.chunk_size)
    return (m, v)
. . . . . .
#Single processing version
    noise = numpy.random.standard_normal((N_particles, noise_df))
    jobs = zip(self.particles, noise)
    self.particles = map(func, jobs)
    return (m, v)
--
Andy Fraser
ISR-2 (MS:B244)                    afraser@lanl.gov
Los Alamos National Laboratory     505 665 9448
Los Alamos, NM 87545
Zach, Thank you for your detailed reply. The way I've structured my code makes it difficult to implement your advice. After taking some time to work on the problem, I will post again. I may not get back to it till after my summer vacation.
"ZP" == Zachary Pincus <zachary.pincus@yale.edu> writes:
ZP> [...] Several problems here:
ZP>
ZP> (1) I am sorry I didn't mention this earlier, but looking over
ZP> your original email, it appears that your single-process code
ZP> might be very inefficient: it seems to perturb each particle
ZP> individually in a for-loop rather than working on an array of
ZP> all the particles. [...]

Correct. My particles are planes that carry cameras. I have three kinds of classes: ParticleFilters, Planes, and Cameras. That structure makes it easy to change the characteristics of the Planes or Cameras by using subclasses, at the expense of making it hard to speed things up.

ZP> (2) From the slowdowns you report, it looks like overhead
ZP> costs are completely dominating. For each job, the code and
ZP> data need to be serialized (pickled, I think, is how the
ZP> multiprocessing library handles it), written to a pipe,
ZP> unpickled, executed, and the results need to be pickled, sent
ZP> back, and unpickled. Perhaps using memmap to share state might
ZP> be better? Or you can make sure that the function parameters
ZP> and results can be very rapidly pickled and unpickled (single
ZP> numpy arrays, e.g., not lists-of-sub-arrays or something).

I suspected that [un]pickling was the dominating factor. I had not looked at mmap before. It looks like a better tool.

Andy
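For the record, the memmap idea can look something like the sketch below: a file-backed numpy array that forked workers open and write to in disjoint slices, so no particle data is pickled. The file name, shapes, and per-slice seeding are all invented for illustration:

    import multiprocessing
    import numpy

    N_PARTICLES, DIM = 100000, 7

    def perturb_slice(bounds):
        lo, hi = bounds
        # Re-open the shared file-backed array inside the worker;
        # writes to [lo:hi) land in storage visible to the parent.
        s = numpy.memmap('particles.dat', dtype='float64',
                         mode='r+', shape=(N_PARTICLES, DIM))
        rng = numpy.random.RandomState(lo)  # distinct stream per slice
        s[lo:hi] += rng.standard_normal((hi - lo, DIM))
        s.flush()

    if __name__ == '__main__':
        state = numpy.memmap('particles.dat', dtype='float64',
                             mode='w+', shape=(N_PARTICLES, DIM))
        state.flush()                       # file exists before workers open it
        nproc = 8
        edges = numpy.linspace(0, N_PARTICLES, nproc + 1).astype(int)
        pool = multiprocessing.Pool(nproc)
        pool.map(perturb_slice, list(zip(edges[:-1], edges[1:])))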
Thank you for your detailed reply. The way I've structured my code makes it difficult to implement your advice. After taking some time to work on the problem, I will post again. I may not get back to it till after my summer vacation.
You're welcome; good luck!

Overall, the only take-home here is that the main commandment in writing numerical codes for interpreted languages like Python or Matlab is: "Thou shalt not write looping constructs". If your design absolutely requires iteration over individual data elements (and they often do), you might look at cython, which is a nice way to write in (more or less) python that gets compiled to C and can easily interact with python objects / numpy arrays. (A sketch of the array-oriented restructuring follows the quoted message below.)

Zach

On Jun 1, 2010, at 10:45 AM, Andy Fraser wrote:
Zach,
Thank you for your detailed reply. The way I've structured my code makes it difficult to implement your advice. After taking some time to work on the problem, I will post again. I may not get back to it till after my summer vacation.
"ZP" == Zachary Pincus <zachary.pincus@yale.edu> writes:
ZP> [...] Several problems here:
ZP> (1) I am sorry I didn't mention this earlier, but looking over
ZP> your original email, it appears that your single-process code
ZP> might be very inefficient: it seems to perturb each particle
ZP> individually in a for-loop rather than working on an array of
ZP> all the particles. [...]
Correct. My particles are planes that carry cameras. I have three kinds of classes: ParticleFilters, Planes, and Cameras. That structure makes it easy to change the characteristics of the Planes or Cameras by using subclasses at the expense of making it hard to speed things up.
ZP> (2) From the slowdowns you report, it looks like overhead
ZP> costs are completely dominating. For each job, the code and
ZP> data need to be serialized (pickled, I think, is how the
ZP> multiprocessing library handles it), written to a pipe,
ZP> unpickled, executed, and the results need to be pickled, sent
ZP> back, and unpickled. Perhaps using memmap to share state might
ZP> be better? Or you can make sure that the function parameters
ZP> and results can be very rapidly pickled and unpickled (single
ZP> numpy arrays, e.g., not lists-of-sub-arrays or something).
I suspected that [un]pickling was the dominating factor. I had not looked at mmap before. It looks like a better tool.
Andy
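To make the commandment concrete for this thread's problem, here is one hedged sketch of the restructuring Zach suggests: hold the whole particle set as a few numpy arrays (one row per particle), so that random_fork becomes a handful of whole-array operations. All names and shapes are invented, and the correlated noise is applied through an assumed Cholesky factor L:

    import numpy

    class ParticleCloud(object):
        """All particles held as arrays, one row per particle."""
        def __init__(self, n, L):
            self.position = numpy.zeros((n, 3))
            self.velocity = numpy.zeros((n, 6))  # linear + angular
            self.L = L                           # (6, 6) noise Cholesky factor

        def random_fork_all(self, dt=1.0):
            # One vectorized update replaces the per-particle python loop.
            self.position += dt * self.velocity[:, :3]
            noise = numpy.random.standard_normal(self.velocity.shape)
            self.velocity += noise.dot(self.L.T)  # correlated perturbation

    cloud = ParticleCloud(100000, numpy.eye(6))
    cloud.random_fork_all()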
On Thu, May 27, 2010 at 10:37 PM, Andy Fraser <afraser@lanl.gov> wrote:
#Multiprocessing version:
    noise = numpy.random.standard_normal((N_particles, noise_df))
    jobs = zip(self.particles, noise)
    self.particles = self.pool.map(func, jobs, self.chunk_size)
    return (m, v)
What platform are you on? I often forget that multiprocessing works quite differently on Windows than on unix platforms (and is much less useful there). On unix platforms the child processes are spawned with fork(), which means they share all the memory state of the parent process, with copy-on-write if they make changes. On Windows separate processes are spawned and all the state has to be passed through the serialiser (I think).

So on unix you can share large quantities of (read-only) data very cheaply by making it accessible before the fork. So if you are on Mac/Linux and the slowdown is caused by passing the large noise array, you could get around this by making it a global somehow before the fork when you initiate the pool... ie

    import mymodule
    mymodule.noise = numpy.random.standard_normal((N_particles, noise_df))

then use this in func; don't pass the noise array in the map call.

But I agree with Zachary about using arrays of object parameters rather than lists of objects each with their own parameter variables.

Cheers

Robin
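A self-contained sketch of that trick, assuming a fork-based platform (it will not work this way on Windows, for exactly the reason Robin gives): assign the big array to a module-level global before the Pool is created, so the forked workers inherit it and only small indices and results cross the pipe.

    import multiprocessing
    import numpy

    noise = None  # set before the pool is created, inherited at fork()

    def func(i):
        # Reads the shared array via the inherited global; only `i`
        # and the scalar result are serialized.
        return noise[i].sum()

    if __name__ == '__main__':
        noise = numpy.random.standard_normal((100000, 50))
        pool = multiprocessing.Pool(8)   # fork() happens here
        totals = pool.map(func, range(100000))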
Thank you for your continuing help.
"R" == Robin <robince@gmail.com> writes:
R> On Thu, May 27, 2010 at 10:37 PM, Andy Fraser <afraser@lanl.gov> wrote:
>> #Multiprocessing version:
>>
>> noise = numpy.random.standard_normal((N_particles, noise_df))
>> jobs = zip(self.particles, noise)
>> self.particles = self.pool.map(func, jobs, self.chunk_size)
>> return (m, v)
R> What platform are you on? [...]

Ubuntu/GNU/Linux

R> So if you are on Mac/Linux and the slowdown is caused by
R> passing the large noise array, [...]

I believe that large image arrays were being copied and maybe pickled.

R> But I agree with Zachary about using arrays of object
R> parameters rather than lists of objects each with their own
R> parameter variables.

Following Zach's advice (and my own experience), I've moved all of the loops over particles from python to C++ or implemented them as single numpy functions. That has cut the time by a factor of about 25. My next moves are to figure out where the remaining time gets spent, and if there are big expenditures in the C++ code, I will look into multiprocessing there.

--
Andy Fraser
ISR-2 (MS:B244)                    afraser@lanl.gov
Los Alamos National Laboratory     505 665 9448
Los Alamos, NM 87545
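One lightweight way to answer the "where does the remaining time go" question is the stdlib profiler; run_filter below is a hypothetical stand-in for whatever drives one pass of the filter:

    import cProfile
    import pstats

    # Profile one filtering pass and show the 15 most expensive calls.
    cProfile.run('run_filter()', 'filter.prof')
    stats = pstats.Stats('filter.prof')
    stats.sort_stats('cumulative').print_stats(15)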
If you end up with most of your time spent in C code, you might be able to release the GIL and then use multiple threads, in which case you won't need to worry about process-spawning overhead & shared memory. (A sketch of this approach follows the quoted message below.)

my 2 cents,
David

----- Original Message ----
From: Andy Fraser <afraser@lanl.gov>
To: SciPy Users List <scipy-user@scipy.org>
Sent: Fri, 4 June, 2010 2:30:34 AM
Subject: Re: [SciPy-User] using multiple processors for particle filtering

Thank you for your continuing help.
"R" == Robin <robince@gmail.com> writes:
R> On Thu, May 27, 2010 at 10:37 PM, Andy Fraser <afraser@lanl.gov> wrote:
>> #Multiprocessing version:
>>
>> noise = numpy.random.standard_normal((N_particles, noise_df))
>> jobs = zip(self.particles, noise)
>> self.particles = self.pool.map(func, jobs, self.chunk_size)
>> return (m, v)
R> What platform are you on? [...]

Ubuntu/GNU/Linux

R> So if you are on Mac/Linux and the slowdown is caused by
R> passing the large noise array, [...]

I believe that large image arrays were being copied and maybe pickled.

R> But I agree with Zachary about using arrays of object
R> parameters rather than lists of objects each with their own
R> parameter variables.

Following Zach's advice (and my own experience), I've moved all of the loops over particles from python to C++ or implemented them as single numpy functions. That has cut the time by a factor of about 25. My next moves are to figure out where the remaining time gets spent, and if there are big expenditures in the C++ code, I will look into multiprocessing there.

--
Andy Fraser
ISR-2 (MS:B244)                    afraser@lanl.gov
Los Alamos National Laboratory     505 665 9448
Los Alamos, NM 87545
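To illustrate David's point: if the extension wraps its heavy loop in Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS, plain python threads can then run it in parallel with no process-spawning or pickling cost. The sketch below uses numpy.dot, whose BLAS-backed core releases the GIL in standard builds, as a stand-in for such a routine:

    import threading
    import numpy

    def worker(a, b, out, i):
        # numpy.dot releases the GIL while BLAS runs, so these
        # calls can genuinely overlap on multiple cores.
        out[i] = numpy.dot(a, b).trace()

    a = numpy.random.standard_normal((2000, 2000))
    b = numpy.random.standard_normal((2000, 2000))
    out = [None] * 4
    threads = [threading.Thread(target=worker, args=(a, b, out, i))
               for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()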
Thanks to all who offered advice. I am posting a final follow up in case anyone reads the thread looking for a solution.

The weight calculations were taking more than 97% of the time in my code. By putting the loop in C++ and using pthreads for the most time consuming computations, I reduced the execution time by a factor of 68. I used the boost multi_array library for arrays and the numpy_boost code to do conversions from numpy to boost arrays.

Lessons learned:

1. As Zach says, for speed don't loop over data in Python
2. Don't use Python multiprocessing for this kind of problem
3. Don't put calls to Python interface functions (e.g., PyObject_GetAttrString) in threaded C++ code. Extract data (or at least pointers) from Python objects before starting multiple threads.
4. Do use numpy_boost (http://code.google.com/p/numpy-boost/) and pthreads.

Here are the key lines of C++ (the python-side call is sketched below, after the quoted message):

    void *weight_thread(void *thread_arg){
      /* Code that runs in a separate thread.  The expensive calculations
         are done by t_d->c->weight().  The ugly code here sets up data
         for that call using the single argument to this function.
       */
      weight_data *t_d = (struct weight_data *) thread_arg;
      array_d2_ref par_list = *t_d->par_list;
      array_d1_ref w = *t_d->w;
      for (int i=t_d->start; i<t_d->stop; i++){
        array_d1_ref X = t_d->XQ->X[i];
        array_d1_ref q = t_d->XQ->q[i];
        for (int k=0;k<3;k++){
          par_list[i][k+4] = X[k];
        }
        for (int k=0;k<4;k++){
          par_list[i][k] = q[k];
        }
        w[i] = t_d->c->weight(X, q, t_d->t);
      }
      pthread_exit(NULL);
    }

    static PyObject * weights_list(PyObject *self, PyObject *args){
      /* Python call:
         weights_list(others,t,N_threads,w,par_list,Y,ref,data,mu,Q)

         This function fills the numpy arrays 'w' and 'par_list' with
         results of weight calculations for each plane in the list
         'others'.  Calls to the 'weight' method of a C++ camera 'c'
         built from 'Y', 'ref', 'data', 'mu' and 'Q' do the
         calculations.
      */
      PyArrayObject *Py_ref, *Py_data;
      PyObject *others, *dummy;
      int t, N_threads;
      if (!PyArg_ParseTuple(args, "OiiOOOO&O&OO",
                            &others, &t, &N_threads, &dummy, &dummy, &dummy,
                            PyArray_Converter, &Py_ref,
                            PyArray_Converter, &Py_data,
                            &dummy, &dummy))
        return NULL;
      Py_ssize_t i = 3; // Starting argument for conversions
      numpy_boost<double, 1> w(PySequence_GetItem(args, i++));
      numpy_boost<double, 2> par_list(PySequence_GetItem(args, i++));
      numpy_boost<double, 2> Y(PySequence_GetItem(args, i++));
      PyImage ref = PyImage(Py_ref);
      PyImage data = PyImage(Py_data);
      Py_DECREF(Py_ref);
      Py_DECREF(Py_data);
      i += 2;
      numpy_boost<double, 1> mu(PySequence_GetItem(args, i++));
      numpy_boost<double, 2> Q(PySequence_GetItem(args, i++));
      Xq XQ(others);
      int N = PyList_Size(others);
      Camera c = Camera::Camera(&Y, &ref, &data, &mu, &Q);
      weight_data t_d[N_threads];
      pthread_t t_id[N_threads];
      pthread_attr_t attr;

      pthread_attr_init(&attr);
      pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
      for (int i=0; i<N_threads; i++){
        t_d[i].t = t;
        t_d[i].start = (i*N)/N_threads;
        t_d[i].stop = ((i+1)*N)/N_threads;
        t_d[i].XQ = &XQ;
        t_d[i].w = &w;
        t_d[i].par_list = &par_list;
        t_d[i].c = &c;
        int rc = pthread_create(&t_id[i], &attr, weight_thread,
                                (void *)&t_d[i]);
        if (rc) {
          printf("ERROR; return code from pthread_create() is %d\n", rc);
          exit(-1);
        }
      }
      /* Free attribute and wait for the other threads */
      pthread_attr_destroy(&attr);
      for(int i=0; i<N_threads; i++) {
        int rc = pthread_join(t_id[i], NULL);
        if (rc) {
          printf("ERROR; return code from pthread_join() is %d\n", rc);
          exit(-1);
        }
      }
      return Py_BuildValue("");
    }
"A" == Andy Fraser <afraser@lanl.gov> writes:
A> I am using a particle filter to estimate the trajectory of a
A> camera based on a sequence of images taken by the camera. The
A> code is slow, but I have 8 processors in my desktop machine.
A> I'd like to use them to get results 8 times faster. I've been
A> looking at the following sections of
A> http://docs.python.org/library: "16.6. multiprocessing" and
A> "16.2. threading". I've also read some discussion from 2006 on
A> scipy-user@scipy.org about seeds for random numbers in threads.
A> I don't have any experience with multiprocessing and would
A> appreciate advice.
A>
A> Here is a bit of code that I want to modify:
A>
A>     for i in xrange(len(self.particles)):
A>         self.particles[i] = self.particles[i].random_fork()
A>
A> Each particle is a class instance that represents a possible
A> camera state (position, orientation, and velocities).
A> particle.random_fork() is a method that moves the position and
A> orientation based on current velocities and then uses
A> numpy.random.standard_normal((N,)) to perturb the velocities.
A> I handle the correlation structure of the noise by matrices
A> that are members of particle, and I do some of the calculations
A> in c++.
A>
A> I would like to do something like:
A>
A>     for i in xrange(len(self.particles)):
A>         nv = numpy.random.standard_normal((N,))
A>         launch_on_any_available_processor(
A>             self.particles[i] = self.particles[i].random_fork(nv))
A>     wait_for_completions()
A>
A> But I don't see a command like
A> "launch_on_any_available_processor". I would be grateful for
A> any advice.
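For completeness, the python side of a call into the extension above might look roughly like this sketch. The argument order comes from the docstring in weights_list; the module name and array shapes are guesses, and others, t, Y, ref, data, mu, and Q come from the filter and are not defined here:

    import numpy
    import camera_weights  # stand-in name for the compiled extension

    N = len(others)                 # one row per plane/particle
    w = numpy.zeros((N,))           # filled in place by the worker threads
    par_list = numpy.zeros((N, 7))  # quaternion (4) then position (3)
    camera_weights.weights_list(others, t, 8, w, par_list,
                                Y, ref, data, mu, Q)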
participants (4)
- Andy Fraser
- David Baddeley
- Robin
- Zachary Pincus