Running a Python farm

Ian McConnell ian at emit.demon.co.ukx
Mon Nov 3 07:23:39 EST 2003


Ian McConnell <ian at emit.demon.co.ukx> writes:

> What's the pythonic way of sending out a set of requests in parallel?
>
> My program throws an image at the server and then waits for the result. I'm
> currently using this bit of socket code to send an image to server on
> another machine.

Once again, thanks to all those who replied. 

In the end I went with the select-and-poll method, as I only have a small
number of machines to run my code on and this method fitted most easily into
my existing program.

Actually, it also turns out that my code isn't quite as parallelised as I
thought it was, and having results come back in a different order confused
some of the later bookkeeping, so I've also written a version (pclient2) that
returns results in a fixed order: the order in which the images were
submitted (because images are popped from the end of the list, this is the
reverse of the input list). Even with this limitation, I still get a good
speed-up.

I'm sure this code can be made more efficient and is probably fairly easy to
deadlock, but it works for me.




import sys
import socket
import cPickle
import select
import string

class Machine:
    """Address of a worker server, parsed from a "host[:port]" string."""

    def __init__(self, hostname, default_port=10000):
        """Split *hostname* at its last ':' into a host name and integer port.

        hostname:     "host" or "host:port".
        default_port: port to use when *hostname* contains no ':'.
        Raises ValueError if the text after the last ':' is not an integer.
        """
        # str.rfind rather than the deprecated string.rfind() function,
        # which no longer exists in Python 3.
        colon = hostname.rfind(':')
        if colon >= 0:
            self.host = hostname[:colon]
            self.port = int(hostname[colon + 1:])
        else:
            self.host = hostname
            self.port = default_port
        # TODO: check that the host is alive; possible endian issues between
        # client and server are not handled here.

    def connect(self):
        """Return the (host, port) tuple expected by socket.connect()."""
        return (self.host, self.port)

    def __repr__(self):
        return "%s:%d" % (self.host, self.port)

class Client:
    """One processing request in flight on a remote Machine.

    Creating a Client connects to *host*, sends ``(array, opts)`` pickled
    with protocol 0, and half-closes the connection so the server sees the
    end of the request.  result() later collects the server's reply.
    """
    def __init__(self, host, array, opts):
        """host:  a Machine to send the job to.
        array: the image to process (its .shape is logged).
        opts:  processing options sent along with the image.
        """
        sys.stdout.write('NEW CLIENT %s %s\n' % (host, array.shape))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.host = host
        try:
            sock.connect(host.connect())
            sock.sendall(cPickle.dumps((array, opts), 0))
            # 1 == SHUT_WR: signal that the request is complete while keeping
            # our read side open for the reply.
            sock.shutdown(1)
            self.rfile = sock.makefile('rb')
        except Exception:
            # Don't leak the socket when connecting or sending fails.
            sock.close()
            raise
        self.sock = sock

    def result(self):
        """Return the server's reply, or None if nothing arrived within 1 s.

        Once the reply is readable it is read to end-of-file, the connection
        is closed and the unpickled object is returned.  Do not call result()
        again after it has returned a reply.
        """
        readable, _, _ = select.select([self.rfile], [], [], 1)
        if not readable:
            return None
        try:
            # read() with no size blocks until the server closes its end.
            data = self.rfile.read()
        finally:
            self.close()
        # SECURITY: unpickling can execute arbitrary code -- only use this
        # with trusted servers.
        return cPickle.loads(data)

    def close(self):
        """Close the reply stream and the underlying socket."""
        self.rfile.close()
        self.sock.close()

    def __repr__(self):
        return "%s" % self.host

    def name(self):
        """Return the Machine this request was sent to."""
        return self.host


def pclient(hostnames, all_images, opts):
    """
hostnames is a list of machine names (as host:port) that are willing
          to process data
all_image is a list of images
opts      is a dictionary of processing options.

Returns the list of results in the order they were collected, which need
not match the order of all_images (pclient2 returns a fixed order).
Neither argument list is modified.  With no hosts, nothing is processed
and [] is returned.
"""
    # Same host order as the original pop()-until-empty loop.
    hosts = [Machine(name) for name in reversed(hostnames)]
    # Work on a copy so the caller's list is left intact.
    images = list(all_images)
    result = []

    # Give each host one image to start with; if there are fewer images than
    # hosts, the surplus hosts stay idle instead of raising IndexError.
    client_list = []
    for host in hosts:
        if not images:
            break
        client_list.append(Client(host, images.pop(), opts))

    # Poll the outstanding requests until every one has answered.
    while client_list:
        sys.stdout.write('%d clients busy\n' % len(client_list))
        still_busy = []
        for client in client_list:
            # None means no reply yet (Client.result waits up to 1 s).
            res = client.result()
            if res is None:
                still_busy.append(client)
                continue
            result.append(res)
            # Reuse the freed host for the next pending image.
            if images:
                still_busy.append(Client(client.host, images.pop(), opts))
        client_list = still_busy
    return result



#
#
#
# Extra code to ensure results are returned in the same order in which the jobs were submitted.
#
#
#


class fifo:
    """First-in, first-out queue kept in a plain Python list.

    push() adds at the tail, pop() removes from the head, and head() peeks
    at the oldest entry without removing it.
    """

    def __init__(self):
        # Index 0 holds the oldest entry; new entries go on the end.
        self.in_stack = []

    def __len__(self):
        return len(self.in_stack)

    def __repr__(self):
        # A list's repr and str are the same text, e.g. "[1, 2]".
        return repr(self.in_stack)

    def push(self, obj):
        """Enqueue *obj* at the tail."""
        items = self.in_stack
        items.append(obj)

    def head(self):
        """Return the oldest entry without removing it (IndexError if empty)."""
        items = self.in_stack
        return items[0]

    def pop(self):
        """Dequeue and return the oldest entry (IndexError if empty)."""
        items = self.in_stack
        return items.pop(0)

class Workers:
    """Pool of Machines that hands out jobs and collects results in order.

    Idle machines are kept in the ``idle`` list.  Requests in flight are
    kept in a fifo of Client objects, so done() always waits on the oldest
    request and results come back in submission order.
    """
    def __init__(self, hosts, opts):
        """hosts: list of Machine objects, used directly as the idle list.
        opts:  processing options forwarded to every Client.
        """
        self.idle = hosts
        self.busy = fifo()
        self.opts = opts

    def free(self):
        """Return the number of idle machines."""
        return len(self.idle)

    def newjob(self, array):
        """Send *array* to an idle machine.

        An empty list or None means "no more work": the idle list is
        cleared, so free() returns 0.  Raises IndexError if no machine is
        idle.
        """
        # Identity/type tests instead of ``==``: comparing a numpy array with
        # [] or None is elementwise and cannot be used as an ``if`` condition.
        if array is None or (isinstance(array, list) and not array):
            # Delete idle list
            self.idle = []
            return
        host = self.idle.pop()
        self.busy.push(Client(host, array, self.opts))

    def poll(self):
        """Return the oldest job's result if it is ready, else None.

        Returns None immediately when no job is in flight.
        """
        if len(self.busy) == 0:
            return None
        host = self.busy.head()
        return host.result()

    def done(self):
        """Wait for the oldest in-flight job and return its result.

        The finished machine is put back on the idle list.  Returns None
        when no job is in flight.  A job whose reply is itself None cannot
        be told apart from "not ready yet" and is not supported.
        """
        if len(self.busy) == 0:
            return None
        res = None
        # ``is None`` rather than ``== None``: results may be numpy arrays,
        # for which ``==`` is elementwise.
        while res is None:
            res = self.poll()
        host = self.busy.pop()
        self.idle.append(host.name())
        sys.stdout.write('  idle: %s     busy: %s\n' % (self.idle, self.busy))
        return res

def pclient2(hostnames, all_images, opts):
    """
hostnames is a list of machine names (as host:port) that are willing
          to process data
all_image is a list of images
opts      is a dictionary of processing options.

Returns the results in submission order.  Images are taken from the END of
all_images, so result[0] belongs to all_images[-1]: the results are in
REVERSE input order (reverse the returned list to line it up with
all_images).  Neither argument list is modified.  With no hosts, nothing is
processed and [] is returned.
"""
    # Same host order as the original pop()-until-empty loop.
    hosts = [Machine(name) for name in reversed(hostnames)]
    # Work on a copy so the caller's list is left intact.
    images = list(all_images)
    workers = Workers(hosts, opts)

    # Give every idle machine a job, stopping early if there are fewer
    # images than machines (popping an empty list would raise IndexError).
    while workers.free() > 0 and images:
        workers.newjob(images.pop())

    sys.stdout.write('PCLIENT2 idle: %s     busy: %s\n'
                     % (workers.idle, workers.busy))
    result = []
    while True:
        # done() waits on the OLDEST outstanding job, which keeps results in
        # submission order; it returns None once nothing is busy.
        res = workers.done()
        if res is None:
            break
        result.append(res)
        # Queue up another job if there are images left to process
        if images:
            workers.newjob(images.pop())
    return result


-- 
 "Thinks: I can't think of a thinks. End of thinks routine": Blue Bottle

** Aunty Spam says: Remove the trailing x from the To: field to reply **




More information about the Python-list mailing list