[pypy-svn] r20312 - in pypy/dist/pypy/module/_socket: . test

nik at codespeak.net nik at codespeak.net
Sun Nov 27 21:47:06 CET 2005


Author: nik
Date: Sun Nov 27 21:47:05 2005
New Revision: 20312

Modified:
   pypy/dist/pypy/module/_socket/interp_socket.py
   pypy/dist/pypy/module/_socket/test/test_socket2.py
Log:
inet_ntop with IPv6 support


Modified: pypy/dist/pypy/module/_socket/interp_socket.py
==============================================================================
--- pypy/dist/pypy/module/_socket/interp_socket.py	(original)
+++ pypy/dist/pypy/module/_socket/interp_socket.py	Sun Nov 27 21:47:05 2005
@@ -333,8 +333,8 @@
     """
     if family == socket.AF_INET:
         return inet_ntop_ipv4(space, packed)
-    elif family == socket.AF_INET6:
-        raise NotImplementedError()
+    elif socket.has_ipv6 and family == socket.AF_INET6:
+        return inet_ntop_ipv6(space, packed)
     else:
         raise OperationError(space.w_ValueError,
                              space.wrap("unknown address family %s" % family))
@@ -347,6 +347,50 @@
     numbers = ord(packed[0]), ord(packed[1]), ord(packed[2]), ord(packed[3])
     return space.wrap("%d.%d.%d.%d" % numbers)
 
+def inet_ntop_ipv6(space, packed):
+    # XXX Currently does abbrevation of consecutive zeros only for leading zeros
+    if len(packed) != IPV6_ADDRESS_SIZE:
+        raise OperationError(space.w_ValueError,
+                             space.wrap("invalid length of packed IP address string"))
+    numbers = []
+    for byte in packed:
+        numbers.append(ord(byte))
+    
+    # Skip leading zeros
+    pos = 0
+    part = 0
+    while part == 0 and pos < IPV6_ADDRESS_SIZE:
+        part = (numbers[pos] << 8) + numbers[pos + 1]
+        if part == 0:
+            pos += 2
+
+    # All zeros
+    if pos == IPV6_ADDRESS_SIZE:
+        return space.wrap("::")
+
+    # IPv4-compatible address
+    elif pos >= IPV6_ADDRESS_SIZE - 4:
+        ipv4 = space.unwrap(inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:]))
+        return space.wrap("::" + ipv4)
+
+    # IPv4-mapped IPv6 address
+    elif pos == IPV6_ADDRESS_SIZE - 6 and numbers[pos] == 0xff and numbers[pos + 1] == 0xff:
+        ipv4 = space.unwrap(inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:]))
+        return space.wrap("::ffff:" + ipv4)
+
+    # Standard IPv6 address
+    else:
+        if pos > 0:
+            ip = ":" # there were leading zeros
+        else:
+            ip = ""
+        for pos in range(pos, IPV6_ADDRESS_SIZE, 2):
+            if len(ip) > 0:
+                ip += ":"
+            part = (numbers[pos] << 8) + numbers[pos + 1]
+            ip += "%x" % part
+        return space.wrap(ip)
+
 def enumerateaddrinfo(space, addr):
     result = []
     while True:

Modified: pypy/dist/pypy/module/_socket/test/test_socket2.py
==============================================================================
--- pypy/dist/pypy/module/_socket/test/test_socket2.py	(original)
+++ pypy/dist/pypy/module/_socket/test/test_socket2.py	Sun Nov 27 21:47:05 2005
@@ -139,6 +139,26 @@
                              "(_socket, p): return _socket.inet_ntop(_socket.AF_INET, p)")
         assert space.unwrap(w_ip) == ip
 
+def test_ntop_ipv6():
+    if not hasattr(socket, 'inet_pton'):
+        py.test.skip('No socket.(inet_pton|inet_ntop) on this platform')
+    if not socket.has_ipv6:
+        py.test.skip("No IPv6 on this platform")
+    tests = [
+        ("\x00" * 16, "::"),
+        ("\x01" * 16, ":".join(["101"] * 8)),
+        ("\x00\x00\x10\x10" * 4, "::1010:" + ":".join(["0:1010"] * 3)),
+        ("\x00" * 12 + "\x01\x02\x03\x04", "::1.2.3.4"),
+        ("\x00" * 10 + "\xff\xff\x01\x02\x03\x04", "::ffff:1.2.3.4"),
+    ]
+    for packed, ip in tests:
+        w_ip = space.appexec([w_socket, space.wrap(packed)],
+            "(_socket, packed): return _socket.inet_ntop(_socket.AF_INET6, packed)")
+        assert space.unwrap(w_ip) == ip
+        w_packed = space.appexec([w_socket, w_ip],
+            "(_socket, ip): return _socket.inet_pton(_socket.AF_INET6, ip)")
+        assert space.unwrap(w_packed) == packed
+
 def app_test_ntoa_exception():
     import _socket
     raises(_socket.error, _socket.inet_ntoa, "ab")
@@ -148,6 +168,7 @@
     for family, packed, exception in \
                 [(_socket.AF_INET + _socket.AF_INET6, "", ValueError),
                  (_socket.AF_INET, "a", ValueError),
+                 (_socket.AF_INET6, "a", ValueError),
                  (_socket.AF_INET, u"aa\u2222a", UnicodeEncodeError)]:
         raises(exception, _socket.inet_ntop, family, packed)
 



More information about the Pypy-commit mailing list