Processes not exiting
ma3mju
matt.urry at googlemail.com
Fri Jul 31 04:04:08 EDT 2009
Hi all,
I'm having trouble with multiprocessing, which I'm using to speed up some
simulations. With large queues, when a process reaches the poison pill it
does not exit, whereas with smaller queues everything works without any
problems. Has anyone else had this trouble? Can anyone tell me a way around
it? The code is in the two files below; a stripped-down sketch of the
pattern I'm using comes first.
Thanks
Matt
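
In case it helps, here is a stripped-down sketch of the producer/consumer
poison-pill pattern both files follow (not the real code; the worker just
doubles numbers as a stand-in for the real computation):

import multiprocessing

def worker(work_queue, result_queue):
    # pull jobs until the None sentinel (the poison pill) is seen, then exit
    while True:
        job = work_queue.get()
        if job is None:
            break
        result_queue.put(job * 2)   # stand-in for the real computation

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for i in range(100):
        work_queue.put(i)
    work_queue.put(None)            # one pill per worker (only one worker here)
    p = multiprocessing.Process(target=worker, args=(work_queue, result_queue))
    p.start()
    p.join()
    while not result_queue.empty():
        print result_queue.get()
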
parallel.py
===================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
import multiprocessing
############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "wattsdata2"
graphfile = "wattsgraphs2"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
############################################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.watts_strogatz_graph(500,5,0.01))
#save them for later reference
filehandler = open(graphfile,'wb')
pickle.dump(graphs,filehandler,-1)
filehandler.close()
#queues
easy_work_queue = multiprocessing.Queue()
hard_work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
#construct the items in the easy and hard queues
l = 0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= 4000:
                easy_work_queue.put({'datapt': l,
                                     'graph': graphs[k],
                                     'noise': noise[i],
                                     'number_of_sets_of_data': num_sets_of_data,
                                     'number_of_data_points': int(intervals[j])})
            else:
                hard_work_queue.put({'datapt': l,
                                     'graph': graphs[k],
                                     'noise': noise[i],
                                     'number_of_sets_of_data': num_sets_of_data,
                                     'number_of_data_points': int(intervals[j])})
            l += 1
#get number of cores and set the number of concurrent processes
num_hard_workers = 2
num_workers = int(multiprocessing.cpu_count()*1.5)
easy_workers = []
hard_workers = []
#add a poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))
#run all workers
for worker in hard_workers:
    worker.start()
for worker in easy_workers:
    worker.start()
#wait for easy workers to finish
for worker in easy_workers:
    worker.join()
    print('worker joined')
#set off some of the easy workers on the hard work (maybe double the number of hard workers)
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    extra_worker = multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,))
    extra_worker.start()
    hard_workers.append(extra_worker)
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()
#construct data from the mess in the result queue
tempdata = np.zeros(l)
while not result_queue.empty():
    data = result_queue.get()
    tempdata[data[0]] = data[1]
finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)
=======================================================
GaussianProcessRegression.py
=======================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully this won't be needed once scipy is updated)
import dtrsv
#to use more than one core
import multiprocessing
#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in the CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])
    def add_data_points(self,points,vals):
        #add all points to the list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K
    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance = []
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L = np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL
        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k = np.zeros((arraysize,len(points)))
        ##################################################################
        #for speed, get refs to the functions I'm going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])
        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################
        #result
        mean = dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}
    # calculate the error for the data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GP regression has been trying to learn
    def error(self,function):
        total = 0
        #sum up the variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total
    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])
    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function,sample_size,number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])
            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg
    #calculate the average error over functions, over data sets of size
    #number_of_data_points; in MOST cases this is also the generalization
    #error, a summary of which and approximations to it can be found in:
    #@inproceedings{Sollich99learningcurves,
    #booktitle = {Neural Computation},
    #author = {Sollich, P.},
    #title = {Learning curves for Gaussian process regression: Approximations and bounds},
    #pages = {200-2},
    #year = {1999},
    #}
    def emprical_average_error_over_functions(self,number_of_functions,
            number_of_sets_of_data,number_of_data_points,
            function_detail=0,progress=0):
        avg = 0
        step = float(100)/number_of_functions
        for i in range(0,number_of_functions):
            if progress:
                print step*float(i),"%"
            if function_detail:
                fx = self.C.generate_function(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples(fx,number_of_sets_of_data,number_of_data_points) + avg
        avg = avg / number_of_functions
        return avg
    def average_error_over_functions(self,number_of_sets_of_data,
            number_of_data_points,function_detail=0):
        if function_detail:
            fx = self.C.generate_function(self.sigma,function_detail)
        else:
            fx = self.C.generate_function(self.sigma)
        avg = self.average_error_over_samples(fx,number_of_sets_of_data,number_of_data_points)
        return avg
    def function_prediction(self,pts):
        temp = self.point_prediction(pts)
        return {'func':temp['mean'],
                'varpos':temp['variance'],
                'varneg':[-v for v in temp['variance']]}
#########################################################################################################################################################
#Functions not contained in a class
#########################################################################################################################################################
#function to calculate the generalization error for a RandomWalk kernel, averaging over graphs
def RandomWalkGeneralizationError(noise,graphs,number_of_sets_of_data,number_of_data_points,a=2,p=10):
    graph_specific = np.zeros(len(graphs))
    avg = 0
    for i in range(0,len(graphs)):
        rw = CF.RandomWalk(a,p,graphs[i])
        GP = GaussianProcessRegression(rw,noise)
        graph_specific[i] = GP.average_error_over_functions(number_of_sets_of_data,number_of_data_points)
    avg = np.sum(graph_specific)/len(graphs)
    return avg, graph_specific
#as above but using queues to create a parallel architecture
def RandomWalkGeneralizationErrorParallel(work_queue,result_queue,a=2,p=10):
    while True:
        input = work_queue.get()
        if input is None:
            print "poison"
            break
            print 'this should not appear'
        print input['datapt'], ' ', input['number_of_data_points']
        rw = CF.RandomWalk(a,p,input['graph'])
        GP = GaussianProcessRegression(rw,input['noise'])
        err = GP.average_error_over_functions(input['number_of_sets_of_data'],input['number_of_data_points'])
        result_queue.put([input['datapt'],err])
    print 'here'
    return