asyncore based port splitter code questions

George Trojan george.trojan at noaa.gov
Mon Jan 4 12:58:31 EST 2010


The following code is a attempt at port splitter: I want to forward data 
coming on tcp connection to several host/port addresses. It sort of 
works, but I am not happy with it. asyncore based code is supposed to be 
simple, but I need while loops and a lot of try/except clauses. Also, I 
had to add suspend/activate_channel methods in the Writer class that use 
variables with leading underscores. Otherwise the handle_write() method 
is called in a tight loop. I designed the code by looking at Python 2.3 
source for asyncore and originally wanted to use add_channel() and 
del_channel() methods. However in Python 2.6 del_channel() closes the 
socket in addition to deleting it from the map. I do not want to have 
one connection per message, the traffic may be high and there are no 
message delimiters. The purpose of this exercise is to split incoming 
operational data so I can test a new version of software.
Comments please - I have cognitive dissonance about the code, my little 
yellow rubber duck is of no help here.
The code is run as:

python2.6 afwdport.py 50002 localhost 50003 catbert 50001

where 50002 is the localhost incoming data port, (localhost, 50003) and 
(catbert, 50001) are destinations.

George

import asyncore, os, socket, sys, time

TMOUT = 10

#----------------------------------------------------------------------
def log_msg(msg):
     print >> sys.stderr, '%s: %s' % (time.ctime(), msg)

#----------------------------------------------------------------------
class Reader(asyncore.dispatcher):
     def __init__(self, sock, writers):
         asyncore.dispatcher.__init__(self, sock)
         self.writers = writers

     def handle_read(self):
         data = self.recv(1024)
         for writer in self.writers:
             writer.add_data(data)

     def handle_expt(self):
         self.handle_close()

     def handle_close(self):
         log_msg('closing reader connection')
         self.close()

     def writable(self):
         return False

#----------------------------------------------------------------------
class Writer(asyncore.dispatcher):
     def __init__(self, address):
         asyncore.dispatcher.__init__(self)
         self.address = address
         self.data = ''
         self.mksocket()

     def suspend_channel(self, map=None):
         fd = self._fileno
         if map is None:
             map = self._map
         if fd in map:
             del map[fd]

     def activate_channel(self):
         if self._fileno not in self._map:
             self._map[self._fileno] = self

     def mksocket(self):
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.set_reuse_addr()
         self.connect(self.address)
         log_msg('connected to %s' % str(self.address))

     def add_data(self, data):
         self.data += data
         self.activate_channel()

     def handle_write(self):
         while self.data:
             log_msg('sending data to %s' % str(self.address))
             sent = self.send(self.data)
             self.data = self.data[sent:]
         self.suspend_channel()

     def handle_expt(self):
         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
         log_msg(asyncore._strerror(err))
         self.handle_close()

     def handle_close(self):
         log_msg('closing writer connection')
         self.close()
         # try to reconnect
         time.sleep(TMOUT)
         self.mksocket()

     def readable(self):
         return False

#----------------------------------------------------------------------
class Dispatcher(asyncore.dispatcher):
     def __init__(self, port, destinations):
         asyncore.dispatcher.__init__(self)
         self.address = socket.gethostbyname(socket.gethostname()), port
         self.writers = [Writer(_) for _ in destinations]
         self.reader = None
         self.handle_connect()

     def handle_connect(self):
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.bind(self.address)
         self.listen(1)
         log_msg('listening on %s' % str(self.address))

     def handle_accept(self):
         conn, addr = self.accept()
         log_msg('connection from %s' % str(addr))
         # current read connection not closed for some reason
         if self.reader:
             self.reader.close()
         self.reader = Reader(conn, self.writers)

     def cleanup(self):
         try:
             if self.reader:
                 self.reader.close()
         except socket.error, e:
             log_msg('error closing reader connection %s' % e)
         # writer might have unwatched connections
         for w in self.writers:
             try:
                 w.close()
             except socket.error, e:
                 log_msg('error closing writer connection %s' % e)

#----------------------------------------------------------------------
def main(port, destinations):
     disp = None
     try:
         # asyncore.loop() exits when input connection closes
         while True:
             try:
                 disp = Dispatcher(port, destinations)
		asyncore.loop(timeout=TMOUT, use_poll=True)
             except socket.error, (errno, e):
                 if errno == 98:
                     log_msg('sleeping %d s: %s', (30, e))
                     time.sleep(30)
     except BaseException, e:
         log_msg('terminating - uncaught exception: %s' % e)
         raise SystemExit
     finally:
         if disp:
             disp.cleanup()

#----------------------------------------------------------------------
if __name__ == '__main__':
     nargs = len(sys.argv)
     try:
         assert nargs > 3 and nargs % 2 == 0
         port = int(sys.argv[1])
         destinations = [(sys.argv[n], int(sys.argv[n+1])) \
             for n in range(2, nargs-1, 2)]
         main(port, destinations)
     except (AssertionError, ValueError), e:
         print 'Error: %s' % e
         print 'Usage: python %s local-port host port ...' % sys.argv[0]
         raise SystemExit(1)



More information about the Python-list mailing list