Threading Pool Event()

Graeme Matthew gsmatthew at ozemail.com.au
Sat Jul 19 22:02:45 CEST 2003


Guys, thanks so much; you have all been a great help.

I can't believe how easy it was in the end.

Here is my code. Please criticise it to death; I won't take offence.

import threading, socket, sys
from Queue import Queue
from string import strip

#TODO:  Exception Handling

#ON A MULTIPLE CPU MACHINE 2 SEPARATE SERVERS SHOULD BE RUN
#THE INCOMING REQUESTS WILL NEED TO BE SWITCHED BETWEEN SERVERS
#THIS IS DUE TO THE GLOBAL INTERPRETER LOCK (GIL) IN PYTHON

#-------------- global constants --------------------------------
THREAD_POOL_SIZE    = 5             #The number of threads to run
                                    #WARNING increasing threads does not
                                    #mean increased performance
BUFFER_SIZE         = 1024          #Buffer size for socket
PORT                = 6000          #Port server will listen on
HOST                = "localhost"   #Host name the server binds to
ADDR                = (HOST, PORT)  #(host, port) pair passed to bind()
BACKLOG             = 5             #Backlog for server listen
#----------------------------------------------------------------

class Job:
    """One accepted client connection waiting to be served by a worker."""

    def __init__(self, csocket, addr):
        #Only the printable form of the peer address is kept
        self.address = str(addr)
        #Connected client socket the worker will read from and reply on
        self.csocket = csocket

class ThreadPool:
    """Fixed-size group of worker threads fed from one shared job queue."""

    #TODO: investigate if Queue can be inherited ????

    def __init__(self):
        self.queue = Queue()    #FIFO queue of Job objects shared by all workers
        self.pool = []          #Worker threads, in creation order

        #Create THREAD_POOL_SIZE workers; each one is started as soon as
        #it has been added to the pool
        for tid in range(THREAD_POOL_SIZE):
            worker = UserRequestHandler(tid, self)
            self.pool.append(worker)
            worker.start()

class UserRequestHandler(threading.Thread):
    """Worker thread that serves client jobs taken from the pool's queue.

    Every job carries a connected client socket. The client is expected to
    send a 20 byte, whitespace padded header holding the decimal length of
    the payload (a netstring type concept), followed by the payload itself.
    """

    def __init__(self, tid, tpool):
        """Create worker number `tid` that reads jobs from `tpool.queue`."""
        threading.Thread.__init__(self)

        self.tid = tid          #thread id, printed to show which worker ran
        self.tpool = tpool      #Reference to the threading pool

    def run(self):
        """Serve jobs from the queue until a None job is received.

        The queue's get method is blocking, so an idle worker simply waits.
        Putting None on the queue makes the worker that picks it up leave
        its loop, which lets the thread terminate.
        """
        while 1:
            job = self.tpool.queue.get()
            if job is None:
                break
            try:
                self.execute(job)
            except socket.error:
                #A broken client connection must not kill the worker
                print("%d ERROR %s %s" % (self.tid, job.address,
                                          sys.exc_info()[1]))

    def execute(self, job):
        """Read one length-prefixed request from job.csocket and reply.

        Sends 'Invalid Header' when the header is not a non-negative
        integer and 'Server received' once the whole payload has arrived.
        Nothing is sent when the client sends no header at all or
        disconnects before the payload is complete. The socket is always
        shut down and closed afterwards, also when an error is raised;
        socket errors from recv/send propagate to the caller.
        """
        csocket = job.csocket

        print("%d CONNECTED %s" % (self.tid, job.address))

        try:
            #Read in the first 20 bytes, all communication with this server
            #needs to place a 20 byte header into it
            #NOTE(review): recv() may return fewer than 20 bytes, in which
            #case the partial header is parsed as is
            header = csocket.recv(20)

            #if a possible header was retrieved
            if header:
                try:
                    #strip() removes the pads that make up the 20 bytes
                    length = int(header.strip())
                    if length < 0:
                        raise ValueError("negative payload length")
                except ValueError:
                    #TO DO: If an error occurs create XML error string
                    #       send to view to produce error html
                    csocket.send('Invalid Header')
                else:
                    #The payload is only assembled here, it is not
                    #processed any further yet
                    data = self._receive(csocket, length)
                    if data is not None:
                        csocket.send("Server received")
        finally:
            try:
                csocket.shutdown(2)     #2 = stop both receiving and sending
            except socket.error:
                pass                    #peer already gone, still close below
            csocket.close()

    def _receive(self, csocket, length):
        """Return exactly `length` bytes read from csocket, None on early EOF."""
        chunks = []
        remaining = length

        while remaining > 0:
            #Never ask for more than is still missing, so bytes that follow
            #the payload are not consumed
            chunk = csocket.recv(min(BUFFER_SIZE, remaining))
            if not chunk:
                #recv() returning nothing means the client closed the
                #connection; looping on would never finish
                return None
            chunks.append(chunk)
            remaining -= len(chunk)

        return ''.join(chunks)


def startServer():
    """Start the worker pool, listen on ADDR and queue every accepted client.

    The accept loop runs forever; the function only ends if an exception
    is raised.
    """
    banner = """
    Server running on %s
    Python %s
    Server listening for requests with %d threads in pool

    """ % (sys.platform,sys.version,THREAD_POOL_SIZE)

    #Workers are started first so they are already waiting on the queue
    workers = ThreadPool()

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(ADDR)
    listener.listen(BACKLOG)

    print(banner)

    while True:
        #Hand each new connection to the pool; a free worker picks it up
        client, peer = listener.accept()
        workers.queue.put(Job(client, peer))



#Run the server only when executed as a script, not when imported
if __name__ == "__main__":
    startServer()


"Peter Hansen" <peter at engcorp.com> wrote in message
news:3F195170.E60FB1A6 at engcorp.com...
> Graeme Matthew wrote:
> >
> > ok so code like this is perfectly safe
> >
> > def run(self):
> >
> >     while 1:
> >
> >         job = queue.get()
> >
> >         __processjob()
>
> It's almost like you aren't even seeing Aahz' replies. ;-)
>
> The above is certainly safe, but can't be terminated easily.
> Just use the loop Aahz showed, which is the above plus the
> ability to terminate.
>
> -Peter






More information about the Python-list mailing list