[Pypi-checkins] r922 - trunk/pypi
martin.von.loewis
python-checkins at python.org
Sun Jun 19 12:43:59 CEST 2011
Author: martin.von.loewis
Date: Sun Jun 19 12:43:59 2011
New Revision: 922
Modified:
trunk/pypi/openid2rp.py
Log:
Update to openid2rp 1.9: support XRIs.
Modified: trunk/pypi/openid2rp.py
==============================================================================
--- trunk/pypi/openid2rp.py (original)
+++ trunk/pypi/openid2rp.py Sun Jun 19 12:43:59 2011
@@ -157,6 +157,42 @@
if attrs.get('http-equiv','').lower() == 'x-xrds-location':
self.xrds_location = attrs['content']
+def _extract_services(doc):
+ '''Search a parsed XRDS document for an OpenID service element.
+ doc is the ElementTree element of the document; its XRD Service
+ elements are examined one after another.
+ Return a tuple (services, op_endpoint, op_local): the Type texts of
+ the selected Service element, the text of its URI element, and its
+ local identifier (the LocalID text for a 2.0 signon service, the
+ Delegate text for a 1.x service, None for a 2.0 server service or
+ when that element is absent).
+ Return None if no service was selected.'''
+ # All lookups use ElementTree's fully qualified "{namespace}tag" form.
+ for svc in doc.findall(".//{xri://$xrd*($v*2.0)}Service"):
+ # Despite the name, "services" holds the Type values declared by
+ # this single Service element; it is what gets returned to the caller.
+ services = [x.text for x in svc.findall("{xri://$xrd*($v*2.0)}Type")]
+ if 'http://specs.openid.net/auth/2.0/server' in services:
+ # 7.3.2.1.1 OP Identifier Element
+ uri = svc.find("{xri://$xrd*($v*2.0)}URI")
+ if uri is not None:
+ # An OP Identifier element provides no local identifier.
+ op_local = None
+ op_endpoint = uri.text
+ break
+ elif 'http://specs.openid.net/auth/2.0/signon' in services:
+ # 7.3.2.1.2. Claimed Identifier Element
+ # LocalID is optional: op_local stays None when it is missing.
+ op_local = svc.find("{xri://$xrd*($v*2.0)}LocalID")
+ if op_local is not None:
+ op_local = op_local.text
+ uri = svc.find("{xri://$xrd*($v*2.0)}URI")
+ if uri is not None:
+ op_endpoint = uri.text
+ break
+ elif 'http://openid.net/server/1.0' in services or \
+ 'http://openid.net/server/1.1' in services or \
+ 'http://openid.net/signon/1.0' in services or \
+ 'http://openid.net/signon/1.1' in services:
+ # 14.2.1 says we also need to check for the 1.x types;
+ # XXX should check 1.x only if no 2.0 service is found
+ # For 1.x services the local identifier comes from the Delegate
+ # element, which lives in the http://openid.net/xmlns/1.0 namespace.
+ op_local = svc.find("{http://openid.net/xmlns/1.0}Delegate")
+ if op_local is not None:
+ op_local = op_local.text
+ uri = svc.find("{xri://$xrd*($v*2.0)}URI")
+ if uri is not None:
+ op_endpoint = uri.text
+ break
+ # NOTE(review): indentation was lost in this archived diff. This else
+ # presumably belongs to the for loop (it runs only when no break
+ # happened) -- confirm against the repository copy of the file.
+ else:
+ return None # No OpenID 2.0 service found
+ return services, op_endpoint, op_local
+
def discover(url):
'''Perform service discovery on the OP URL.
Return list of service types, and the auth/2.0 URL,
@@ -226,44 +262,41 @@
elif content_type == 'application/xrds+xml':
# Yadis 6.2.5 option 4
doc = ElementTree.fromstring(data)
- for svc in doc.findall(".//{xri://$xrd*($v*2.0)}Service"):
- services = [x.text for x in svc.findall("{xri://$xrd*($v*2.0)}Type")]
- if 'http://specs.openid.net/auth/2.0/server' in services:
- # 7.3.2.1.1 OP Identifier Element
- uri = svc.find("{xri://$xrd*($v*2.0)}URI")
- if uri is not None:
- op_local = None
- op_endpoint = uri.text
- break
- elif 'http://specs.openid.net/auth/2.0/signon' in services:
- # 7.3.2.1.2. Claimed Identifier Element
- op_local = svc.find("{xri://$xrd*($v*2.0)}LocalID")
- if op_local is not None:
- op_local = op_local.text
- uri = svc.find("{xri://$xrd*($v*2.0)}URI")
- if uri is not None:
- op_endpoint = uri.text
- break
- elif 'http://openid.net/server/1.0' in services or \
- 'http://openid.net/server/1.1' in services or \
- 'http://openid.net/signon/1.0' in services or \
- 'http://openid.net/signon/1.1' in services:
- # 14.2.1 says we also need to check for the 1.x types;
- # XXX should check 1.x only if no 2.0 service is found
- op_local = svc.find("{http://openid.net/xmlns/1.0}Delegate")
- if op_local is not None:
- op_local = op_local.text
- uri = svc.find("{xri://$xrd*($v*2.0)}URI")
- if uri is not None:
- op_endpoint = uri.text
- break
- else:
- return None # No OpenID 2.0 service found
+ return _extract_services(doc)
else:
# unknown content type
return None
return services, op_endpoint, op_local
+def resolve_xri(xri, proxy='xri.net'):
+ '''Perform XRI resolution of xri using a proxy resolver.
+ xri is percent-quoted (the characters =@*!+() are left unquoted)
+ and requested from the host named by proxy with
+ GET /<xri>?_xrd_r=application/xrds+xml. The response body is parsed
+ as XML and searched with _extract_services().
+ Return canonical ID, services, op endpoint, op local.
+ Return None if connecting to the proxy fails, if no OpenID service
+ is found, or if the document contains no CanonicalID element.
+ NOTE(review): errors raised after the connect step (sending the
+ request, reading the response, parsing the XML) are not caught in
+ this function, and the HTTP status of the response is not inspected.'''
+ xri = urllib.quote(xri, safe='=@*!+()')
+ conn = httplib.HTTPConnection(proxy)
+ try:
+ conn.connect()
+ # NOTE(review): the bare except catches every exception type, not only
+ # network errors -- consider narrowing it.
+ except:
+ # DNS or TCP error
+ return None
+ # The _xrd_r query parameter asks the proxy for an XRDS document.
+ conn.putrequest("GET", '/'+xri+'?_xrd_r=application/xrds+xml')
+ conn.endheaders()
+
+ res = conn.getresponse()
+ data = res.read()
+ conn.close()
+
+ doc = ElementTree.fromstring(data)
+ # res is rebound here: from now on it is the _extract_services()
+ # result, no longer the HTTP response object.
+ res = _extract_services(doc)
+ if res is None:
+ # No OpenID service found
+ return None
+ services, op_endpoint, op_local = res
+ # find() with ".//" returns the first CanonicalID element anywhere
+ # below the document root.
+ canonical_id = doc.find(".//{xri://$xrd*($v*2.0)}CanonicalID")
+ if canonical_id is None:
+ return None
+ return canonical_id.text, services, op_endpoint, op_local
+
def is_compat_1x(services):
for uri in ('http://specs.openid.net/auth/2.0/signon',
'http://specs.openid.net/auth/2.0/server'):
@@ -339,6 +372,8 @@
data['openid.session_type'] = ''
del data['openid.ns']
res = urllib.urlopen(url, urllib.urlencode(data))
+ if res.getcode() != 200:
+ raise ValueError, "OpenID provider refuses connection with status %d" % res.getcode()
data = parse_response(res.read())
if 'error' in data:
raise ValueError, "associate failed: "+data['error']
@@ -407,7 +442,7 @@
if claimed is None:
claimed = "http://specs.openid.net/auth/2.0/identifier_select"
if op_local is None:
- op_local = "http://specs.openid.net/auth/2.0/identifier_select"
+ op_local = claimed
if realm is None:
realm = return_to
data = {
@@ -528,7 +563,7 @@
return signed
def parse_nonce(nonce):
- '''Split a nonce into a (timestamp, ID) pair'''
+ '''Extract a datetime.datetime stamp from the nonce'''
stamp = nonce.split('Z', 1)[0]
stamp = time.strptime(stamp, "%Y-%m-%dT%H:%M:%S")[:6]
stamp = datetime.datetime(*stamp)
More information about the Pypi-checkins
mailing list