[pypy-svn] r18556 - in pypy/dist/pypy/module/_socket: . test

afa at codespeak.net afa at codespeak.net
Fri Oct 14 17:13:10 CEST 2005


Author: afa
Date: Fri Oct 14 17:13:08 2005
New Revision: 18556

Modified:
   pypy/dist/pypy/module/_socket/interp_socket.py
   pypy/dist/pypy/module/_socket/test/test_socket2.py
Log:
valentino, afa: more socket functions, and tests


Modified: pypy/dist/pypy/module/_socket/interp_socket.py
==============================================================================
--- pypy/dist/pypy/module/_socket/interp_socket.py	(original)
+++ pypy/dist/pypy/module/_socket/interp_socket.py	Fri Oct 14 17:13:08 2005
@@ -1,89 +1,175 @@
-
-import socket
-from pypy.interpreter.error import OperationError
-from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
-
-def gethostname(space):
-    """gethostname() -> string
-
-    Return the current host name.
-    """
-    return space.wrap(socket.gethostname())
-gethostname.unwrap_spec = [ObjSpace]
-
-def gethostbyname(space, name):
-    """gethostbyname(host) -> address
-
-    Return the IP address (a string of the form '255.255.255.255') for a host.
-    """
-    return space.wrap(socket.gethostbyname(name))
-gethostbyname.unwrap_spec = [ObjSpace, str]
-
-def gethostbyname_ex(space, name):
-    """gethostbyname_ex(host) -> (name, aliaslist, addresslist)
-
-    Return the true host name, a list of aliases, and a list of IP addresses,
-    for a host.  The host argument is a string giving a host name or IP number.
-    """
-    return space.wrap(socket.gethostbyname_ex(name))
-gethostbyname_ex.unwrap_spec = [ObjSpace, str]
-
-def gethostbyaddr(space, ip_num):
-    """gethostbyaddr(host) -> (name, aliaslist, addresslist)
-
-    Return the true host name, a list of aliases, and a list of IP addresses,
-    for a host.  The host argument is a string giving a host name or IP number.
-    """
-    return space.wrap(socket.gethostbyaddr(ip_num))
-gethostbyaddr.unwrap_spec = [ObjSpace, str]
-
-def getservbyname(space, name, w_proto=NoneNotWrapped):
-    """getservbyname(servicename[, protocolname]) -> integer
-
-    Return a port number from a service name and protocol name.
-    The optional protocol name, if given, should be 'tcp' or 'udp',
-    otherwise any protocol will match.
-    """
-    if w_proto is None:
-        return space.wrap(socket.getservbyname(name))
-    else:
-        return space.wrap(socket.getservbyname(name, space.str_w(w_proto)))
-getservbyname.unwrap_spec = [ObjSpace, str, W_Root]
-
-def getservbyport(space, port, w_proto=NoneNotWrapped):
-    """getservbyport(port[, protocolname]) -> string
-
-    Return the service name from a port number and protocol name.
-    The optional protocol name, if given, should be 'tcp' or 'udp',
-    otherwise any protocol will match.
-    """
-    if w_proto is None:
-        return space.wrap(socket.getservbyport(port))
-    else:
-        return space.wrap(socket.getservbyport(port, space.str_w(w_proto)))
-getservbyport.unwrap_spec = [ObjSpace, int, W_Root]
-
-def getprotobyname(space, name):
-    """getprotobyname(name) -> integer
-
-    Return the protocol number for the named protocol.  (Rarely used.)
-    """
-    return space.wrap(socket.getprotobyname(name))
-getprotobyname.unwrap_spec = [ObjSpace, str]
-
-def fromfd(space, fd, family, type, w_proto=NoneNotWrapped):
-    """fromfd(fd, family, type[, proto]) -> socket object
-
-    Create a socket object from the given file descriptor.
-    The remaining arguments are the same as for socket().
-    """
-    if w_proto is None:
-        return space.wrap(socket.fromfd(fd, family, type))
-    else:
-        return space.wrap(socket.fromfd(fd, family, type, space.int_w(w_proto)))
-fromfd.unwrap_spec = [ObjSpace, int, int, int, W_Root]
-
-#    fromfd socketpair
-#    ntohs ntohl htons htonl inet_aton inet_ntoa inet_pton inet_ntop
-#    getaddrinfo getnameinfo
-#    getdefaulttimeout setdefaulttimeout
+
+import socket
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
+
+def gethostname(space):
+    """gethostname() -> string
+
+    Return the current host name.
+    """
+    return space.wrap(socket.gethostname())
+gethostname.unwrap_spec = [ObjSpace]
+
+def gethostbyname(space, name):
+    """gethostbyname(host) -> address
+
+    Return the IP address (a string of the form '255.255.255.255') for a host.
+    """
+    return space.wrap(socket.gethostbyname(name))
+gethostbyname.unwrap_spec = [ObjSpace, str]
+
+def gethostbyname_ex(space, name):
+    """gethostbyname_ex(host) -> (name, aliaslist, addresslist)
+
+    Return the true host name, a list of aliases, and a list of IP addresses,
+    for a host.  The host argument is a string giving a host name or IP number.
+    """
+    return space.wrap(socket.gethostbyname_ex(name))
+gethostbyname_ex.unwrap_spec = [ObjSpace, str]
+
+def gethostbyaddr(space, ip_num):
+    """gethostbyaddr(host) -> (name, aliaslist, addresslist)
+
+    Return the true host name, a list of aliases, and a list of IP addresses,
+    for a host.  The host argument is a string giving a host name or IP number.
+    """
+    return space.wrap(socket.gethostbyaddr(ip_num))
+gethostbyaddr.unwrap_spec = [ObjSpace, str]
+
+def getservbyname(space, name, w_proto=NoneNotWrapped):
+    """getservbyname(servicename[, protocolname]) -> integer
+
+    Return a port number from a service name and protocol name.
+    The optional protocol name, if given, should be 'tcp' or 'udp',
+    otherwise any protocol will match.
+    """
+    if w_proto is None:
+        return space.wrap(socket.getservbyname(name))
+    else:
+        return space.wrap(socket.getservbyname(name, space.str_w(w_proto)))
+getservbyname.unwrap_spec = [ObjSpace, str, W_Root]
+
+def getservbyport(space, port, w_proto=NoneNotWrapped):
+    """getservbyport(port[, protocolname]) -> string
+
+    Return the service name from a port number and protocol name.
+    The optional protocol name, if given, should be 'tcp' or 'udp',
+    otherwise any protocol will match.
+    """
+    if w_proto is None:
+        return space.wrap(socket.getservbyport(port))
+    else:
+        return space.wrap(socket.getservbyport(port, space.str_w(w_proto)))
+getservbyport.unwrap_spec = [ObjSpace, int, W_Root]
+
+def getprotobyname(space, name):
+    """getprotobyname(name) -> integer
+
+    Return the protocol number for the named protocol.  (Rarely used.)
+    """
+    return space.wrap(socket.getprotobyname(name))
+getprotobyname.unwrap_spec = [ObjSpace, str]
+
+def fromfd(space, fd, family, type, w_proto=NoneNotWrapped):
+    """fromfd(fd, family, type[, proto]) -> socket object
+
+    Create a socket object from the given file descriptor.
+    The remaining arguments are the same as for socket().
+    """
+    if w_proto is None:
+        return space.wrap(socket.fromfd(fd, family, type))
+    else:
+        return space.wrap(socket.fromfd(fd, family, type, space.int_w(w_proto)))
+fromfd.unwrap_spec = [ObjSpace, int, int, int, W_Root]
+
+def socketpair(space, w_family=NoneNotWrapped, w_type=NoneNotWrapped, w_proto=NoneNotWrapped):
+    """socketpair([family[, type[, proto]]]) -> (socket object, socket object)
+
+    Create a pair of socket objects from the sockets returned by the platform
+    socketpair() function.
+    The arguments are the same as for socket() except the default family is
+    AF_UNIX if defined on the platform; otherwise, the default is AF_INET.
+    """
+    if w_family is None:
+        return space.wrap(socket.socketpair())
+    elif w_type is None:
+        return space.wrap(socket.socketpair(space.int_w(w_family)))
+    elif w_proto is None:
+        return space.wrap(socket.socketpair(space.int_w(w_family),
+                                            space.int_w(w_type)))
+    else:
+        return space.wrap(socket.socketpair(space.int_w(w_family),
+                                            space.int_w(w_type),
+                                            space.int_w(w_proto)))
+socketpair.unwrap_spec = [ObjSpace, W_Root, W_Root, W_Root]
+
+def ntohs(space, x):
+    """ntohs(integer) -> integer
+
+    Convert a 16-bit integer from network to host byte order.
+    """
+    return space.wrap(socket.ntohs(x))
+ntohs.unwrap_spec = [ObjSpace, int]
+
+def ntohl(space, x):
+    """ntohl(integer) -> integer
+
+    Convert a 32-bit integer from network to host byte order.
+    """
+    return space.wrap(socket.ntohl(x))
+ntohl.unwrap_spec = [ObjSpace, int]
+    
+def htons(space, x):
+    """htons(integer) -> integer
+
+    Convert a 16-bit integer from host to network byte order.
+    """
+    return space.wrap(socket.htons(x))
+htons.unwrap_spec = [ObjSpace, int]
+    
+def htonl(space, x):
+    """htonl(integer) -> integer
+
+    Convert a 32-bit integer from host to network byte order.
+    """
+    return space.wrap(socket.htonl(x))
+htonl.unwrap_spec = [ObjSpace, int]
+
+def inet_aton(space, ip):
+    """inet_aton(string) -> packed 32-bit IP representation
+
+    Convert an IP address in string format (123.45.67.89) to the 32-bit packed
+    binary format used in low-level network functions.
+    """
+    return space.wrap(socket.inet_aton(ip))
+inet_aton.unwrap_spec = [ObjSpace, str]
+
+def inet_ntoa(space, packed):
+    """inet_ntoa(packed_ip) -> ip_address_string
+
+    Convert an IP address from 32-bit packed binary format to string format
+    """
+    return space.wrap(socket.inet_ntoa(packed))
+inet_ntoa.unwrap_spec = [ObjSpace, str]
+
+def inet_pton(space, af, ip):
+    """inet_pton(af, ip) -> packed IP address string
+
+    Convert an IP address from string format to a packed string suitable
+    for use with low-level network functions.
+    """
+    return space.wrap(socket.inet_pton(af, ip))
+inet_pton.unwrap_spec = [ObjSpace, int, str]
+
+def inet_ntop(space, af, packed):
+    """inet_ntop(af, packed_ip) -> string formatted IP address
+
+    Convert a packed IP address of the given family to string format.
+    """
+    return space.wrap(socket.inet_ntop(af, packed))
+inet_ntop.unwrap_spec = [ObjSpace, int, str]
+
+
+# TODO: still to be wrapped in this module:
+#    getaddrinfo getnameinfo getdefaulttimeout setdefaulttimeout

Modified: pypy/dist/pypy/module/_socket/test/test_socket2.py
==============================================================================
--- pypy/dist/pypy/module/_socket/test/test_socket2.py	(original)
+++ pypy/dist/pypy/module/_socket/test/test_socket2.py	Fri Oct 14 17:13:08 2005
@@ -72,16 +72,14 @@
 
 def test_getprotobyname():
     name = "tcp"
-    num = space.appexec([w_socket, space.wrap(name)],
+    w_n = space.appexec([w_socket, space.wrap(name)],
                         "(_socket, name): return _socket.getprotobyname(name)")
-    assert space.unwrap(num) == socket.IPPROTO_TCP
-
-def test_has_ipv6():
-    res = space.appexec([w_socket], "(_socket): return _socket.has_ipv6")
-    assert space.unwrap(res) == socket.has_ipv6
+    assert space.unwrap(w_n) == socket.IPPROTO_TCP
 
 def test_fromfd():
     # XXX review
+    if not hasattr(socket, 'fromfd'):
+        py.test.skip("No socket.fromfd on this platform")
     orig_fd = path.open()
     fd = space.appexec([w_socket, space.wrap(orig_fd.fileno()),
             space.wrap(socket.AF_INET), space.wrap(socket.SOCK_STREAM),
@@ -96,3 +94,49 @@
                     return _socket.fromfd(fd, family, type)""")
 
     assert space.unwrap(fd).fileno()
+
+def test_ntohs():
+    w_n = space.appexec([w_socket, space.wrap(125)],
+                        "(_socket, x): return _socket.ntohs(x)")
+    assert space.unwrap(w_n) == socket.ntohs(125)
+
+def test_ntohl():
+    w_n = space.appexec([w_socket, space.wrap(125)],
+                        "(_socket, x): return _socket.ntohl(x)")
+    assert space.unwrap(w_n) == socket.ntohl(125)
+
+def test_htons():
+    w_n = space.appexec([w_socket, space.wrap(125)],
+                        "(_socket, x): return _socket.htons(x)")
+    assert space.unwrap(w_n) == socket.htons(125)
+
+def test_htonl():
+    w_n = space.appexec([w_socket, space.wrap(125)],
+                        "(_socket, x): return _socket.htonl(x)")
+    assert space.unwrap(w_n) == socket.htonl(125)
+
+def test_packed_ip():
+    ip = '123.45.67.89'
+    packed = socket.inet_aton(ip)
+    w_p = space.appexec([w_socket, space.wrap(ip)],
+                        "(_socket, ip): return _socket.inet_aton(ip)")
+    assert space.unwrap(w_p) == packed
+    w_ip = space.appexec([w_socket, space.wrap(packed)],
+                         "(_socket, p): return _socket.inet_ntoa(p)")
+    assert space.unwrap(w_ip) == ip
+
+def test_pton():
+    if not hasattr(socket, 'inet_pton'):
+        py.test.skip('No socket.(inet_pton|inet_ntop) on this platform')
+    w_p = space.appexec([w_socket, space.wrap(ip)],
+                        "(_socket, ip): return _socket.inet_pton(_socket.AF_INET, ip)")
+    assert space.unwrap(w_p) == packed
+    w_ip = space.appexec([w_socket, space.wrap(packed)],
+                         "(_socket, p): return _socket.inet_ntop(_socket.AF_INET, p)")
+    assert space.unwrap(w_ip) == ip
+    
+
+def test_has_ipv6():
+    res = space.appexec([w_socket], "(_socket): return _socket.has_ipv6")
+    assert space.unwrap(res) == socket.has_ipv6
+



More information about the Pypy-commit mailing list