[Python-Dev] Other SSL issues in the tracker have been marked
Bill Janssen
janssen at parc.com
Sun Aug 26 20:51:14 CEST 2007
> But I think this exposes a more generic bug in test_ssl.py, which is
> that the server thread doesn't die when one of these failures occurs.
> It probably should. I'll make a patch -- but I don't have a system
> that this fails on, how will I test it?
Here's a patch which makes test_ssl a better player in the buildbots
environment. I deep-ended on "try-except-else" clauses.
Should I post this as an issue to the Tracker?
Bill
Index: Lib/ssl.py
===================================================================
--- Lib/ssl.py (revision 57506)
+++ Lib/ssl.py (working copy)
@@ -100,12 +100,13 @@
# see if it's connected
try:
socket.getpeername(self)
- # yes
+ except:
+ # no, no connection yet
+ self._sslobj = None
+ else:
+ # yes, create the SSL object
self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
- except:
- # no
- self._sslobj = None
self.keyfile = keyfile
self.certfile = certfile
self.cert_reqs = cert_reqs
Index: Lib/test/test_ssl.py
===================================================================
--- Lib/test/test_ssl.py (revision 57506)
+++ Lib/test/test_ssl.py (working copy)
@@ -91,38 +91,66 @@
def testTLSecho (self):
s1 = socket.socket()
- s1.connect(('127.0.0.1', 10024))
- c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
- indata = "FOO\n"
- c1.write(indata)
- outdata = c1.read()
- if outdata != indata.lower():
- sys.stderr.write("bad data <<%s>> received\n" % data)
- c1.close()
+ try:
+ s1.connect(('127.0.0.1', 10024))
+ except:
+ sys.stdout.write("connection failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't connect to test server")
+ else:
+ try:
+ c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
+ except:
+ sys.stdout.write("SSL handshake failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ else:
+ if not c1:
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ indata = "FOO\n"
+ c1.write(indata)
+ outdata = c1.read()
+ if outdata != indata.lower():
+ raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
+ c1.close()
def testReadCert(self):
s2 = socket.socket()
- s2.connect(('127.0.0.1', 10024))
- c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
- cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
- cert = c2.getpeercert()
- if not cert:
- raise test_support.TestFailed("Can't get peer certificate.")
- if not cert.has_key('subject'):
- raise test_support.TestFailed(
- "No subject field in certificate: %s." %
- pprint.pformat(cert))
- if not (cert['subject'].has_key('organizationName')):
- raise test_support.TestFailed(
- "No 'organizationName' field in certificate subject: %s." %
- pprint.pformat(cert))
- if (cert['subject']['organizationName'] !=
- "Python Software Foundation"):
- raise test_support.TestFailed(
- "Invalid 'organizationName' field in certificate subject; "
- "should be 'Python Software Foundation'.");
- c2.close()
+ try:
+ s2.connect(('127.0.0.1', 10024))
+ except:
+ sys.stdout.write("connection failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't connect to test server")
+ else:
+ try:
+ c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
+ cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
+ except:
+ sys.stdout.write("SSL handshake failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ else:
+ if not c2:
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ cert = c2.getpeercert()
+ if not cert:
+ raise test_support.TestFailed("Can't get peer certificate.")
+ if not cert.has_key('subject'):
+ raise test_support.TestFailed(
+ "No subject field in certificate: %s." %
+ pprint.pformat(cert))
+ if not (cert['subject'].has_key('organizationName')):
+ raise test_support.TestFailed(
+ "No 'organizationName' field in certificate subject: %s." %
+ pprint.pformat(cert))
+ if (cert['subject']['organizationName'] !=
+ "Python Software Foundation"):
+ raise test_support.TestFailed(
+ "Invalid 'organizationName' field in certificate subject; "
+ "should be 'Python Software Foundation'.");
+ c2.close()
class threadedEchoServer(threading.Thread):
@@ -138,10 +166,22 @@
def run (self):
self.running = True
- sslconn = ssl.sslsocket(self.sock, server_side=True,
- certfile=self.server.certificate,
- ssl_version=self.server.protocol,
- cert_reqs=self.server.certreqs)
+ try:
+ sslconn = ssl.sslsocket(self.sock, server_side=True,
+ certfile=self.server.certificate,
+ ssl_version=self.server.protocol,
+ cert_reqs=self.server.certreqs)
+ except:
+ # here, we want to stop the server, because this shouldn't
+ # happen in the context of our test case
+ sys.stdout.write("Test server failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ self.running = False
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
+ return
+
while self.running:
try:
msg = sslconn.read()
@@ -154,15 +194,18 @@
self.server.stop()
self.running = False
else:
- # print "server:", msg.strip().lower()
+ sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
sslconn.write(msg.lower())
except ssl.sslerror:
- sys.stderr.write(string.join(
+ sys.stdout.write("Test server failure:\n" + string.join(
traceback.format_exception(*sys.exc_info())))
sslconn.close()
self.running = False
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
except:
- sys.stderr.write(string.join(
+ sys.stdout.write(string.join(
traceback.format_exception(*sys.exc_info())))
def __init__(self, port, certificate, ssl_version=None,
@@ -192,21 +235,21 @@
while self.active:
try:
newconn, connaddr = self.sock.accept()
- # sys.stderr.write('new connection from ' + str(connaddr))
+ sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
handler = self.connectionHandler(self, newconn)
handler.start()
except socket.timeout:
pass
except KeyboardInterrupt:
- self.active = False
+ self.stop()
except:
- sys.stderr.write(string.join(
+ sys.stdout.write("Test server failure:\n" + string.join(
traceback.format_exception(*sys.exc_info())))
def stop (self):
self.active = False
+ self.sock.close()
-
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server
More information about the Python-Dev
mailing list