UPnP client

Nikos Fotoulis nikofot at gmx.com
Tue Jun 14 05:35:24 EDT 2011


Hi.

Recently i needed some code to be able to listen on the public IP
address outside my modem router. Eventually, i came up with a
minimal UPnP implementation and because it seems to work and i'm
happy about it, i've decided to post it here at clpy in case
anybody else may have a use for it. You never know....

------------
# NAT Traversal via UPnP Port Mapping
# Written by Nikos Fotoulis <nikofot at gmx.com>
# This code is public domain.
#
# Tested on Thomsom TG858v7 modem router.
# UPnP is hairy. May not work with other routers
# Feedback is welcome.

import re, thread, socket, traceback as tb, random
from time import sleep
from urlparse import urlparse
from urllib import urlopen
import urllib2

VERBOSE = VVERBOSE = False
DEFAULT_ADDR = UPNPS = None

# regexes
rWANIP = re.compile (r"ST:[^\n]*(WAN(IP|PPP)Connection:\d+)", re.I).search
rLOCATION = re.compile (r"LoCaTiON:([^\n]+)", re.I).search
def rTAG (t):
	return re.compile ("<%s>(.+?)</%s>"%(t, t), re.I|re.DOTALL)
rSERVICE = rTAG ("service").findall

for tag in ["controlURL", "URLBase", "NewExternalIPAddress", "NewLeaseDuration", "NewProtocol",
	"NewInternalClient", "NewExternalPort", "NewInternalPort"]:
	def f (txt, r=rTAG (tag).search):
		x = r (txt)
		if x:
			return x. groups ()[0].strip ()
	if tag.startswith ("New"):
		tag = tag [3:]
	globals () ["r" + tag.upper ()] = f

# multicast and discover UPnP gateways
# Returns a dictionary where the keys are our "external IP" addresses
def DiscoverUPnP ():
	global UPNPS, DEFAULTGW, DEFAULTIFACE, DEFAULT_ADDR
	S = {}
	UPNPS = {}

	s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
	s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
	#s.setsockopt (socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
	R = "M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: ssdp:discover\r\nMX: 10\r\nST: ssdp:all\r\n\r\n"
	try: s.sendto (R, ("239.255.255.250", 1900))
	except:
		print "UPnP gateways unreachable"
		return

	timeout = 5
	while 1:
		s.settimeout (timeout)
		try:
			data, addr = s.recvfrom (4096)
		except:
			break
		timeout = max (timeout * 0.5, 0.01)
		r = rWANIP (data)
		if r:
			service = r.groups ()[0]
			r = rLOCATION (data)
			if r:
				location = r.groups () [0].strip ()
				if VERBOSE:
					print "server:", addr, "supports", service, "at", location
				S [addr] = service, location
		if VVERBOSE: print "+"

	for userver, (service, location) in S.items ():
		up = urlparse (location)
		netloc = up.netloc
		if ":" in netloc: server, _, port = netloc.partition (":")
		else: server, port = netloc, "80"
		data = urlopen (location).read ()

		URLBase = rURLBASE (data) or "http://%s:%s"%(server, port)
		controlURL = None
		for x in rSERVICE (data):
			if service in x:
				controlURL = rCONTROLURL (x)
				break
		if controlURL:
			addr = GetExternalIP (service, URLBase + controlURL)
			if addr:
				s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
				s.connect ((server, int (port)))
				thishost = s.getsockname () [0]
				s.close ()
				UPNPS [server] = addr, service, URLBase + controlURL, thishost
				if VERBOSE:
					print "for server:", server, "controlURL:", controlURL
		else:
			print "No controlURL found for server:", server

	# set defaults
	if len (UPNPS) == 1:
		k = UPNPS.items ()[0]
		DEFAULT_ADDR, DEFAULTGW, DEFAULTIFACE = k [1][0], k [0], k [1][3]
	else:
		print "Multiple UPnP gateways!"

	return UPNPS


# generic request POST data
def envelope (request, service, **kw):
	return """<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:%s xmlns:u="urn:schemas-upnp-org:service:%s">
"""%(request, service) + "\n".join (["<%s>%s</%s>"%(k,v,k) for k, v in kw.items ()]) + """ </u:%s>
</s:Body>
</s:Envelope>"""%request

def Request (service, URL, request, **kw):
	req = urllib2.Request (URL)
	req.add_header ("content-type",'text/xml; charset="utf-8"')
	req.add_header ("SOAPACTION", '"urn:schemas-upnp-org:service:%s#%s"'%(service, request))
	req.add_data (envelope (request, service, **kw))
	try: return urllib2.build_opener ().open (req).read ()
	except: return

def GetExternalIP (service, URL):
	answer = Request (service, URL, "GetExternalIPAddress")

	addr = answer and rEXTERNALIPADDRESS (answer)
	if not addr:
		print "Couldn't get external IP address!"
	return addr

## The 3 basic actions of UPnP : list entries, add a mapping, delete a mapping
## Notes (tested on Thomson TG585v7):
## - Some times AddMapping returns a fail code (500) but the
##   mapping *is* done and that can be seen by listing the entries (?!)
##   So, the only way to be sure is to: list entries, add mapping, list entries
##   and see the difference.
## - Returned LeaseDuration seems to be in deci-seconds

def getEntries (service, URL):
	pmi = 0
	while 1:
		answer = Request (service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=pmi)
		if not answer:
			break
		yield answer
		pmi += 1

def listMappings (gw=None):
	_, service, URL, iface = UPNPS [gw or DEFAULTGW]
	L = []
	for a in getEntries (service, URL):
		if rPROTOCOL (a) == "TCP" and rINTERNALCLIENT (a) == iface:
			L.append ((int (rEXTERNALPORT (a)), int (rINTERNALPORT (a)),
				 int (rLEASEDURATION (a)) / 10.0))
		else: print "strange entry response!", a
	return L

def addMapping (local_port, public_port, ttl, gw=None):
	_, service, URL, iface = UPNPS [gw or DEFAULTGW]

	# test if port already mapped. Result of AddMapping is unreliable
	for eport, iport, _ in listMappings (gw):
		if eport == public_port and iport != local_port:
			return

	answer = Request (service, URL, "AddPortMapping",
		NewEnabled="1", NewRemoteHost="", NewLeaseDuration=ttl, NewInternalPort=local_port,
		NewExternalPort=public_port, NewProtocol="TCP", NewInternalClient=iface,
		NewPortMappingDescription="IndependNet")
	if answer:
		return True

	# test if mapped. Result of AddMapping is unreliable
	for eport, iport, _ in listMappings (gw):
		if eport == public_port and iport == local_port:
			return True

def delMapping (public_port, gw=None):
	_, service, URL, _ = UPNPS [gw or DEFAULTGW]
	if public_port != "all":
		Request (service, URL, "DeletePortMapping",
			NewRemoteHost="", NewExternalPort=public_port, NewProtocol="TCP")
	else:
		for public_port, _, _ in listMappings (gw):
			Request (service, URL, "DeletePortMapping",
				NewRemoteHost="", NewExternalPort=public_port, NewProtocol="TCP")

##
## Socket compatible interface for accepting connections on an external port.
## Does mapping keepalive every 60sec to make sure the mapping is not kept
## indefinately if our application crashes and didn't manage to remove it.
##

LEASE_DURATION = 60

def Accept (port):
	if not port:
		port = random.randint (2000, 60000)

	if UPNPS is None:
		DiscoverUPnP ()

	if not UPNPS:
		raise Error ("No UPnP gateway found. Can't listen ouside the modem")

	s = socket.socket ()
	s.bind ((DEFAULTIFACE, 0))
	inport = s.getsockname ()[1]
	if not addMapping (inport, port, LEASE_DURATION):
		raise Error ("Port Mapping to external port %i Failed"%port)
	s.listen (2)
	return Acceptor (s, port, inport)

class UPnPError:
	pass

class Acceptor:
	def __init__ (self, sock, eport, iport):
		self.sock, self.eport, self.iport = sock, eport, iport
		self.port = eport
		self.active = True
		thread.start_new_thread (self.keepalive, ())

	def __iter__ (self):
		while self.active:
			yield self.sock.accept ()

	def keepalive (self):
		while 1:
			ttl = None
			for eport, iport, ttl in listMappings ():
				if eport == self.eport and iport == self.iport:
					break
			##print "Lease up for:", ttl
			st = 0.1
			if ttl is not None:
				st = max (ttl - 0.1, 0.1)
			sleep (st)
			if not self.active: break
			if not addMapping (self.iport, self.eport, LEASE_DURATION):
				if ttl is None:
					self.active = False
					print "Failed to Keepalive the lease"

	def __del__ (self):
		self.active = False
		self.sock.close ()
		delMapping (self.eport)

## main. UPnP manager & testing

USAGE = """UPnP NAT Traversal (port mapping) test
Usage: python upnp.py [-gw gw] {list|bind|del} <arguments>

 upnp list
	list mappings
 upnp bind internal-port external-port time-to-live
	map public port to local port for some time
 upnp del external-port|"all"
	remove a port mapping
 upnp
	discover gateways and external IP addresses

Common options:
 -gw 		: select UPnP gateway (if more than one -- NOT IMPLEMENTED)
"""

if __name__ == "__main__":
	import sys
	args = sys.argv [1:]
	if "--help"  in args:
		print USAGE
		exit ()

	VERBOSE = True
	print "Discovering UPnP gateways..."
	DiscoverUPnP ()
	for gw, v in UPNPS.items ():
		ip, service, URL, iface = v
		print "External IP:", ip 
		print "\tgateway:", gw
		print "\tservice:", service
		print "\tcontrol URL:", URL
		print "\tinterface:", iface


	if not UPNPS:
		exit ("No UPnP gateway found")
	if not args:
		exit ()

	cmd = args.pop (0)
	gw = None

	if cmd == "list":
		print "Port Mappings:"
		for ep, ip, ttl in listMappings (gw):
			print "\t%i <- %i (ttl=%i)"%(ip, ep, ttl)
	elif cmd == "bind":
		iport, eport, ttl = args
		iport, eport, ttl = int (iport), int (eport), int (ttl)
		if addMapping (iport, eport, ttl, gw):
			print "OK"
		else: print "Failed. Port already used, or implementation error"
	elif cmd == "del":
		eport, = args
		delMapping (eport, gw)
	else:
		print USAGE




More information about the Python-list mailing list