Processes not exiting

ma3mju matt.urry at googlemail.com
Fri Jul 31 10:04:08 CEST 2009


Hi all,

I'm having trouble with multiprocessing I'm using it to speed up some
simulations, I find for large queues when the process reaches the
poison pill it does not exit whereas for smaller queues it works
without any problems. Has anyone else had this trouble? Can anyone
tell me a way around it? The code is in two files below.

Thanks

Matt

parallel.py
===================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
import multiprocessing
############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "wattsdata2"
graphfile = "wattsgraphs2"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]

############################################################################################
#generate graphs
graphs  = []
for i in range(0,num_graphs):
    graphs.append(nx.watts_strogatz_graph(500,5,0.01))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work_queue = multiprocessing.Queue()
hard_work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
#construct the items in the hard queue
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <=4000:
                easy_work_queue.put({'datapt': l,'graph': graphs
[k],'noise': noise[i],'number_of_sets_of_data':
num_sets_of_data,'number_of_data_points':int(intervals[j])})
            else:
                hard_work_queue.put({'datapt': l,'graph': graphs
[k],'noise': noise[i],'number_of_sets_of_data':
num_sets_of_data,'number_of_data_points':int(intervals[j])})
            l+=1

#get number of cores and set the number on concurrent processes
num_hard_workers = 2
num_workers = multiprocessing.cpu_count()*1.5
easy_workers = []
hard_workers = []
#add poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process
(target=GP.RandomWalkGeneralizationErrorParallel,args=
(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process
(target=GP.RandomWalkGeneralizationErrorParallel,args=
(hard_work_queue,result_queue,)))

#run all workers
for worker in hard_workers:
    worker.start()
for worker in easy_workers:
    worker.start()
#wait for easy workers to finish
for worker in easy_workers:
    worker.join()
    print('worker joined')

#set off some of the easy workers on the hard work (maybe double
number of hard)
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process
(target=GP.RandomWalkGeneralizationErrorParallel,args=
(hard_work_queue,result_queue,)))
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()

#construct data from the mess in the result queue

tempdata = np.zeros(l)
while not result_queue.empty():
    data = result_queue.get()
    tempdata[data[0]] = data[1]

finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))

np.save(savefile,finaldata)

=======================================================
GaussianProcessRegression.py
=======================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy updated this wont
be needed)
import dtrsv
#to use more than one core
import multiprocessing

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction
class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
	    #ISBN = {0-19-853024-2},
	    #publisher = {Oxford University Press, USA},
	    #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
	    #title = {Theory of neural information processing systems},
	    #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
       #add all points to list
       self.pts += points
       self.vals += vals
       arraysize = len(self.pts)
       #construct K
       K = np.zeros((arraysize,arraysize))
       #for speed
       pts = self.pts
       between_points = self.C.between_points
       if len(self.K):
            K[:-1,:-1] = self.K
       for i in xrange(0,arraysize):
           for j in xrange(arraysize-len(points),arraysize):
                 K[i,j] = between_points(pts[i],pts[j])
                 K[j,i] = K[i,j]
       K[arraysize-len(points):arraysize,arraysize-len
(points):arraysize] =  K[arraysize-len(points):arraysize,arraysize-len
(points):arraysize] + self.sigma**2 * np.eye(len(points))
       self.K = K

    #calculate the prediction of a point based on data previously
given
    def point_prediction(self,points):
        mean = []
        variance =[]
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L=np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k=np.zeros((arraysize,len(points)))
 
##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
 
##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran
call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
 
##################################################################

        #result
        mean=dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v
[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of
points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function
when give
    #number_of_examples examples
    def average_error_over_samples(self,function, sample_size,
number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint
(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg

    #calculate the average error over functions over data of size
number_of_data_points for MOST cases this is
    #also the generalization error a summary of which and
approximations to can be found in:
    #@inproceedings{Sollich99learningcurves,
	#booktitle = {Neural Computation},
	#author = {Sollich, P.},
	#title = {Learning curves for Gaussian process regression:
Approximations and bounds},
	#pages = {200-2},
	#year = {1999},
    #}

    def emprical_average_error_over_functions
(self,number_of_functions,number_of_sets_of_data,number_of_data_points,function_detail
=0,progress=0):
        avg = 0
        step = float(100)/number_of_functions
        for i in range(0,number_of_functions):
            if progress:
                print step*float(i),"%"
            if function_detail:
                fx = self.C.generate_function
(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples
(fx,number_of_sets_of_data,number_of_data_points)+avg
        avg = avg / number_of_functions
        return avg

    def average_error_over_functions
(self,number_of_sets_of_data,number_of_data_points,function_detail=0):
            if function_detail:
                fx = self.C.generate_function
(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples
(fx,number_of_sets_of_data,number_of_data_points)
            return(avg)



    def function_prediction(self,pts):
        temp = self.point_prediction(pts)
        return {'func':temp['mean'],'varpos':temp
['variance'],'varneg':-temp['variance']}


#########################################################################################################################################################
#Functions not contained in a class
#########################################################################################################################################################

#function to calculate the generalization error for a RandomWalk
kernel averaging over graphs graphs
def RandomWalkGeneralizationError
(noise,graphs,number_of_sets_of_data,number_of_data_points,a=2,p=10):
    graph_specific = np.zeros(len(graphs))
    avg = 0
    for i in range(0,len(graphs)):
        rw = CF.RandomWalk(a,p,graphs[i])
        GP = GaussianProcessRegression(rw,noise)
        graph_specific[i] = GP.average_error_over_functions
(number_of_sets_of_data,number_of_data_points)
    avg = np.sum(graph_specific)/len(graphs)
    return avg, graph_specific

#as above but using queues to create parallel architechture
def RandomWalkGeneralizationErrorParallel
(work_queue,result_queue,a=2,p=10):
    while True:
        input = work_queue.get()
        if input is None:
            print "poison"
            break
            print 'this should not appear'
        print input['datapt'], ' ', input['number_of_data_points']
        rw=CF.RandomWalk(a,p,input['graph'])
        GP = GaussianProcessRegression(rw,input['noise'])
        err = GP.average_error_over_functions(input
['number_of_sets_of_data'],input['number_of_data_points'])
        result_queue.put([input['datapt'],err])
    print 'here'
    return



More information about the Python-list mailing list