Processes not exiting
ma3mju
matt.urry at googlemail.com
Mon Aug 10 16:54:38 CEST 2009
On 7 Aug, 16:02, MRAB <pyt... at mrabarnett.plus.com> wrote:
> ma3mju wrote:
> > On 3 Aug, 09:36, ma3mju <matt.u... at googlemail.com> wrote:
> >> On 2 Aug, 21:49, Piet van Oostrum <p... at cs.uu.nl> wrote:
>
> >>>>>>>> MRAB <pyt... at mrabarnett.plus.com> (M) wrote:
> >>>> M> I wonder whether one of the workers is raising an exception, perhaps due
> >>>> M> to lack of memory, when there are large number of jobs to process.
> >>> But that wouldn't prevent the join. And you would probably get an
> >>> exception traceback printed.
> >>> I wonder if something fishy is happening in the multiprocessing
> >>> infrastructure. Or maybe the Fortran code goes wrong because it has no
> >>> protection against buffer overruns and similar problems, I think.
> >>> --
> >>> Piet van Oostrum <p... at cs.uu.nl>
> >>> URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
> >>> Private email: p... at vanoostrum.org
> >> I don't think it's a memory problem. The reason for the hard and easy
> >> queues is that larger examples use far more RAM. If I run all of the
> >> workers on harder problems I do begin to run out of RAM and end up
> >> spending all my time switching in and out of swap, so I limit the
> >> number of harder problems I run at the same time. I've watched it
> >> run to the end (a very boring couple of hours) and it stays out of my
> >> swap space and everything appears to stay in RAM. It just hangs
> >> after "poison" has been printed for each process.
>
> >> The other thing is that I get the message "here" telling me I broke
> >> out of the loop after seeing the poison pill in the process, and I get
> >> all the queued jobs listed as output. Surely if I were running out of
> >> memory I wouldn't expect all of the jobs to be listed as output.
>
> >> I have a serial script that works fine so I know individually for each
> >> example the fortran code works.
>
> >> Thanks
>
> >> Matt
>
> > Any ideas for a solution?
>
> A workaround is to do them in small batches.
>
> You could put each job in a queue with a flag to say whether it's hard
> or easy, then:
>
> while have more jobs:
>     move up to BATCH_SIZE jobs into worker queues
>     create and start workers
>     wait for workers to finish
>     discard workers
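
The batching loop suggested above could be sketched roughly as follows. This is only an illustration under stated assumptions: it is written for Python 3 rather than the Python 2 used in this thread, it uses a fresh Pool per batch instead of hand-built worker queues, and the names batches, square and run_in_batches are hypothetical stand-ins, not part of the original script.

```python
import multiprocessing as mp


def batches(jobs, batch_size):
    """Yield successive slices of at most batch_size jobs."""
    for start in range(0, len(jobs), batch_size):
        yield jobs[start:start + batch_size]


def square(job):
    # Hypothetical stand-in for the real (Fortran-backed) computation.
    return job * job


def run_in_batches(jobs, batch_size, num_workers):
    results = []
    for batch in batches(jobs, batch_size):
        # A fresh pool per batch: workers are created, used, then discarded.
        with mp.Pool(processes=num_workers) as pool:
            results.extend(pool.map(square, batch))
    return results


if __name__ == "__main__":
    print(run_in_batches(list(range(10)), batch_size=4, num_workers=2))
```

Because each pool is torn down before the next batch starts, memory held by the workers is released between batches, at the cost of restarting the processes every time.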
Yeah, I was hoping for something with a bit more finesse. In the end I
used Pool with a callback function instead, and that has solved the
problem. Today I did find this snippet:

    Joining processes that use queues

    Bear in mind that a process that has put items in a queue will
    wait before terminating until all the buffered items are fed by the
    “feeder” thread to the underlying pipe. (The child process can call
    the Queue.cancel_join_thread() method of the queue to avoid this
    behaviour.)

    This means that whenever you use a queue you need to make sure
    that all items which have been put on the queue will eventually be
    removed before the process is joined. Otherwise you cannot be sure
    that processes which have put items on the queue will terminate.
    Remember also that non-daemonic processes will be joined
    automatically.

I don't know (I'm not a computer scientist), but could the problem have
been the pipe getting full?
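
The guideline quoted above can be illustrated with a minimal sketch. It is written for Python 3 (the attached code is Python 2), and the names worker, drain and run are hypothetical, not taken from the original script. The parent empties the queue before joining the children; doing it the other way round is the pattern the documentation warns may never terminate.

```python
import multiprocessing as mp
import queue


def worker(job_id, results):
    # Hypothetical job. The payload is large enough that it is unlikely to
    # fit in the OS pipe buffer, so the child's feeder thread has to wait
    # until the parent starts reading.
    results.put((job_id, "x" * 1000000))


def drain(results, expected, timeout=60):
    """Collect up to `expected` items; stop early if the queue stays empty."""
    items = []
    for _ in range(expected):
        try:
            items.append(results.get(timeout=timeout))
        except queue.Empty:
            break  # a worker probably died; stop waiting rather than hang
    return items


def run(num_jobs=4):
    results = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, results))
             for i in range(num_jobs)]
    for p in procs:
        p.start()
    # Drain first, join second: each child only exits once its buffered
    # item has been flushed to the pipe.
    collected = drain(results, num_jobs)
    for p in procs:
        p.join()
    return sorted(job_id for job_id, _ in collected)


if __name__ == "__main__":
    print(run())
```

Whether this was the actual cause of the hang in the original script cannot be confirmed from the thread; the sketch only shows the ordering the documentation recommends.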
In case anyone else is affected by this, I've attached the new code so
you can see the changes I made to fix it.
Thanks for all your help
Matt
============================================================================================================================
parallel.py
============================================================================================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
from multiprocessing import Pool
global result

def cb(r):
    #r[0] is the slot in result, r[1] is the value to store there
    global result
    print r
    result[r[0]] = r[1]

############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "powerlaw"
graphfile = "powerlawgraph"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
num_hard_workers = 5
hard_work_threshold = 4000
############################################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.powerlaw_cluster_graph(500,0.1,0.05))
#save them for later reference
#binary mode, because protocol -1 selects a binary pickle format
filehandler = open(graphfile,'wb')
pickle.dump(graphs,filehandler,-1)
filehandler.close()
#queues
easy_work = []
hard_work = []
#sort each job into the easy or hard queue by its number of data points
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            job = {'datapt': l,'graph': graphs[k],'noise': noise[i],
                   'number_of_sets_of_data': num_sets_of_data,
                   'number_of_data_points': int(intervals[j])}
            if int(intervals[j]) <= hard_work_threshold:
                easy_work.append(job)
            else:
                hard_work.append(job)
            l+=1
result = np.zeros(l)
#create pool with all cores possible
worker_pool = Pool()
for i in xrange(0,len(easy_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
                            (easy_work[i],),callback=cb)
worker_pool.close()
worker_pool.join()
#create hard work queue
worker_pool = Pool(processes = num_hard_workers)
for i in xrange(0,len(hard_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
                            (hard_work[i],),callback=cb)
worker_pool.close()
worker_pool.join()
finaldata = result.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)
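
For readers on Python 3, the apply_async-plus-callback pattern used above might look roughly like this. It is a sketch under assumptions, not a port of the script: job, collect and run are hypothetical names, and the square root merely stands in for the real computation. Each worker returns a (slot, value) pair, following the r[0]/r[1] convention of cb above.

```python
import multiprocessing as mp


def job(task):
    # Hypothetical worker: returns (slot, value) so the callback knows
    # where the result belongs.
    return task["datapt"], task["number_of_data_points"] ** 0.5


def collect(results, pair):
    """Store one (slot, value) pair; this runs in the parent process."""
    slot, value = pair
    results[slot] = value


def run(tasks, processes=None):
    results = [None] * len(tasks)
    with mp.Pool(processes=processes) as pool:
        for task in tasks:
            pool.apply_async(job, (task,),
                             callback=lambda pair: collect(results, pair))
        pool.close()  # no more submissions
        pool.join()   # wait for every job and its callback
    return results


if __name__ == "__main__":
    tasks = [{"datapt": i, "number_of_data_points": n}
             for i, n in enumerate([4, 9, 16])]
    print(run(tasks))
```

Note that a job which raises an exception never triggers the callback, so its slot stays None here; passing error_callback to apply_async is one way to surface such failures.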
================================================================================================================================================
GaussianProcessRegression.py
================================================================================================================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy is updated this won't be needed)
import dtrsv
#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])

    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #index of the first newly added point
        old = arraysize-len(points)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        #copy the existing K into the top-left block, whatever the
        #number of points being added
        if len(self.K):
            K[:old,:old] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(old,arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        #add the noise variance on the diagonal of the new block
        K[old:,old:] += self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance =[]
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L=np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL
        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k=np.zeros((arraysize,len(points)))
        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])
        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################
        #result
        mean=dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}

    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function
    #when given number_of_examples examples
    def average_error_over_samples(self,function, sample_size, number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])
            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg
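
As a rough illustration of what point_prediction computes, here is a self-contained Python 3 sketch that uses scipy.linalg.solve_triangular for the forward substitution in place of the custom dtrsv wrapper. The rbf kernel and the function name gp_predict are assumptions made for the example; the real covariance function lives in CovarianceFunction, which was not posted.

```python
import numpy as np
from scipy.linalg import cho_solve, solve_triangular


def rbf(a, b, length=1.0):
    # Hypothetical covariance function standing in for C.between_points.
    return np.exp(-0.5 * (a - b) ** 2 / length ** 2)


def gp_predict(x_train, y_train, x_test, sigma, kernel=rbf):
    """Posterior mean and variance of a zero-mean GP with Gaussian noise."""
    x_train = np.asarray(x_train, dtype=float)
    y_train = np.asarray(y_train, dtype=float)
    x_test = np.asarray(x_test, dtype=float)
    n = len(x_train)
    # Covariance of the training points plus noise on the diagonal.
    K = kernel(x_train[:, None], x_train[None, :]) + sigma ** 2 * np.eye(n)
    # Covariance between training and test points, shape (n_train, n_test).
    k = kernel(x_train[:, None], x_test[None, :])
    L = np.linalg.cholesky(K)                # K = L L^T, L lower triangular
    alpha = cho_solve((L, True), y_train)    # alpha = K^{-1} y
    v = solve_triangular(L, k, lower=True)   # forward substitution: L v = k
    mean = alpha @ k
    variance = kernel(x_test, x_test) - np.sum(v * v, axis=0)
    return mean, variance


if __name__ == "__main__":
    mean, variance = gp_predict([0.0, 1.0, 2.0], [0.0, 1.0, 0.0],
                                [0.5, 1.5], sigma=0.1)
    print(mean, variance)
```

The mean and variance lines mirror dot(alpha,k) and the between_points(...) - dot(v[:,i],v[:,i]) expression in the class above, vectorised over all test points at once.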