Running a python farm
Ian McConnell
ian at emit.demon.co.ukx
Mon Nov 3 07:23:39 EST 2003
Ian McConnell <ian at emit.demon.co.ukx> writes:
> What's the pythonic way of sending out a set of requests in parallel?
>
> My program throws an image at the server and then waits for the result. I'm
> currently using this bit of socket code to send an image to a server on
> another machine.
Once again, thanks to all those who replied.
In the end I went with the select and poll method as I only have a small
number of machines to run my code on and this method fitted in easiest with
my existing program.
Actually, it also turns out that my code isn't quite as parallelised as I
thought it was and having results coming back in a different order confused
some of the later bookkeeping, so I've also done a version (pclient2) that
maintains the image order. Even with this limitation, I do get a good
speed-up.
I'm sure this code can be made more efficient and is probably fairly easy to
deadlock, but it works for me.
import sys
import socket
import cPickle
import select
import string
class Machine:
    """Address of one worker server, parsed from a ``host[:port]`` string.

    Attributes:
        host: everything before the last ':' (or the whole string when
            there is no ':').
        port: integer port number; DEFAULT_PORT when none was given.
    """

    DEFAULT_PORT = 10000  # used when the name carries no ':port' suffix

    def __init__(self, hostname):
        """Split *hostname* on its last ':' into host and port.

        Raises ValueError if the text after the colon is not an integer.
        """
        # str.rfind gives the same result as the deprecated string.rfind
        # module function and keeps working on Python 3.
        colon = hostname.rfind(':')
        if colon >= 0:
            self.host = hostname[:colon]
            self.port = int(hostname[colon + 1:])
        else:
            self.host = hostname
            self.port = self.DEFAULT_PORT
        # Should check for host being alive and possible endian issues

    def connect(self):
        """Return the ``(host, port)`` tuple to pass to ``socket.connect``."""
        return (self.host, self.port)

    def __repr__(self):
        """Return the address formatted as ``host:port``."""
        return "%s:%d" % (self.host, self.port)
class Client:
    """One in-flight request: an ``(array, opts)`` pair sent to one server.

    The constructor connects, sends the pickled request and shuts down the
    sending side of the socket.  result() is then polled until the reply
    can be read; once the reply has been read the connection is closed.
    """

    def __init__(self, host, array, opts):
        """Connect to *host* and send ``(array, opts)`` as a protocol-0 pickle.

        host must provide connect() returning the address to hand to
        socket.connect().  array must have a ``shape`` attribute (it is
        printed).  Socket errors propagate to the caller; the socket is
        closed first so a failed connection does not leak a descriptor.
        """
        print('NEW CLIENT %s %s' % (host, array.shape))
        self.host = host
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ok = False
        try:
            self.sock.connect(host.connect())
            self.sock.sendall(cPickle.dumps((array, opts), 0))
            # 1 == SHUT_WR: no more sends, which lets the server see EOF on
            # the request while we can still read the reply.
            self.sock.shutdown(1)
            self.rfile = self.sock.makefile('rb')
            ok = True
        finally:
            if not ok:
                self.sock.close()

    def result(self):
        """Return the unpickled reply, or None if nothing is readable yet.

        Waits at most one second for the connection to become readable.
        When it is, reads until the server closes the connection, releases
        the socket, and returns the unpickled object.
        """
        readable, _, _ = select.select([self.rfile], [], [], 1)
        if not readable:
            return None
        try:
            payload = self.rfile.read()
        finally:
            self.close()
        # NOTE: unpickling data received from the network can execute
        # arbitrary code; only talk to servers you trust.
        return cPickle.loads(payload)

    def close(self):
        """Close the reply file and the underlying socket."""
        self.rfile.close()
        self.sock.close()

    def __repr__(self):
        return "%s" % self.host

    def name(self):
        """Return the host object this client was created with."""
        return self.host
def pclient(hostnames, all_images, opts):
    """
    Process every image in all_images on the given servers in parallel.

    hostnames is a list of machine names (as host:port) that are willing
    to process data
    all_images is a list of images
    opts is a dictionary of processing options.

    Images are handed out starting from the end of all_images.  Each host
    gets one image at a time and receives the next one as soon as its
    result has been read.  Returns the list of results in the order they
    were collected, which is not necessarily the order of all_images.
    Neither input list is modified.
    """
    # Work on copies so the caller's lists are not drained as a side effect.
    pending = list(all_images)
    names = list(hostnames)
    names.reverse()  # keep the original assignment order (last name first)
    hosts = [Machine(name) for name in names]

    # assign images to hosts
    client_list = []
    for host in hosts:
        if not pending:
            break  # more hosts than images: leave the remaining hosts unused
        client_list.append(Client(host, pending.pop(), opts))

    # process
    result = []
    while client_list:
        print('%d clients busy' % len(client_list))
        next_client_list = []
        for client in client_list:
            res = client.result()
            if res is None:
                # no reply yet; poll this client again on the next pass
                next_client_list.append(client)
                continue
            result.append(res)
            if pending:  # if we have images assign to completed host
                next_client_list.append(
                    Client(client.host, pending.pop(), opts))
        client_list = next_client_list
    return result
#
#
#
# Extra code to ensure results are returned in the same order as submitted.
#
#
#
class fifo:
    """Minimal first-in-first-out queue kept in the list ``in_stack``.

    New items go on the tail of the list; the oldest item sits at index 0.
    """

    def __init__(self):
        self.in_stack = list()

    def __len__(self):
        """Return the number of queued items."""
        return len(self.in_stack)

    def __repr__(self):
        """Show the queue exactly as the underlying list would print."""
        return "%s" % (self.in_stack,)

    def push(self, obj):
        """Add obj at the tail of the queue."""
        self.in_stack += [obj]

    def head(self):
        """Return the oldest item without removing it."""
        oldest = self.in_stack[0]
        return oldest

    def pop(self):
        """Remove the oldest item from the queue and return it."""
        oldest = self.in_stack.pop(0)
        return oldest
class Workers:
    """Pool of hosts that returns results in the order jobs were submitted.

    Attributes:
        idle: list of hosts that currently have no job.
        busy: fifo of Client objects, oldest submission first.
        opts: processing options passed to every Client.
    """

    def __init__(self, hosts, opts):
        """Start with every entry of *hosts* idle.

        The hosts list is used directly (not copied), so newjob() removes
        entries from the caller's list.
        """
        self.idle = hosts
        self.busy = fifo()
        self.opts = opts

    def free(self):
        """Return the number of idle hosts."""
        return len(self.idle)

    def newjob(self, array):
        """Send *array* to an idle host and queue it as busy.

        Passing None or an empty list means there is no more work: the
        idle list is emptied (so free() reports 0) and nothing is sent.
        Raises IndexError if called with work while no host is idle.
        """
        # Identity/type tests rather than `== None` / `== []`: equality on
        # array-like images is evaluated elementwise and is not a plain bool.
        if array is None or (isinstance(array, list) and not array):
            # Delete idle list
            self.idle = []
            return
        host = self.idle.pop()
        self.busy.push(Client(host, array, self.opts))

    def poll(self):
        """Ask the oldest busy client for its result.

        Returns None when nothing is busy or the result is not ready yet.
        """
        if len(self.busy) == 0:
            return None
        oldest = self.busy.head()
        return oldest.result()

    def done(self):
        """Wait for the oldest busy job and return its result.

        Returns None immediately when no job is busy.  Otherwise polls the
        oldest client until it yields a result, moves its host back to the
        idle list and returns the result.
        """
        if len(self.busy) == 0:
            return None
        res = None
        # `is None` (not `== None`) so an array-like result ends the loop
        # instead of being compared elementwise.
        while res is None:
            res = self.poll()
        finished = self.busy.pop()
        self.idle.append(finished.name())
        print(' idle: %s  busy: %s' % (self.idle, self.busy))
        return res
def pclient2(hostnames, all_images, opts):
    """
    Process every image in all_images on the given servers in parallel,
    collecting the results in submission order.

    hostnames is a list of machine names (as host:port) that are willing
    to process data
    all_images is a list of images
    opts is a dictionary of processing options.

    Returns the results in the same order the images were submitted.
    Images are submitted starting from the END of all_images, so
    result[0] belongs to all_images[-1].
    NOTE(review): confirm callers expect this reversed order.
    Neither input list is modified.
    """
    # Work on copies so the caller's lists are not drained as a side effect.
    pending = list(all_images)
    names = list(hostnames)
    names.reverse()  # Workers pops from the end, so hostnames[0] goes first
    hosts = [Machine(name) for name in names]

    workers = Workers(hosts, opts)
    # Give every host a first job, stopping early when there are fewer
    # images than hosts (popping from an empty list would raise IndexError).
    while workers.free() > 0 and pending:
        workers.newjob(pending.pop())
    print('PCLIENT2 idle: %s  busy: %s' % (workers.idle, workers.busy))

    result = []
    while 1:
        res = workers.done()
        # `is None` so array-like results are not compared elementwise
        if res is None:
            break
        result.append(res)
        # Queue up another job if there are images left to process
        if pending:
            workers.newjob(pending.pop())
    return result
--
"Thinks: I can't think of a thinks. End of thinks routine": Blue Bottle
** Aunty Spam says: Remove the trailing x from the To: field to reply **
More information about the Python-list
mailing list