[pypy-svn] r30690 - in pypy/dist/pypy/module/_ssl: . test
rhymes at codespeak.net
rhymes at codespeak.net
Fri Jul 28 16:08:24 CEST 2006
Author: rhymes
Date: Fri Jul 28 16:08:22 2006
New Revision: 30690
Modified:
pypy/dist/pypy/module/_ssl/interp_ssl.py
pypy/dist/pypy/module/_ssl/test/test_ssl.py
Log:
Exposed server() and issuer() in the SSLObject. Also refactored _ssl_seterror for the ext compiler. But the module still does not compile anyway...
Modified: pypy/dist/pypy/module/_ssl/interp_ssl.py
==============================================================================
--- pypy/dist/pypy/module/_ssl/interp_ssl.py (original)
+++ pypy/dist/pypy/module/_ssl/interp_ssl.py Fri Jul 28 16:08:22 2006
@@ -3,7 +3,9 @@
import pypy.rpython.rctypes.implementation # this defines rctypes magic
from pypy.rpython.rctypes.aerrno import geterrno
from pypy.interpreter.error import OperationError
-from pypy.interpreter.baseobjspace import W_Root, ObjSpace
+from pypy.interpreter.baseobjspace import W_Root, ObjSpace, Wrappable
+from pypy.interpreter.typedef import TypeDef
+from pypy.interpreter.gateway import interp2app
from ctypes import *
import ctypes.util
import sys
@@ -94,6 +96,8 @@
pollfd = cConfig.pollfd
nfds_t = cConfig.nfds_t
+arr_x509 = c_char * X509_NAME_MAXLEN
+
constants = {}
constants["SSL_ERROR_ZERO_RETURN"] = PY_SSL_ERROR_ZERO_RETURN
constants["SSL_ERROR_WANT_READ"] = PY_SSL_ERROR_WANT_READ
@@ -153,8 +157,8 @@
libssl.X509_get_subject_name.restype = POINTER(X509_NAME)
libssl.X509_get_issuer_name.argtypes = [POINTER(X509)]
libssl.X509_get_issuer_name.restype = POINTER(X509_NAME)
-libssl.X509_NAME_oneline.argtypes = [POINTER(X509_NAME), c_char_p, c_int]
-libssl.X509_NAME_oneline.restype = c_char_p
+libssl.X509_NAME_oneline.argtypes = [POINTER(X509_NAME), arr_x509, c_int]
+libssl.X509_NAME_oneline.restype = arr_x509
def _init_ssl():
libssl.SSL_load_error_strings()
@@ -201,18 +205,34 @@
return space.wrap(bytes)
RAND_egd.unwrap_spec = [ObjSpace, str]
-class SSLObject(object):
- def __init__(self):
- self.socket = None
+class SSLObject(Wrappable):
+ def __init__(self, space):
+ self.space = space
+ self.w_socket = None
self.ctx = POINTER(SSL_CTX)()
self.ssl = POINTER(SSL)()
self.server_cert = POINTER(X509)()
- arr = c_char * X509_NAME_MAXLEN
- self.server = arr()
- self.issuer = arr()
+ self._server = arr_x509()
+ self._issuer = arr_x509()
+
+ def server(self):
+ return self.space.wrap(self._server.value)
+ server.unwrap_spec = ['self']
+
+ def issuer(self):
+ return self.space.wrap(self._issuer.value)
+ issuer.unwrap_spec = ['self']
+
+SSLObject.typedef = TypeDef("SSLObject",
+ server = interp2app(SSLObject.server,
+ unwrap_spec=SSLObject.server.unwrap_spec),
+ issuer = interp2app(SSLObject.issuer,
+ unwrap_spec=SSLObject.issuer.unwrap_spec)
+)
+
def new_sslobject(space, w_sock, w_key_file, w_cert_file):
- ss = SSLObject()
+ ss = SSLObject(space)
sock_fd = space.int_w(space.call_method(w_sock, "fileno"))
w_timeout = space.call_method(w_sock, "gettimeout")
@@ -297,7 +317,7 @@
break
if ret < 0:
- errstr, p = _ssl_seterror(ss, ret)
+ errstr, p = _ssl_seterror(space, ss, ret)
raise OperationError(space.w_Exception,
space.wrap("%s: %d" % (errstr, p)))
@@ -306,11 +326,11 @@
ss.server_cert = libssl.SSL_get_peer_certificate(ss.ssl)
if ss.server_cert:
libssl.X509_NAME_oneline(libssl.X509_get_subject_name(ss.server_cert),
- ss.server, X509_NAME_MAXLEN)
+ ss._server, X509_NAME_MAXLEN)
libssl.X509_NAME_oneline(libssl.X509_get_issuer_name(ss.server_cert),
- ss.issuer, X509_NAME_MAXLEN)
+ ss._issuer, X509_NAME_MAXLEN)
- ss.socket = w_sock
+ ss.w_socket = w_sock
return ss
new_sslobject.unwrap_spec = [ObjSpace, W_Root, str, str]
@@ -374,69 +394,55 @@
else:
return SOCKET_OPERATION_OK
-def _ssl_seterror(ss, ret):
+def _ssl_seterror(space, ss, ret):
assert ret <= 0
err = libssl.SSL_get_error(ss.ssl, ret)
-
errstr = ""
- p = 0
+ errval = 0
- err_dict = {
- SSL_ERROR_ZERO_RETURN: {
- 'errstr': "TLS/SSL connection has been closed",
- 'p': PY_SSL_ERROR_ZERO_RETURN
- },
- SSL_ERROR_WANT_READ: {
- 'errstr': "The operation did not complete (read)",
- 'p': PY_SSL_ERROR_WANT_READ
- },
- SSL_ERROR_WANT_WRITE: {
- 'errstr': "The operation did not complete (write)",
- 'p': PY_SSL_ERROR_WANT_WRITE
- },
- SSL_ERROR_WANT_X509_LOOKUP: {
- 'errstr': "The operation did not complete (X509 lookup)",
- 'p': PY_SSL_ERROR_WANT_X509_LOOKUP
- },
- SSL_ERROR_WANT_CONNECT: {
- 'errstr': "The operation did not complete (connect)",
- 'p': PY_SSL_ERROR_WANT_CONNECT
- }
- }
-
- err_info = err_dict.get(err)
- if err_info:
- err_str = err_info['errstr']
- p = err_info['p']
- else:
- if err == SSL_ERROR_SYSCALL:
- e = libssl.ERR_get_error()
- if e == 0:
- if ret == 0 or not ss.socket:
- errstr = "EOF occurred in violation of protocol"
- p = PY_SSL_ERROR_EOF
- elif ret == -1:
- # the underlying BIO reported an I/0 error
- return # sock.errorhandler()?
- else:
- errstr = "Some I/O error occurred"
- p = PY_SSL_ERROR_SYSCALL
+ if err == SSL_ERROR_ZERO_RETURN:
+ errstr = "TLS/SSL connection has been closed"
+ errval = PY_SSL_ERROR_ZERO_RETURN
+ elif err == SSL_ERROR_WANT_READ:
+ errstr = "The operation did not complete (read)"
+ errval = PY_SSL_ERROR_WANT_READ
+ elif err == SSL_ERROR_WANT_WRITE:
+ errstr = "The operation did not complete (write)"
+ errval = PY_SSL_ERROR_WANT_WRITE
+ elif err == SSL_ERROR_WANT_X509_LOOKUP:
+ errstr = "The operation did not complete (X509 lookup)"
+ errval = PY_SSL_ERROR_WANT_X509_LOOKUP
+ elif err == SSL_ERROR_WANT_CONNECT:
+ errstr = "The operation did not complete (connect)"
+ errval = PY_SSL_ERROR_WANT_CONNECT
+ elif err == SSL_ERROR_SYSCALL:
+ e = libssl.ERR_get_error()
+ if e == 0:
+ if ret == 0 or space.is_w(ss.w_socket, space.w_None):
+ errstr = "EOF occurred in violation of protocol"
+ errval = PY_SSL_ERROR_EOF
+ elif ret == -1:
+ # the underlying BIO reported an I/O error
+ return errstr, errval # sock.errorhandler()?
else:
- errstr = libssl.ERR_error_string(e, None)
- p = PY_SSL_ERROR_SYSCALL
- elif err == SSL_ERROR_SSL:
- e = libssl.ERR_get_error()
- p = PY_SSL_ERROR_SSL
- if e != 0:
- errstr = libssl.ERR_error_string(e, None)
- else:
- errstr = "A failure in the SSL library occurred"
+ errstr = "Some I/O error occurred"
+ errval = PY_SSL_ERROR_SYSCALL
else:
- p = PY_SSL_ERROR_INVALID_ERROR_CODE
- errstr = "Invalid error code"
-
- return errstr, p
+ errstr = libssl.ERR_error_string(e, None)
+ p = PY_SSL_ERROR_SYSCALL
+ elif err == SSL_ERROR_SSL:
+ e = libssl.ERR_get_error()
+ errval = PY_SSL_ERROR_SSL
+ if e != 0:
+ errstr = libssl.ERR_error_string(e, None)
+ else:
+ errstr = "A failure in the SSL library occurred"
+ else:
+ errstr = "Invalid error code"
+ errval = PY_SSL_ERROR_INVALID_ERROR_CODE
+
+ return errstr, errval
def ssl(space, w_socket, w_key_file=None, w_cert_file=None):
Modified: pypy/dist/pypy/module/_ssl/test/test_ssl.py
==============================================================================
--- pypy/dist/pypy/module/_ssl/test/test_ssl.py (original)
+++ pypy/dist/pypy/module/_ssl/test/test_ssl.py Fri Jul 28 16:08:22 2006
@@ -55,4 +55,20 @@
ss = socket.ssl(s)
s.close()
-
\ No newline at end of file
+ def test_server_method(self):
+ import socket
+ ADDR = "connect.sigen-ca.si", 443
+ s = socket.socket()
+ s.connect(ADDR)
+ ss = socket.ssl(s)
+ assert isinstance(ss.server(), str)
+ s.close()
+
+ def test_issuer_method(self):
+ import socket
+ ADDR = "connect.sigen-ca.si", 443
+ s = socket.socket()
+ s.connect(ADDR)
+ ss = socket.ssl(s)
+ assert isinstance(ss.issuer(), str)
+ s.close()
More information about the Pypy-commit
mailing list