[pypy-svn] r20334 - in pypy/dist/pypy/module/_socket: . test
nik at codespeak.net
nik at codespeak.net
Mon Nov 28 11:09:09 CET 2005
Author: nik
Date: Mon Nov 28 11:09:08 2005
New Revision: 20334
Modified:
pypy/dist/pypy/module/_socket/interp_socket.py
pypy/dist/pypy/module/_socket/test/test_socket2.py
Log:
interp level implementation of _socket.inet_aton/pton (IPv4 only for the moment)
Modified: pypy/dist/pypy/module/_socket/interp_socket.py
==============================================================================
--- pypy/dist/pypy/module/_socket/interp_socket.py (original)
+++ pypy/dist/pypy/module/_socket/interp_socket.py Mon Nov 28 11:09:08 2005
@@ -99,6 +99,14 @@
space.wrap(errno),
space.wrap(msg))
+def w_get_socketerror(space, message, errno=-1):
+ w_module = space.getbuiltinmodule('_socket')
+ w_errortype = space.getattr(w_module, space.wrap('error'))
+ if errno > -1:
+ return OperationError(w_errortype, space.wrap(errno), space.wrap(message))
+ else:
+ return OperationError(w_errortype, space.wrap(message))
+
def wrap_timeouterror(space):
w_module = space.getbuiltinmodule('_socket')
@@ -292,10 +300,14 @@
Convert an IP address in string format (123.45.67.89) to the 32-bit packed
binary format used in low-level network functions.
"""
- try:
- return space.wrap(socket.inet_aton(ip))
- except socket.error, e:
- raise wrap_socketerror(space, e)
+ packed = inet_pton_ipv4(space, ip)
+ if len(packed) == 0 or packed == "\xff\xff\xff\xff":
+ # NB: The broadcast address 255.255.255.255 is illegal because it's the
+ # error value in the C API
+ raise w_get_socketerror(space,
+ "illegal IP address string passed to inet_aton")
+ else:
+ return space.wrap(packed)
inet_aton.unwrap_spec = [ObjSpace, str]
def inet_ntoa(space, packed):
@@ -304,37 +316,69 @@
Convert an IP address from 32-bit packed binary format to string format
"""
try:
- return inet_ntop_ipv4(space, packed)
+ return space.wrap(inet_ntop_ipv4(space, packed))
except OperationError, e:
if not e.match(space, space.w_ValueError):
raise
- w_module = space.getbuiltinmodule('_socket')
- # NB: This socket.error has no errno as per CPython
- raise OperationError(space.getattr(w_module, space.wrap('error')),
- space.wrap("packed IP wrong length for inet_ntoa"))
+ raise w_get_socketerror(space, "packed IP wrong length for inet_ntoa")
inet_ntoa.unwrap_spec = [ObjSpace, str]
-def inet_pton(space, af, ip):
- """inet_pton(af, ip) -> packed IP address string
+def inet_pton(space, family, ip):
+ """inet_pton(family, ip) -> packed IP address string
Convert an IP address from string format to a packed string suitable
for use with low-level network functions.
"""
- try:
- return space.wrap(socket.inet_pton(af, ip))
- except socket.error, e:
- raise wrap_socketerror(space, e)
+ if family == socket.AF_INET:
+ packed = inet_pton_ipv4(space, ip)
+ if len(packed) == 0:
+ raise w_get_socketerror(space,
+ "illegal IP address string passed to inet_pton")
+ else:
+ return space.wrap(packed)
+ elif socket.has_ipv6 and family == socket.AF_INET6:
+ # XXX implement pton for IPv6
+ return space.wrap(socket.inet_pton(family, ip))
+ else:
+ raise w_get_socketerror(space,
+ "Address family not supported by protocol family", errno.EAFNOSUPPORT)
inet_pton.unwrap_spec = [ObjSpace, int, str]
+def inet_pton_ipv4(space, ip):
+ # NB: Some platforms accept leading and trailing whitespace in the IP
+ # representation. We don't.
+ zero_ord = ord("0")
+ nine_ord = ord("9")
+ packed = ""
+ number = 0
+ pos = 0
+ for char in ip:
+ char_ord = ord(char)
+ if char == ".":
+ if len(packed) >= 3 or pos == 0:
+ return ""
+ packed += chr(number)
+ number = 0
+ pos = 0
+ elif char_ord >= zero_ord and char_ord <= nine_ord:
+ number = number * 10 + char_ord - zero_ord
+ pos += 1
+ if number > 0xff or pos > 3:
+ return ""
+ else:
+ return ""
+ packed += chr(number)
+ return packed
+
def inet_ntop(space, family, packed):
"""inet_ntop(family, packed_ip) -> string formatted IP address
Convert a packed IP address of the given family to string format.
"""
if family == socket.AF_INET:
- return inet_ntop_ipv4(space, packed)
+ return space.wrap(inet_ntop_ipv4(space, packed))
elif socket.has_ipv6 and family == socket.AF_INET6:
- return inet_ntop_ipv6(space, packed)
+ return space.wrap(inet_ntop_ipv6(space, packed))
else:
raise OperationError(space.w_ValueError,
space.wrap("unknown address family %s" % family))
@@ -345,7 +389,7 @@
raise OperationError(space.w_ValueError,
space.wrap("invalid length of packed IP address string"))
numbers = ord(packed[0]), ord(packed[1]), ord(packed[2]), ord(packed[3])
- return space.wrap("%d.%d.%d.%d" % numbers)
+ return "%d.%d.%d.%d" % numbers
def inet_ntop_ipv6(space, packed):
# XXX Currently does abbrevation of consecutive zeros only for leading zeros
@@ -366,17 +410,17 @@
# All zeros
if pos == IPV6_ADDRESS_SIZE:
- return space.wrap("::")
+ return "::"
# IPv4-compatible address
elif pos >= IPV6_ADDRESS_SIZE - 4:
- ipv4 = space.unwrap(inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:]))
- return space.wrap("::" + ipv4)
+ ipv4 = inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:])
+ return "::" + ipv4
# IPv4-mapped IPv6 address
elif pos == IPV6_ADDRESS_SIZE - 6 and numbers[pos] == 0xff and numbers[pos + 1] == 0xff:
- ipv4 = space.unwrap(inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:]))
- return space.wrap("::ffff:" + ipv4)
+ ipv4 = inet_ntop_ipv4(space, packed[IPV6_ADDRESS_SIZE - 4:])
+ return "::ffff:" + ipv4
# Standard IPv6 address
else:
@@ -389,7 +433,7 @@
ip += ":"
part = (numbers[pos] << 8) + numbers[pos + 1]
ip += "%x" % part
- return space.wrap(ip)
+ return ip
def enumerateaddrinfo(space, addr):
result = []
Modified: pypy/dist/pypy/module/_socket/test/test_socket2.py
==============================================================================
--- pypy/dist/pypy/module/_socket/test/test_socket2.py (original)
+++ pypy/dist/pypy/module/_socket/test/test_socket2.py Mon Nov 28 11:09:08 2005
@@ -130,12 +130,16 @@
def test_pton_ntop_ipv4():
if not hasattr(socket, 'inet_pton'):
py.test.skip('No socket.(inet_pton|inet_ntop) on this platform')
- for ip in '123.45.67.89', '0.0.0.0', '255.255.255.254':
- packed = socket.inet_aton(ip)
+ tests = [
+ ("123.45.67.89", "\x7b\x2d\x43\x59"),
+ ("0.0.0.0", "\x00" * 4),
+ ("255.255.255.255", "\xff" * 4),
+ ]
+ for ip, packed in tests:
w_p = space.appexec([w_socket, space.wrap(ip)],
"(_socket, ip): return _socket.inet_pton(_socket.AF_INET, ip)")
assert space.unwrap(w_p) == packed
- w_ip = space.appexec([w_socket, space.wrap(packed)],
+ w_ip = space.appexec([w_socket, w_p],
"(_socket, p): return _socket.inet_ntop(_socket.AF_INET, p)")
assert space.unwrap(w_ip) == ip
@@ -163,6 +167,13 @@
import _socket
raises(_socket.error, _socket.inet_ntoa, "ab")
+def app_test_aton_exceptions():
+ import _socket
+ tests = ["127.0.0.256", "127.0.0.255555555555555555", "127.2b.0.0",
+ "127.2.0.0.1", "127.2..0", "255.255.255.255"]
+ for ip in tests:
+ raises(_socket.error, _socket.inet_aton, ip)
+
def app_test_ntop_exceptions():
import _socket
for family, packed, exception in \
@@ -172,6 +183,19 @@
(_socket.AF_INET, u"aa\u2222a", UnicodeEncodeError)]:
raises(exception, _socket.inet_ntop, family, packed)
+def app_test_pton_exceptions():
+ import _socket
+ tests = [
+ (_socket.AF_INET + _socket.AF_INET6, ""),
+ (_socket.AF_INET, "127.0.0.256"),
+ (_socket.AF_INET, "127.0.0.255555555555555555"),
+ (_socket.AF_INET, "127.2b.0.0"),
+ (_socket.AF_INET, "127.2.0.0.1"),
+ (_socket.AF_INET, "127.2..0"),
+ ]
+ for family, ip in tests:
+ raises(_socket.error, _socket.inet_pton, family, ip)
+
def test_has_ipv6():
res = space.appexec([w_socket], "(_socket): return _socket.has_ipv6")
assert space.unwrap(res) == socket.has_ipv6
More information about the Pypy-commit
mailing list