[Medusa-dev] Improving main loop

Alexander Semenov sav at ulj.menatepspb.com
Wed Dec 11 17:41:18 EST 2002


Hi,

I work on tcp/ip server. It must serve about 100 client connections,
and make two connections to upstream server to suck some info.
Medusa looks like ideal solution for it. Perfomance is perfect, I think
it is because of single threaded server

But I want some additional functionality from asyncore/asynchat/medusa
1. Timers. I need callbacks in my channels which called every number of
   seconds. Something like standart module sched tied with asyncore.loop()
   I need timer to send heartbeat messsages, and looks for clients which was
   timedout.
2. Autoreconnect. I want my outgoing connections to upstream server
   reconnects if become broken. My application is a server, but it must
   maintain client connection to upstream server, and it must run forever.
3. Simplify channel protocol. Why we write accepting server for each of
   channel types? They all looks the same. I think, most channels need
   four callbacks: On_Init(self), On_Connected(self), On_Disconnected(self)
   and On_DataArrived(self, data) plus timer signals. All network API
   for channels is Send(self, data) and Close(self). Channel can be started
   in two modes: Listen(addr, channel) and Connect(addr, channel).

Maybe sombody succesful in realizing with features in medusa?
I made quick hacks and now have working server, but dislike to it.Then, I
attempt to write my own framework stealing ideas from asyncore, asynchat
and sched. Now it is very slow. I have no idea why.

I attached it. Maybe sombody will look to it and send me some
critics/comments?
Why it is so slow? How I can make this functionality in medusa? What
calls/callbacks I forget in my protocol (described in paragraph 3)?

Sorry for terrible English,
Alexeander Semenov.
-------------- next part --------------
'''TCPDispatcher - tcp/ip server which dispatchs channels
Ideas stealed from asyncore, asynchat and sched

User must derive his channels from Channel. It reacts on 
    def On_Init(self): pass
    def On_Connected(self): pass
    def On_Disconnected(self): pass
    def On_DataArrived(self, data): pass
and can use:
    def Close(self): 
    def Send(self, data): 
then, user can install his channel with
    Listen()
    Connect()
and finally, run loop:
    Serve()
'''

import time
import socket
from bisect import insort
from select import select, error
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EISCONN, EINTR


TIMEOUT = 15 # Timeout for connection in seconds

_sockets = {}
_crontab = []


class Channel:
    def __init__(self, sock, addr):
        self._addr = addr
        self._newsocket(sock)
        self._outbuff = []
        self._inbuff = []
        self._terminator = None
        self.On_Init()

    def __repr__(self):
        ret = 'to %s:%d' % self._addr
        if self._connected: ret = 'connected '+ret
        ret = '<%s %s at %#x>' % (self.__class__.__name__, ret, id(self))
        return ret

    def _newsocket(self, sock=0):
        global _sockets

        try: self._socket.close()
        except: pass
        try: del _sockets[self._socket.fileno()]
        except: pass

        self._connected = 1
        if not sock:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._connected = 0
        self._socket = sock
        _sockets[sock.fileno()] = self
        self._disconnected = time.time() - TIMEOUT

    def _killtimers(self):
        global _crontab
        _crontab = [t for t in _crontab if not t[2].im_self is self]

    def handle_read_event(self):
        if not self._connected:
            self._connected = 1
            self.On_Connected()
        try:
            data = self._socket.recv(4096)
        except socket.error:
            data = ''
        if data: 
            if not self._terminator:
                self.On_DataArrived(data)
                return
            pos = data.find(self._terminator[-1:])
            if pos < 0:
                self._inbuff.append(data)
                return
            data = ''.join(self._inbuff)+data
            while 1:
                pos = data.find(self._terminator)
                if pos < 0: 
                    break
                else:
                    l = len(self._terminator)
                    self.On_DataArrived(data[:pos+l])
                    data = data[pos+l:]
            self._inbuff = [data]
        else:
            data = ''.join(self._inbuff)
            if data: self.On_DataArrived(data)
            self._connected = 0
            self.On_Disconnected()
            try: del _sockets[self._socket.fileno()]
            except: pass
            try: self._socket.close()
            except: pass
            if self._reconnectable:
                self._newsocket()
            else:
                self._killtimers()

    def handle_write_event(self):
        if not self._connected:
            self._connected = 1
            self.On_Connected()
        out = ''.join(self._outbuff)
        sent = self._socket.send(out)
        out = out[sent:]
        if out:
            self._outbuff = [out]
        else:
            self._outbuff = []

    # API for Cahannel
    def Close(self): 
        global _sockets

        try:
            self._socket.send(''.join(self._outbuff))
        except:
            pass
        self._killtimers()
        self._reconnectable = 0
        try:
            del _sockets[self._socket.fileno()]
        except KeyError:
            pass
        except socket.error:
            pass
        try:
            self._socket.close()
        except:
            pass

    def Send(self, data): 
        self._outbuff.append(data)

    def SetTerminator(self, term=None):
        self._terminator = term

    # Channel callbacks
    def On_Init(self): pass
    def On_Connected(self): pass
    def On_Disconnected(self): pass
    def On_DataArrived(self, data): pass


def Shutdown():
    global _sockets, _crontab
    #XXX close connections here
    for srv in _sockets.values():
        try:
            srv._socket.close()
        except:
            pass
    _sockets = _crontab = {}

class Listen:
    def __init__(self, host, port, channel):
        global _sockets

        self._addr = (host, port)
        self._channel = channel
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setblocking(0)
        _sockets[self._socket.fileno()] = self
        self._outbuff = []
        try:
            self._socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self._socket.getsockopt(socket.SOL_SOCKET, 
                    socket.SO_REUSEADDR) | 1
            )
        except: pass
        self._socket.bind(self._addr)
        self._reconnectable = 0
        self._socket.listen(5)

    def __repr__(self):
        return '<Listener for %s on %s:%d at %#x>' % (
            self._channel.__name__, self._addr[0], self._addr[1], id(self))

    def handle_read_event(self):
        srv = self._channel(*self._socket.accept())
        srv._connected = 1
        srv._reconnectable = 0
        srv.On_Connected()


def Connect(host, port, channel):
    global _sockets

    srv = channel(0, (host, port))
    srv._connected, srv._disconnected, srv._reconnectable = 0, 0, 1


def Schedule(time_, repeat, func, *args, **kwargs):
    global _crontab
    if not time_: 
        time_ = time.time()+repeat
    insort(_crontab, (time_, repeat, func, args, kwargs))

def process_timers():
    global _crontab    

    while _crontab:
        if _crontab[0][0]<time.time():
            time_, repeat, func, args, kwargs = _crontab.pop(0)
            if repeat:
                Schedule(time_+repeat, repeat, func, *args, **kwargs)
            func(*args, **kwargs)
        else:
            return _crontab[0][0]-time.time()
    return 0

def watch_connections():
    global _sockets

    dead = []
    for fd, srv in _sockets.items():
        if not srv._reconnectable or \
           srv._connected or \
           time.time()-srv._disconnected < TIMEOUT:
            continue
        err = srv._socket.connect_ex(srv._addr)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            continue
        if err in (0, EISCONN):
            srv._connected = 1
            srv.On_Connected()
        else:
            dead.append(srv)
    for srv in dead:
        srv._newsocket()

def Serve(): # Main loop
    global _sockets, _crontab

    while _sockets or _crontab:

        # Connect new and reconnect broken channels
        watch_connections()

        # Do timer events
        delay = process_timers()

        wsocks = [fd for fd, srv in _sockets.items() if srv._outbuff]

        try:
            rsocks, wsocks, _ = select(_sockets.keys(), wsocks, [], delay)
        except error, err:
            if err[0] == EINTR: continue
            if err[0] == 10038: continue
            raise

        [_sockets[rsock].handle_read_event() for rsock in rsocks]
        [_sockets[wsock].handle_write_event() for wsock in wsocks]


More information about the Medusa-dev mailing list