[Pypi-checkins] r922 - trunk/pypi

martin.von.loewis python-checkins at python.org
Sun Jun 19 12:43:59 CEST 2011


Author: martin.von.loewis
Date: Sun Jun 19 12:43:59 2011
New Revision: 922

Modified:
   trunk/pypi/openid2rp.py
Log:
Update to openid2rp 1.9: support XRIs.


Modified: trunk/pypi/openid2rp.py
==============================================================================
--- trunk/pypi/openid2rp.py	(original)
+++ trunk/pypi/openid2rp.py	Sun Jun 19 12:43:59 2011
@@ -157,6 +157,42 @@
         if attrs.get('http-equiv','').lower() == 'x-xrds-location':
             self.xrds_location = attrs['content']
 
+def _extract_services(doc):
+    for svc in doc.findall(".//{xri://$xrd*($v*2.0)}Service"):
+        services = [x.text for x in svc.findall("{xri://$xrd*($v*2.0)}Type")]
+        if 'http://specs.openid.net/auth/2.0/server' in services:
+            # 7.3.2.1.1 OP Identifier Element
+            uri = svc.find("{xri://$xrd*($v*2.0)}URI")
+            if uri is not None:
+                op_local = None
+                op_endpoint = uri.text
+                break
+        elif 'http://specs.openid.net/auth/2.0/signon' in services:
+            # 7.3.2.1.2.  Claimed Identifier Element
+            op_local = svc.find("{xri://$xrd*($v*2.0)}LocalID")
+            if op_local is not None:
+                op_local = op_local.text
+            uri = svc.find("{xri://$xrd*($v*2.0)}URI")
+            if uri is not None:
+                op_endpoint = uri.text
+                break
+        elif 'http://openid.net/server/1.0' in services or \
+                'http://openid.net/server/1.1' in services or \
+                'http://openid.net/signon/1.0' in services or \
+                'http://openid.net/signon/1.1' in services:
+            # 14.2.1 says we also need to check for the 1.x types;
+            # XXX should check 1.x only if no 2.0 service is found
+            op_local = svc.find("{http://openid.net/xmlns/1.0}Delegate")
+            if op_local is not None:
+                op_local = op_local.text
+            uri = svc.find("{xri://$xrd*($v*2.0)}URI")
+            if uri is not None:
+                op_endpoint = uri.text
+                break
+    else:
+        return None # No OpenID 2.0 service found
+    return services, op_endpoint, op_local
+
 def discover(url):
     '''Perform service discovery on the OP URL.
     Return list of service types, and the auth/2.0 URL,
@@ -226,44 +262,41 @@
     elif content_type == 'application/xrds+xml':
         # Yadis 6.2.5 option 4
         doc = ElementTree.fromstring(data)
-        for svc in doc.findall(".//{xri://$xrd*($v*2.0)}Service"):
-            services = [x.text for x in svc.findall("{xri://$xrd*($v*2.0)}Type")]
-            if 'http://specs.openid.net/auth/2.0/server' in services:
-                # 7.3.2.1.1 OP Identifier Element
-                uri = svc.find("{xri://$xrd*($v*2.0)}URI")
-                if uri is not None:
-                    op_local = None
-                    op_endpoint = uri.text
-                    break
-            elif 'http://specs.openid.net/auth/2.0/signon' in services:
-                # 7.3.2.1.2.  Claimed Identifier Element
-                op_local = svc.find("{xri://$xrd*($v*2.0)}LocalID")
-                if op_local is not None:
-                    op_local = op_local.text
-                uri = svc.find("{xri://$xrd*($v*2.0)}URI")
-                if uri is not None:
-                    op_endpoint = uri.text
-                    break
-            elif 'http://openid.net/server/1.0' in services or \
-                 'http://openid.net/server/1.1' in services or \
-                 'http://openid.net/signon/1.0' in services or \
-                 'http://openid.net/signon/1.1' in services:
-                # 14.2.1 says we also need to check for the 1.x types;
-                # XXX should check 1.x only if no 2.0 service is found
-                op_local = svc.find("{http://openid.net/xmlns/1.0}Delegate")
-                if op_local is not None:
-                    op_local = op_local.text
-                uri = svc.find("{xri://$xrd*($v*2.0)}URI")
-                if uri is not None:
-                    op_endpoint = uri.text
-                    break
-        else:
-            return None # No OpenID 2.0 service found
+        return _extract_services(doc)
     else:
         # unknown content type
         return None
     return services, op_endpoint, op_local
 
+def resolve_xri(xri, proxy='xri.net'):
+    '''Perform XRI resolution of xri using a proxy resolver.
+    Return canonical ID, services, op endpoint, op local;
+    return None if an error occurred'''
+    xri = urllib.quote(xri, safe='=@*!+()')
+    conn = httplib.HTTPConnection(proxy)
+    try:
+        conn.connect()
+    except:
+        # DNS or TCP error
+        return None
+    conn.putrequest("GET", '/'+xri+'?_xrd_r=application/xrds+xml')
+    conn.endheaders()
+
+    res = conn.getresponse()
+    data = res.read()
+    conn.close()
+
+    doc = ElementTree.fromstring(data)
+    res = _extract_services(doc)
+    if res is None:
+        # No OpenID service found
+        return None
+    services, op_endpoint, op_local = res
+    canonical_id = doc.find(".//{xri://$xrd*($v*2.0)}CanonicalID")
+    if canonical_id is None:
+        return None
+    return canonical_id.text, services, op_endpoint, op_local
+
 def is_compat_1x(services):
     for uri in ('http://specs.openid.net/auth/2.0/signon',
                 'http://specs.openid.net/auth/2.0/server'):
@@ -339,6 +372,8 @@
             data['openid.session_type'] = ''
         del data['openid.ns']
     res = urllib.urlopen(url, urllib.urlencode(data))
+    if res.getcode() != 200:
+        raise ValueError, "OpenID provider refuses connection with status %d" % res.getcode()
     data = parse_response(res.read())
     if 'error' in data:
         raise ValueError, "associate failed: "+data['error']
@@ -407,7 +442,7 @@
     if claimed is None:
         claimed = "http://specs.openid.net/auth/2.0/identifier_select"
     if op_local is None:
-        op_local = "http://specs.openid.net/auth/2.0/identifier_select"
+        op_local = claimed
     if realm is None:
         realm = return_to
     data = {
@@ -528,7 +563,7 @@
     return signed
 
 def parse_nonce(nonce):
-    '''Split a nonce into a (timestamp, ID) pair'''
+    '''Extract a datetime.datetime stamp from the nonce'''
     stamp = nonce.split('Z', 1)[0]
     stamp = time.strptime(stamp, "%Y-%m-%dT%H:%M:%S")[:6]
     stamp = datetime.datetime(*stamp)


More information about the Pypi-checkins mailing list