[CentralOH] sockets auxiliary code
Neil Ludban
nludban at osc.edu
Mon May 24 16:22:04 CEST 2010
Save to a file and bring to the Hands-On Python meeting tonight.
#!/usr/bin/env python
import socket
import sys
NUL = chr(0)
BS = chr(8)
LF = chr(10)
CR = chr(13)
IAC = chr(255) # interpret as command
WILL = chr(251)
WONT = chr(252)
DO = chr(253)
DONT = chr(254)
OPT_ECHO = chr(1) # echoing for remote end
OPT_SGA = chr(3) # suppress-go-ahead
class TelnetProtocol(object):
def __init__(self):
self._buf = ''
def recv_data(self, data):
buf = self._buf + data
while buf:
if buf[0] == IAC:
if len(buf) < 2:
break
command = buf[1]
if command in ( WILL, WONT, DO, DONT ):
if len(buf) < 3:
break
option = buf[2]
self.got_option(command, option)
buf = buf[3:]
else:
raise NotImplementedError(ord(command))
elif buf[0] == CR:
if len(buf) < 2:
break
assert buf[1] in ( NUL, LF )
self.got_newline()
buf = buf[2:]
elif buf[0] == LF:
self.got_newline()
buf = buf[1:]
elif buf[0] == NUL:
buf = buf[1:]
elif chr(0x20) <= buf[0] < chr(0x7f):
self.got_ascii(buf[0])
buf = buf[1:]
else:
self.got_control(buf[0])
buf = buf[1:]
self._buf = buf
return
def got_option(self, command, option):
print 'TS: command=%i option=%i' % ( ord(command),
ord(option) )
def got_newline(self):
print 'TS: <NL>'
#sock.send(CR + NUL)
#sock.send(LF)
sock.send(CR + LF)
def got_control(self, c):
print 'TS: %r' % c
if c == chr(0x1b):
sys.exit(0)
elif c == chr(0x7f):
sock.send(BS + ' ' + BS)
def got_ascii(self, c):
print 'TS: "%c"' % c
sock.send(c.encode('rot13'))
class TelnetServer(object):
def __init__(self, host='127.0.0.1', port=0):
self._listen_sock = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
self._listen_sock.bind(( host, port ))
self._listen_sock.listen(5)
return
def process_connections(self):
print 'Waiting for connections to:', \
self._listen_sock.getsockname()
while True:
( accept_sock, peername ) = self._listen_sock.accept()
print 'Connection from:', peername
self.process_client(accept_sock)
def process_client(self, _sock):
global sock; sock = _sock
##sock.send(IAC + DONT + OPT_ECHO)
#sock.send(IAC + WILL + OPT_ECHO)
##sock.send(IAC + DO + OPT_SGA)
#sock.send(IAC + WILL + OPT_SGA)
#tp = TelnetProtocol()
while True:
data = sock.recv(100)
if not data:
break
sock.send('You said: %r' % data + CR + LF)
#tp.recv_data(data)
print 'Connection ended.'
sock.close()
return
ts = TelnetServer()
ts.process_connections()
#--#
More information about the CentralOH
mailing list