[pypy-svn] r20354 - in pypy/dist/pypy/module/_socket: . test
nik at codespeak.net
nik at codespeak.net
Mon Nov 28 14:46:03 CET 2005
Author: nik
Date: Mon Nov 28 14:46:02 2005
New Revision: 20354
Modified:
pypy/dist/pypy/module/_socket/interp_socket.py
pypy/dist/pypy/module/_socket/test/test_socket2.py
Log:
_socket.inet_pton for IPv6. looks ugly, but so does most parsing code ...
Modified: pypy/dist/pypy/module/_socket/interp_socket.py
==============================================================================
--- pypy/dist/pypy/module/_socket/interp_socket.py (original)
+++ pypy/dist/pypy/module/_socket/interp_socket.py Mon Nov 28 14:46:02 2005
@@ -300,6 +300,8 @@
Convert an IP address in string format (123.45.67.89) to the 32-bit packed
binary format used in low-level network functions.
"""
+ # XXX POSIX says inet_aton/inet_addr shall accept octal- and hexadecimal-
+ # dotted notation as well as CIDR, which we don't do currently.
packed = inet_pton_ipv4(space, ip)
if len(packed) == 0 or packed == "\xff\xff\xff\xff":
# NB: The broadcast address 255.255.255.255 is illegal because it's the
@@ -337,8 +339,12 @@
else:
return space.wrap(packed)
elif socket.has_ipv6 and family == socket.AF_INET6:
- # XXX implement pton for IPv6
- return space.wrap(socket.inet_pton(family, ip))
+ packed = inet_pton_ipv6(space, ip)
+ if len(packed) == 0:
+ raise w_get_socketerror(space,
+ "illegal IP address string passed to inet_pton")
+ else:
+ return space.wrap(packed)
else:
raise w_get_socketerror(space,
"Address family not supported by protocol family", errno.EAFNOSUPPORT)
@@ -355,7 +361,7 @@
for char in ip:
char_ord = ord(char)
if char == ".":
- if len(packed) >= 3 or pos == 0:
+ if len(packed) >= IPV4_ADDRESS_SIZE - 1 or pos == 0:
return ""
packed += chr(number)
number = 0
@@ -370,6 +376,70 @@
packed += chr(number)
return packed
+def inet_pton_ipv6(space, ip):
+ zero_ord = ord("0")
+ a_ord = ord("a")
+ A_ord = ord("A")
+ packed = ""
+ number = 0
+ pos = 0
+ abbrev_index = -1
+ i = 0
+
+ if ip[:2] == "::":
+ abbrev_index = 0
+ i = 2
+
+ while i < len(ip):
+ char = ip[i]
+ char_ord = ord(char)
+ if char == ":":
+ if pos == 0: # Abbreviation
+ if abbrev_index >= 0:
+ return ""
+ abbrev_index = len(packed)
+ else:
+ if len(packed) >= IPV6_ADDRESS_SIZE - 2:
+ return ""
+ packed += chr((number & 0xff00) >> 8) + chr(number & 0xff)
+ number = 0
+ pos = 0
+ elif char_ord >= zero_ord and char_ord < zero_ord + 10:
+ number = number * 16 + char_ord - zero_ord
+ pos += 1
+ elif char_ord >= a_ord and char_ord < a_ord + 6:
+ number = number * 16 + (char_ord - a_ord) + 10
+ pos += 1
+ elif char_ord >= A_ord and char_ord < A_ord + 6:
+ number = number * 16 + (char_ord - A_ord) + 10
+ pos += 1
+ elif char == ".":
+ if i - pos == 0:
+ return ""
+ packed_v4 = inet_pton_ipv4(space, ip[i - pos:])
+ if len(packed_v4) == 0:
+ return ""
+ packed += packed_v4
+ break
+ else:
+ return ""
+ if pos > 4:
+ return ""
+ i += 1
+
+ if i == len(ip) and not (abbrev_index == len(packed) and pos == 0):
+ # The above means: No IPv4 compatibility and abbreviation not at end.
+ # We need to append the last 16 bits.
+ packed += chr((number & 0xff00) >> 8) + chr(number & 0xff)
+
+ if abbrev_index >= 0:
+ if len(packed) >= IPV6_ADDRESS_SIZE:
+ return ""
+ zeros = "\x00" * (IPV6_ADDRESS_SIZE - len(packed))
+ packed = packed[:abbrev_index] + zeros + packed[abbrev_index:]
+
+ return packed
+
def inet_ntop(space, family, packed):
"""inet_ntop(family, packed_ip) -> string formatted IP address
Modified: pypy/dist/pypy/module/_socket/test/test_socket2.py
==============================================================================
--- pypy/dist/pypy/module/_socket/test/test_socket2.py (original)
+++ pypy/dist/pypy/module/_socket/test/test_socket2.py Mon Nov 28 14:46:02 2005
@@ -163,6 +163,28 @@
"(_socket, ip): return _socket.inet_pton(_socket.AF_INET6, ip)")
assert space.unwrap(w_packed) == packed
+def test_pton_ipv6():
+ if not hasattr(socket, 'inet_pton'):
+ py.test.skip('No socket.(inet_pton|inet_ntop) on this platform')
+ if not socket.has_ipv6:
+ py.test.skip("No IPv6 on this platform")
+ tests = [
+ ("\x00" * 16, "::"),
+ ("\x01" * 16, ":".join(["101"] * 8)),
+ ("\x00\x01" + "\x00" * 12 + "\x00\x02", "1::2"),
+ ("\x00" * 4 + "\x00\x01" * 6, "::1:1:1:1:1:1"),
+ ("\x00\x01" * 6 + "\x00" * 4, "1:1:1:1:1:1::"),
+ ("\xab\xcd\xef\00" + "\x00" * 12, "ABCD:EF00::"),
+ ("\xab\xcd\xef\00" + "\x00" * 12, "abcd:ef00::"),
+ ("\x00\x00\x10\x10" * 4, "::1010:" + ":".join(["0:1010"] * 3)),
+ ("\x00" * 12 + "\x01\x02\x03\x04", "::1.2.3.4"),
+ ("\x00" * 10 + "\xff\xff\x01\x02\x03\x04", "::ffff:1.2.3.4"),
+ ]
+ for packed, ip in tests:
+ w_packed = space.appexec([w_socket, space.wrap(ip)],
+ "(_socket, ip): return _socket.inet_pton(_socket.AF_INET6, ip)")
+ assert space.unwrap(w_packed) == packed
+
def app_test_ntoa_exception():
import _socket
raises(_socket.error, _socket.inet_ntoa, "ab")
@@ -192,6 +214,13 @@
(_socket.AF_INET, "127.2b.0.0"),
(_socket.AF_INET, "127.2.0.0.1"),
(_socket.AF_INET, "127.2..0"),
+ (_socket.AF_INET6, "127.0.0.1"),
+ (_socket.AF_INET6, "1::2::3"),
+ (_socket.AF_INET6, "1:1:1:1:1:1:1:1:1"),
+ (_socket.AF_INET6, "1:1:1:1:1:1:1:1::"),
+ (_socket.AF_INET6, "1:1:1::1:1:1:1:1"),
+ (_socket.AF_INET6, "1::22222:1"),
+ (_socket.AF_INET6, "1::eg"),
]
for family, ip in tests:
raises(_socket.error, _socket.inet_pton, family, ip)
More information about the Pypy-commit
mailing list