Processes not exiting

ma3mju matt.urry at googlemail.com
Mon Aug 10 16:54:38 CEST 2009


On 7 Aug, 16:02, MRAB <pyt... at mrabarnett.plus.com> wrote:
> ma3mju wrote:
> > On 3 Aug, 09:36, ma3mju <matt.u... at googlemail.com> wrote:
> >> On 2 Aug, 21:49, Piet van Oostrum <p... at cs.uu.nl> wrote:
>
> >>>>>>>> MRAB <pyt... at mrabarnett.plus.com> (M) wrote:
> >>>> M> I wonder whether one of the workers is raising an exception, perhaps due
> >>>> M> to lack of memory, when there are large number of jobs to process.
> >>> But that wouldn't prevent the join. And you would probably get an
> >>> exception traceback printed.
> >>> I wonder if something fishy is happening in the multiprocessing
> >>> infrastructure. Or maybe the Fortran code goes wrong because it has no
> >>> protection against buffer overruns and similar problems, I think.
> >>> --
> >>> Piet van Oostrum <p... at cs.uu.nl>
> >>> URL:http://pietvanoostrum.com[PGP8DAE142BE17999C4]
> >>> Private email: p... at vanoostrum.org
> >> I don't think it's a memory problem, the reason for the hard and easy
> >> queue is because for larger examples it uses far more RAM. If I run
> >> all of workers with harder problems I do begin to run out of RAM and
> >> end up spending all my time switching in and out of swap so I limit
> >> the number of harder problems I run at the same time. I've watched it
> >> run to the end (a very boring couple of hours) and it stays out of my
> >> swap space and everything appears to be staying in RAM. Just hangs
> >> after all "poison" has been printed for each process.
>
> >> The other thing is that I get the message "here" telling me I broke
> >> out of the loop after seeing the poison pill in the process and I get
> >> all the things queued listed as output surely if I were to run out of
> >> memory I wouldn't expect all of the jobs to be listed as output.
>
> >> I have a serial script that works fine so I know individually for each
> >> example the fortran code works.
>
> >> Thanks
>
> >> Matt
>
> > Any ideas for a solution?
>
> A workaround is to do them in small batches.
>
> You could put each job in a queue with a flag to say whether it's hard
> or easy, then:
>
>      while have more jobs:
>          move up to BATCH_SIZE jobs into worker queues
>          create and start workers
>          wait for workers to finish
>          discard workers

Yeah, I was hoping for something with a bit more finesse. In the end I
used pool instead with a callback function and that has solved the
problem. I did today find this snippet;

Joining processes that use queues

    Bear in mind that a process that has put items in a queue will
wait before terminating until all the buffered items are fed by the
“feeder” thread to the underlying pipe. (The child process can call
the Queue.cancel_join_thread() method of the queue to avoid this
behaviour.)

    This means that whenever you use a queue you need to make sure
that all items which have been put on the queue will eventually be
removed before the process is joined. Otherwise you cannot be sure
that processes which have put items on the queue will terminate.
Remember also that non-daemonic processes will be automatically be
joined.


I don't know (not a computer scientist) but could it have been the
pipe getting full?

In case anyway else is effected by this I've attached the new code to
see the changes I made to fix it.

Thanks for all your help

Matt

============================================================================================================================
parallel.py
============================================================================================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
from multiprocessing import Pool

global result

def cb(r):
    global result
    print r
    result[r[0]] = r[1]


############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "powerlaw"
graphfile = "powerlawgraph"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
num_hard_workers = 5
hard_work_threshold = 4000
############################################################################################
#generate graphs
graphs  = []
for i in range(0,num_graphs):
    graphs.append(nx.powerlaw_cluster_graph(500,0.1,0.05))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work = []
hard_work = []

#construct the items in the hard queue
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <=hard_work_threshold:
                easy_work.append({'datapt': l,'graph': graphs
[k],'noise': noise[i],'number_of_sets_of_data':
num_sets_of_data,'number_of_data_points':int(intervals[j])})
            else:
                hard_work.append({'datapt': l,'graph': graphs
[k],'noise': noise[i],'number_of_sets_of_data':
num_sets_of_data,'number_of_data_points':int(intervals[j])})
            l+=1

result = np.zeros(l)

#create pool with all cores possible
worker_pool = Pool()

for i in xrange(0,len(easy_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
(easy_work[i],),callback=cb)

worker_pool.close()
worker_pool.join()

#create hard work queue
worker_pool = Pool(processes = num_hard_workers)

for i in xrange(0,len(hard_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
(hard_work[i],),callback=cb)

worker_pool.close()
worker_pool.join()


finaldata = result.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)

================================================================================================================================================
GaussianProcessRegression.py
================================================================================================================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy updated this wont
be needed)
import dtrsv

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction
class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
	    #ISBN = {0-19-853024-2},
	    #publisher = {Oxford University Press, USA},
	    #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
	    #title = {Theory of neural information processing systems},
	    #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
       #add all points to list
       self.pts += points
       self.vals += vals
       arraysize = len(self.pts)
       #construct K
       K = np.zeros((arraysize,arraysize))
       #for speed
       pts = self.pts
       between_points = self.C.between_points
       if len(self.K):
            K[:-1,:-1] = self.K
       for i in xrange(0,arraysize):
           for j in xrange(arraysize-len(points),arraysize):
                 K[i,j] = between_points(pts[i],pts[j])
                 K[j,i] = K[i,j]
       K[arraysize-len(points):arraysize,arraysize-len
(points):arraysize] =  K[arraysize-len(points):arraysize,arraysize-len
(points):arraysize] + self.sigma**2 * np.eye(len(points))
       self.K = K

    #calculate the prediction of a point based on data previously
given
    def point_prediction(self,points):
        mean = []
        variance =[]
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L=np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k=np.zeros((arraysize,len(points)))
 
##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
 
##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran
call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
 
##################################################################

        #result
        mean=dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v
[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of
points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function
when give
    #number_of_examples examples
    def average_error_over_samples(self,function, sample_size,
number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint
(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg






More information about the Python-list mailing list