[pypy-svn] r20354 - in pypy/dist/pypy/module/_socket: . test

nik at codespeak.net nik at codespeak.net
Mon Nov 28 14:46:03 CET 2005


Author: nik
Date: Mon Nov 28 14:46:02 2005
New Revision: 20354

Modified:
   pypy/dist/pypy/module/_socket/interp_socket.py
   pypy/dist/pypy/module/_socket/test/test_socket2.py
Log:
_socket.inet_pton for IPv6. looks ugly, but so does most parsing code ...


Modified: pypy/dist/pypy/module/_socket/interp_socket.py
==============================================================================
--- pypy/dist/pypy/module/_socket/interp_socket.py	(original)
+++ pypy/dist/pypy/module/_socket/interp_socket.py	Mon Nov 28 14:46:02 2005
@@ -300,6 +300,8 @@
     Convert an IP address in string format (123.45.67.89) to the 32-bit packed
     binary format used in low-level network functions.
     """
+    # XXX POSIX says inet_aton/inet_addr shall accept octal- and hexadecimal-
+    # dotted notation as well as CIDR, which we don't do currently.
     packed = inet_pton_ipv4(space, ip)
     if len(packed) == 0 or packed == "\xff\xff\xff\xff":
         # NB: The broadcast address 255.255.255.255 is illegal because it's the
@@ -337,8 +339,12 @@
         else:
             return space.wrap(packed)
     elif socket.has_ipv6 and family == socket.AF_INET6:
-        # XXX implement pton for IPv6
-        return space.wrap(socket.inet_pton(family, ip))
+        packed = inet_pton_ipv6(space, ip)
+        if len(packed) == 0:
+            raise w_get_socketerror(space,
+                            "illegal IP address string passed to inet_pton")
+        else:
+            return space.wrap(packed)
     else:
         raise w_get_socketerror(space,
             "Address family not supported by protocol family", errno.EAFNOSUPPORT)
@@ -355,7 +361,7 @@
     for char in ip:
         char_ord = ord(char)
         if char == ".":
-            if len(packed) >= 3 or pos == 0:
+            if len(packed) >= IPV4_ADDRESS_SIZE - 1 or pos == 0:
                 return ""
             packed += chr(number)
             number = 0
@@ -370,6 +376,70 @@
     packed += chr(number)
     return packed
 
+def inet_pton_ipv6(space, ip):
+    zero_ord = ord("0")
+    a_ord = ord("a")
+    A_ord = ord("A")
+    packed = ""
+    number = 0
+    pos = 0
+    abbrev_index = -1
+    i = 0
+
+    if ip[:2] == "::":
+        abbrev_index = 0
+        i = 2
+
+    while i < len(ip):
+        char = ip[i]
+        char_ord = ord(char)
+        if char == ":":
+            if pos == 0: # Abbrevation
+                if abbrev_index >= 0:
+                    return ""
+                abbrev_index = len(packed)
+            else:
+                if len(packed) >= IPV6_ADDRESS_SIZE - 2:
+                    return ""
+                packed += chr((number & 0xff00) >> 8) + chr(number & 0xff)
+                number = 0
+                pos = 0
+        elif char_ord >= zero_ord and char_ord < zero_ord + 10:
+            number = number * 16 + char_ord - zero_ord
+            pos += 1
+        elif char_ord >= a_ord and char_ord < a_ord + 6:
+            number = number * 16 + (char_ord - a_ord) + 10
+            pos += 1
+        elif char_ord >= A_ord and char_ord < A_ord + 6:
+            number = number * 16 + (char_ord - A_ord) + 10
+            pos += 1
+        elif char == ".":
+            if i - pos == 0:
+                return ""
+            packed_v4 = inet_pton_ipv4(space, ip[i - pos:])
+            if len(packed_v4) == 0:
+                return ""
+            packed += packed_v4
+            break
+        else:
+            return ""
+        if pos > 4:
+            return ""
+        i += 1
+
+    if i == len(ip) and not (abbrev_index == len(packed) and pos == 0):
+        # The above means: No IPv4 compatibility and abbrevation not at end.
+        # We need to append the last 16 bits.
+        packed += chr((number & 0xff00) >> 8) + chr(number & 0xff)
+
+    if abbrev_index >= 0:
+        if len(packed) >= IPV6_ADDRESS_SIZE:
+            return ""
+        zeros = "\x00" * (IPV6_ADDRESS_SIZE - len(packed))
+        packed = packed[:abbrev_index] + zeros + packed[abbrev_index:]
+
+    return packed
+
 def inet_ntop(space, family, packed):
     """inet_ntop(family, packed_ip) -> string formatted IP address
 

Modified: pypy/dist/pypy/module/_socket/test/test_socket2.py
==============================================================================
--- pypy/dist/pypy/module/_socket/test/test_socket2.py	(original)
+++ pypy/dist/pypy/module/_socket/test/test_socket2.py	Mon Nov 28 14:46:02 2005
@@ -163,6 +163,28 @@
             "(_socket, ip): return _socket.inet_pton(_socket.AF_INET6, ip)")
         assert space.unwrap(w_packed) == packed
 
+def test_pton_ipv6():
+    if not hasattr(socket, 'inet_pton'):
+        py.test.skip('No socket.(inet_pton|inet_ntop) on this platform')
+    if not socket.has_ipv6:
+        py.test.skip("No IPv6 on this platform")
+    tests = [
+        ("\x00" * 16, "::"),
+        ("\x01" * 16, ":".join(["101"] * 8)),
+        ("\x00\x01" + "\x00" * 12 + "\x00\x02", "1::2"),
+        ("\x00" * 4 + "\x00\x01" * 6, "::1:1:1:1:1:1"),
+        ("\x00\x01" * 6 + "\x00" * 4, "1:1:1:1:1:1::"),
+        ("\xab\xcd\xef\00" + "\x00" * 12, "ABCD:EF00::"),
+        ("\xab\xcd\xef\00" + "\x00" * 12, "abcd:ef00::"),
+        ("\x00\x00\x10\x10" * 4, "::1010:" + ":".join(["0:1010"] * 3)),
+        ("\x00" * 12 + "\x01\x02\x03\x04", "::1.2.3.4"),
+        ("\x00" * 10 + "\xff\xff\x01\x02\x03\x04", "::ffff:1.2.3.4"),
+    ]
+    for packed, ip in tests:
+        w_packed = space.appexec([w_socket, space.wrap(ip)],
+            "(_socket, ip): return _socket.inet_pton(_socket.AF_INET6, ip)")
+        assert space.unwrap(w_packed) == packed
+
 def app_test_ntoa_exception():
     import _socket
     raises(_socket.error, _socket.inet_ntoa, "ab")
@@ -192,6 +214,13 @@
         (_socket.AF_INET, "127.2b.0.0"),
         (_socket.AF_INET, "127.2.0.0.1"),
         (_socket.AF_INET, "127.2..0"),
+        (_socket.AF_INET6, "127.0.0.1"),
+        (_socket.AF_INET6, "1::2::3"),
+        (_socket.AF_INET6, "1:1:1:1:1:1:1:1:1"),
+        (_socket.AF_INET6, "1:1:1:1:1:1:1:1::"),
+        (_socket.AF_INET6, "1:1:1::1:1:1:1:1"),
+        (_socket.AF_INET6, "1::22222:1"),
+        (_socket.AF_INET6, "1::eg"),
     ]
     for family, ip in tests:
         raises(_socket.error, _socket.inet_pton, family, ip)



More information about the Pypy-commit mailing list