[Python-Dev] Other SSL issues in the tracker have been marked
Guido van Rossum
guido at python.org
Sun Aug 26 21:35:26 CEST 2007
No need, I've submitted this for you. Fingers crossed.
On 8/26/07, Bill Janssen <janssen at parc.com> wrote:
> > But I think this exposes a more generic bug in test_ssl.py, which is
> > that the server thread doesn't die when one of these failures occurs.
> > It probably should. I'll make a patch -- but I don't have a system
> > that this fails on, how will I test it?
>
> Here's a patch which makes test_ssl a better player in the buildbots
> environment. I deep-ended on "try-except-else" clauses.
>
> Should I post this as an issue to the Tracker?
>
> Bill
>
> Index: Lib/ssl.py
> ===================================================================
> --- Lib/ssl.py (revision 57506)
> +++ Lib/ssl.py (working copy)
> @@ -100,12 +100,13 @@
> # see if it's connected
> try:
> socket.getpeername(self)
> - # yes
> + except:
> + # no, no connection yet
> + self._sslobj = None
> + else:
> + # yes, create the SSL object
> self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
> cert_reqs, ssl_version, ca_certs)
> - except:
> - # no
> - self._sslobj = None
> self.keyfile = keyfile
> self.certfile = certfile
> self.cert_reqs = cert_reqs
> Index: Lib/test/test_ssl.py
> ===================================================================
> --- Lib/test/test_ssl.py (revision 57506)
> +++ Lib/test/test_ssl.py (working copy)
> @@ -91,38 +91,66 @@
> def testTLSecho (self):
>
> s1 = socket.socket()
> - s1.connect(('127.0.0.1', 10024))
> - c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
> - indata = "FOO\n"
> - c1.write(indata)
> - outdata = c1.read()
> - if outdata != indata.lower():
> - sys.stderr.write("bad data <<%s>> received\n" % data)
> - c1.close()
> + try:
> + s1.connect(('127.0.0.1', 10024))
> + except:
> + sys.stdout.write("connection failure:\n" + string.join(
> + traceback.format_exception(*sys.exc_info())))
> + raise test_support.TestFailed("Can't connect to test server")
> + else:
> + try:
> + c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
> + except:
> + sys.stdout.write("SSL handshake failure:\n" + string.join(
> + traceback.format_exception(*sys.exc_info())))
> + raise test_support.TestFailed("Can't SSL-handshake with test server")
> + else:
> + if not c1:
> + raise test_support.TestFailed("Can't SSL-handshake with test server")
> + indata = "FOO\n"
> + c1.write(indata)
> + outdata = c1.read()
> + if outdata != indata.lower():
> + raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
> + c1.close()
>
> def testReadCert(self):
>
> s2 = socket.socket()
> - s2.connect(('127.0.0.1', 10024))
> - c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
> - cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
> - cert = c2.getpeercert()
> - if not cert:
> - raise test_support.TestFailed("Can't get peer certificate.")
> - if not cert.has_key('subject'):
> - raise test_support.TestFailed(
> - "No subject field in certificate: %s." %
> - pprint.pformat(cert))
> - if not (cert['subject'].has_key('organizationName')):
> - raise test_support.TestFailed(
> - "No 'organizationName' field in certificate subject: %s." %
> - pprint.pformat(cert))
> - if (cert['subject']['organizationName'] !=
> - "Python Software Foundation"):
> - raise test_support.TestFailed(
> - "Invalid 'organizationName' field in certificate subject; "
> - "should be 'Python Software Foundation'.");
> - c2.close()
> + try:
> + s2.connect(('127.0.0.1', 10024))
> + except:
> + sys.stdout.write("connection failure:\n" + string.join(
> + traceback.format_exception(*sys.exc_info())))
> + raise test_support.TestFailed("Can't connect to test server")
> + else:
> + try:
> + c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
> + cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
> + except:
> + sys.stdout.write("SSL handshake failure:\n" + string.join(
> + traceback.format_exception(*sys.exc_info())))
> + raise test_support.TestFailed("Can't SSL-handshake with test server")
> + else:
> + if not c2:
> + raise test_support.TestFailed("Can't SSL-handshake with test server")
> + cert = c2.getpeercert()
> + if not cert:
> + raise test_support.TestFailed("Can't get peer certificate.")
> + if not cert.has_key('subject'):
> + raise test_support.TestFailed(
> + "No subject field in certificate: %s." %
> + pprint.pformat(cert))
> + if not (cert['subject'].has_key('organizationName')):
> + raise test_support.TestFailed(
> + "No 'organizationName' field in certificate subject: %s." %
> + pprint.pformat(cert))
> + if (cert['subject']['organizationName'] !=
> + "Python Software Foundation"):
> + raise test_support.TestFailed(
> + "Invalid 'organizationName' field in certificate subject; "
> + "should be 'Python Software Foundation'.");
> + c2.close()
>
>
> class threadedEchoServer(threading.Thread):
> @@ -138,10 +166,22 @@
>
> def run (self):
> self.running = True
> - sslconn = ssl.sslsocket(self.sock, server_side=True,
> - certfile=self.server.certificate,
> - ssl_version=self.server.protocol,
> - cert_reqs=self.server.certreqs)
> + try:
> + sslconn = ssl.sslsocket(self.sock, server_side=True,
> + certfile=self.server.certificate,
> + ssl_version=self.server.protocol,
> + cert_reqs=self.server.certreqs)
> + except:
> + # here, we want to stop the server, because this shouldn't
> + # happen in the context of our test case
> + sys.stdout.write("Test server failure:\n" + string.join(
> + traceback.format_exception(*sys.exc_info())))
> + self.running = False
> + # normally, we'd just stop here, but for the test
> + # harness, we want to stop the server
> + self.server.stop()
> + return
> +
> while self.running:
> try:
> msg = sslconn.read()
> @@ -154,15 +194,18 @@
> self.server.stop()
> self.running = False
> else:
> - # print "server:", msg.strip().lower()
> + sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
> sslconn.write(msg.lower())
> except ssl.sslerror:
> - sys.stderr.write(string.join(
> + sys.stdout.write("Test server failure:\n" + string.join(
> traceback.format_exception(*sys.exc_info())))
> sslconn.close()
> self.running = False
> + # normally, we'd just stop here, but for the test
> + # harness, we want to stop the server
> + self.server.stop()
> except:
> - sys.stderr.write(string.join(
> + sys.stdout.write(string.join(
> traceback.format_exception(*sys.exc_info())))
>
> def __init__(self, port, certificate, ssl_version=None,
> @@ -192,21 +235,21 @@
> while self.active:
> try:
> newconn, connaddr = self.sock.accept()
> - # sys.stderr.write('new connection from ' + str(connaddr))
> + sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
> handler = self.connectionHandler(self, newconn)
> handler.start()
> except socket.timeout:
> pass
> except KeyboardInterrupt:
> - self.active = False
> + self.stop()
> except:
> - sys.stderr.write(string.join(
> + sys.stdout.write("Test server failure:\n" + string.join(
> traceback.format_exception(*sys.exc_info())))
>
> def stop (self):
> self.active = False
> + self.sock.close()
>
> -
> CERTFILE_CONFIG_TEMPLATE = """
> # create RSA certs - Server
>
> _______________________________________________
> Python-Dev mailing list
> Python-Dev at python.org
> http://mail.python.org/mailman/listinfo/python-dev
> Unsubscribe: http://mail.python.org/mailman/options/python-dev/guido%40python.org
>
--
--Guido van Rossum (home page: http://www.python.org/~guido/)
More information about the Python-Dev
mailing list