[pypy-svn] r20334 - in pypy/dist/pypy/module/_socket: . test

nik at codespeak.net nik at codespeak.net
Mon Nov 28 11:09:09 CET 2005


Author: nik
Date: Mon Nov 28 11:09:08 2005
New Revision: 20334

Modified:
   pypy/dist/pypy/module/_socket/interp_socket.py
   pypy/dist/pypy/module/_socket/test/test_socket2.py
Log:
interp level implemenation of _socket.inet_aton/pton (IPv4 only for the moment)


Modified: pypy/dist/pypy/module/_socket/interp_socket.py
==============================================================================
--- pypy/dist/pypy/module/_socket/interp_socket.py	(original)
+++ pypy/dist/pypy/module/_socket/interp_socket.py	Mon Nov 28 11:09:08 2005
@@ -99,6 +99,14 @@
                           space.wrap(errno),
                           space.wrap(msg))
 
+def w_get_socketerror(space, message, errno=-1):
+    w_module = space.getbuiltinmodule('_socket')
+    w_errortype = space.getattr(w_module, space.wrap('error'))
+    if errno > -1:
+        return OperationError(w_errortype, space.wrap(errno), space.wrap(message))
+    else:
+        return OperationError(w_errortype, space.wrap(message))
+
 def wrap_timeouterror(space):
 
     w_module = space.getbuiltinmodule('_socket')
@@ -292,10 +300,14 @@
     Convert an IP address in string format (123.45.67.89) to the 32-bit packed
     binary format used in low-level network functions.
     """
-    try:
-        return space.wrap(socket.inet_aton(ip))
-    except socket.error, e:
-        raise wrap_socketerror(space, e)
+    packed = inet_pton_ipv4(space, ip)
+    if len(packed) == 0 or packed == "\xff\xff\xff\xff":
+        # NB: The broadcast address 255.255.255.255 is illegal because it's the
+        # error value in the C API
+        raise w_get_socketerror(space,
+                                "illegal IP address string passed to inet_aton")
+    else:
+        return space.wrap(packed)
 inet_aton.unwrap_spec = [ObjSpace, str]
 
 def inet_ntoa(space, packed):
@@ -304,37 +316,69 @@
     Convert an IP address from 32-bit packed binary format to string format
     """
     try:
-        return inet_ntop_ipv4(space, packed)
+        return space.wrap(inet_ntop_ipv4(space, packed))
     except OperationError, e:
         if not e.match(space, space.w_ValueError):
             raise
-        w_module = space.getbuiltinmodule('_socket')
-        # NB: This socket.error has no errno as per CPython
-        raise OperationError(space.getattr(w_module, space.wrap('error')),
-                             space.wrap("packed IP wrong length for inet_ntoa"))
+        raise w_get_socketerror(space, "packed IP wrong length for inet_ntoa")
 inet_ntoa.unwrap_spec = [ObjSpace, str]
 
-def inet_pton(space, af, ip):
-    """inet_pton(af, ip) -> packed IP address string
+def inet_pton(space, family, ip):
+    """inet_pton(family, ip) -> packed IP address string
 
     Convert an IP address from string format to a packed string suitable
     for use with low-level network functions.
     """
-    try:
-        return space.wrap(socket.inet_pton(af, ip))
-    except socket.error, e:
-        raise wrap_socketerror(space, e)
+    if family == socket.AF_INET:
+        packed = inet_pton_ipv4(space, ip)
+        if len(packed) == 0:
+            raise w_get_socketerror(space,
+                            "illegal IP address string passed to inet_pton")
+        else:
+            return space.wrap(packed)
+    elif socket.has_ipv6 and family == socket.AF_INET6:
+        # XXX implement pton for IPv6
+        return space.wrap(socket.inet_pton(family, ip))
+    else:
+        raise w_get_socketerror(space,
+            "Address family not supported by protocol family", errno.EAFNOSUPPORT)
 inet_pton.unwrap_spec = [ObjSpace, int, str]
 
+def inet_pton_ipv4(space, ip):
+    # NB: Some platforms accept leading and trailing whitespace in the IP
+    # representation. We don't.
+    zero_ord = ord("0")
+    nine_ord = ord("9")
+    packed = ""
+    number = 0
+    pos = 0
+    for char in ip:
+        char_ord = ord(char)
+        if char == ".":
+            if len(packed) >= 3 or pos == 0:
+                return ""
+            packed += chr(number)
+            number = 0
+            pos = 0
+        elif char_ord >= zero_ord and char_ord <= nine_ord:
+            number = number * 10 + char_ord - zero_ord
+            pos += 1
+            if number > 0xff or pos > 3:
+                return ""
+        else:
+            return ""
+    packed += chr(number)
+    return packed
+
 def inet_ntop(space, family, packed):
     """inet_ntop(family, packed_ip) -> string formatted IP address
 
     Convert a packed IP address of the given family to string format.
     """
     if family == socket.AF_INET:
-        return inet_ntop_ipv4(space, packed)
+        return space.wrap(inet_ntop_ipv4(space, packed))
     elif socket.has_ipv6 and family == socket.AF_INET6:
-        return inet_ntop_ipv6(space, packed)
+        return space.wrap(inet_ntop_ipv6(space, packed))
     else:
         raise OperationError(space.w_ValueError,
                              space.wrap("unknown address family %s" % family))
@@ -345,7 +389,7 @@
         raise OperationError(space.w_ValueError,
                              space.wrap("invalid length of packed IP address string"))
     numbers = ord(packed[0]), ord(packed[1]), ord(packed[2]), ord(packed[3])
-    return space.wrap("%d.%d.%d.%d" % numbers)
+    return "%d.%d.%d.%d" % numbers
 
 def inet_ntop_ipv6(space, packed):
     # XXX Currently does abbrevation of consecutive zeros only for leading zeros
@@ -366,17 +410,17 @@
 
     # All zeros
     if pos == IPV6_ADDRESS_SIZE:
-        return space.wrap("::")
+        return "::"
 
     # IPv4-compatible address
     elif pos >= IPV6_ADDRESS_SIZE - 4:
-        ipv4 = space.unwrap(inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:]))
-        return space.wrap("::" + ipv4)
+        ipv4 = inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:])
+        return "::" + ipv4
 
     # IPv4-mapped IPv6 address
     elif pos == IPV6_ADDRESS_SIZE - 6 and numbers[pos] == 0xff and numbers[pos + 1] == 0xff:
-        ipv4 = space.unwrap(inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:]))
-        return space.wrap("::ffff:" + ipv4)
+        ipv4 = inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:])
+        return "::ffff:" + ipv4
 
     # Standard IPv6 address
     else:
@@ -389,7 +433,7 @@
                 ip += ":"
             part = (numbers[pos] << 8) + numbers[pos + 1]
             ip += "%x" % part
-        return space.wrap(ip)
+        return ip
 
 def enumerateaddrinfo(space, addr):
     result = []

Modified: pypy/dist/pypy/module/_socket/test/test_socket2.py
==============================================================================
--- pypy/dist/pypy/module/_socket/test/test_socket2.py	(original)
+++ pypy/dist/pypy/module/_socket/test/test_socket2.py	Mon Nov 28 11:09:08 2005
@@ -130,12 +130,16 @@
 def test_pton_ntop_ipv4():
     if not hasattr(socket, 'inet_pton'):
         py.test.skip('No socket.(inet_pton|inet_ntop) on this platform')
-    for ip in '123.45.67.89', '0.0.0.0', '255.255.255.254':
-        packed = socket.inet_aton(ip)
+    tests = [
+        ("123.45.67.89", "\x7b\x2d\x43\x59"),
+        ("0.0.0.0", "\x00" * 4),
+        ("255.255.255.255", "\xff" * 4),
+    ]
+    for ip, packed in tests:
         w_p = space.appexec([w_socket, space.wrap(ip)],
                             "(_socket, ip): return _socket.inet_pton(_socket.AF_INET, ip)")
         assert space.unwrap(w_p) == packed
-        w_ip = space.appexec([w_socket, space.wrap(packed)],
+        w_ip = space.appexec([w_socket, w_p],
                              "(_socket, p): return _socket.inet_ntop(_socket.AF_INET, p)")
         assert space.unwrap(w_ip) == ip
 
@@ -163,6 +167,13 @@
     import _socket
     raises(_socket.error, _socket.inet_ntoa, "ab")
 
+def app_test_aton_exceptions():
+    import _socket
+    tests = ["127.0.0.256", "127.0.0.255555555555555555", "127.2b.0.0",
+        "127.2.0.0.1", "127.2..0", "255.255.255.255"]
+    for ip in tests:
+        raises(_socket.error, _socket.inet_aton, ip)
+
 def app_test_ntop_exceptions():
     import _socket
     for family, packed, exception in \
@@ -172,6 +183,19 @@
                  (_socket.AF_INET, u"aa\u2222a", UnicodeEncodeError)]:
         raises(exception, _socket.inet_ntop, family, packed)
 
+def app_test_pton_exceptions():
+    import _socket
+    tests = [
+        (_socket.AF_INET + _socket.AF_INET6, ""),
+        (_socket.AF_INET, "127.0.0.256"),
+        (_socket.AF_INET, "127.0.0.255555555555555555"),
+        (_socket.AF_INET, "127.2b.0.0"),
+        (_socket.AF_INET, "127.2.0.0.1"),
+        (_socket.AF_INET, "127.2..0"),
+    ]
+    for family, ip in tests:
+        raises(_socket.error, _socket.inet_pton, family, ip)
+
 def test_has_ipv6():
     res = space.appexec([w_socket], "(_socket): return _socket.has_ipv6")
     assert space.unwrap(res) == socket.has_ipv6



More information about the Pypy-commit mailing list