[CentralOH] sockets auxiliary code

Neil Ludban nludban at osc.edu
Mon May 24 16:22:04 CEST 2010


Save to a file and bring to the Hands-On Python meeting tonight.



#!/usr/bin/env python

import socket
import sys

# ASCII control characters the parser looks for.
NUL = '\x00'
BS = '\x08'
LF = '\n'
CR = '\r'

# Telnet command bytes.
IAC = '\xff'  # interpret as command

WILL = '\xfb'
WONT = '\xfc'
DO = '\xfd'
DONT = '\xfe'

# Telnet option codes.
OPT_ECHO = '\x01'  # echoing for remote end
OPT_SGA = '\x03'  # suppress-go-ahead


class TelnetProtocol(object):

    def __init__(self):
        self._buf = ''

    def recv_data(self, data):
        buf = self._buf + data
        while buf:
            if buf[0] == IAC:
                if len(buf) < 2:
                    break
                command = buf[1]
                if command in ( WILL, WONT, DO, DONT ):
                    if len(buf) < 3:
                        break
                    option = buf[2]
                    self.got_option(command, option)
                    buf = buf[3:]
                else:
                    raise NotImplementedError(ord(command))
            elif buf[0] == CR:
                if len(buf) < 2:
                    break
                assert buf[1] in ( NUL, LF )
                self.got_newline()
                buf = buf[2:]
            elif buf[0] == LF:
                self.got_newline()
                buf = buf[1:]
            elif buf[0] == NUL:
                buf = buf[1:]
            elif chr(0x20) <= buf[0] < chr(0x7f):
                self.got_ascii(buf[0])
                buf = buf[1:]
            else:
                self.got_control(buf[0])
                buf = buf[1:]
        self._buf = buf
        return

    def got_option(self, command, option):
        print 'TS: command=%i option=%i' % ( ord(command),
                                             ord(option) )

    def got_newline(self):
        print 'TS: <NL>'
        #sock.send(CR + NUL)
        #sock.send(LF)
        sock.send(CR + LF)

    def got_control(self, c):
        print 'TS: %r' % c
        if c == chr(0x1b):
            sys.exit(0)
        elif c == chr(0x7f):
            sock.send(BS + ' ' + BS)

    def got_ascii(self, c):
        print 'TS: "%c"' % c
        sock.send(c.encode('rot13'))


class TelnetServer(object):

    def __init__(self, host='127.0.0.1', port=0):
        self._listen_sock = socket.socket(socket.AF_INET,
                                          socket.SOCK_STREAM)
        self._listen_sock.bind(( host, port ))
        self._listen_sock.listen(5)
        return

    def process_connections(self):
        print 'Waiting for connections to:', \
              self._listen_sock.getsockname()
        while True:
            ( accept_sock, peername ) = self._listen_sock.accept()
            print 'Connection from:', peername
            self.process_client(accept_sock)

    def process_client(self, _sock):
        global sock; sock = _sock
        ##sock.send(IAC + DONT + OPT_ECHO)
        #sock.send(IAC + WILL + OPT_ECHO)
        ##sock.send(IAC + DO + OPT_SGA)
        #sock.send(IAC + WILL + OPT_SGA)
        #tp = TelnetProtocol()
        while True:
            data = sock.recv(100)
            if not data:
                break
            sock.send('You said: %r' % data + CR + LF)
            #tp.recv_data(data)
        print 'Connection ended.'
        sock.close()
        return


# Script entry point: build a server with the default host/port and
# serve clients until the process is stopped (process_connections()
# loops forever, so nothing after this line is reached).
ts = TelnetServer()
ts.process_connections()

#--#



More information about the CentralOH mailing list