[Python-Dev] Other SSL issues in the tracker have been marked

Guido van Rossum guido at python.org
Sun Aug 26 21:35:26 CEST 2007


No need, I've submitted this for you. Fingers crossed.

On 8/26/07, Bill Janssen <janssen at parc.com> wrote:
> > But I think this exposes a more generic bug in test_ssl.py, which is
> > that the server thread doesn't die when one of these failures occurs.
> > It probably should.  I'll make a patch -- but I don't have a system
> > that this fails on, so how will I test it?
>
> Here's a patch which makes test_ssl a better player in the buildbot
> environment.  I went off the deep end with "try-except-else" clauses.
>
> Should I post this as an issue to the Tracker?
>
> Bill
>
> Index: Lib/ssl.py
> ===================================================================
> --- Lib/ssl.py  (revision 57506)
> +++ Lib/ssl.py  (working copy)
> @@ -100,12 +100,13 @@
>              # see if it's connected
>              try:
>                  socket.getpeername(self)
> -                # yes
> +            except:
> +                # no, no connection yet
> +                self._sslobj = None
> +            else:
> +                # yes, create the SSL object
>                  self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
>                                              cert_reqs, ssl_version, ca_certs)
> -            except:
> -                # no
> -                self._sslobj = None
>          self.keyfile = keyfile
>          self.certfile = certfile
>          self.cert_reqs = cert_reqs
> Index: Lib/test/test_ssl.py
> ===================================================================
> --- Lib/test/test_ssl.py        (revision 57506)
> +++ Lib/test/test_ssl.py        (working copy)
> @@ -91,38 +91,66 @@
>      def testTLSecho (self):
>
>          s1 = socket.socket()
> -        s1.connect(('127.0.0.1', 10024))
> -        c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
> -        indata = "FOO\n"
> -        c1.write(indata)
> -        outdata = c1.read()
> -        if outdata != indata.lower():
> -            sys.stderr.write("bad data <<%s>> received\n" % data)
> -        c1.close()
> +        try:
> +            s1.connect(('127.0.0.1', 10024))
> +        except:
> +            sys.stdout.write("connection failure:\n" + string.join(
> +                traceback.format_exception(*sys.exc_info())))
> +            raise test_support.TestFailed("Can't connect to test server")
> +        else:
> +            try:
> +                c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
> +            except:
> +                sys.stdout.write("SSL handshake failure:\n" + string.join(
> +                    traceback.format_exception(*sys.exc_info())))
> +                raise test_support.TestFailed("Can't SSL-handshake with test server")
> +            else:
> +                if not c1:
> +                    raise test_support.TestFailed("Can't SSL-handshake with test server")
> +                indata = "FOO\n"
> +                c1.write(indata)
> +                outdata = c1.read()
> +                if outdata != indata.lower():
> +                    raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
> +                c1.close()
>
>      def testReadCert(self):
>
>          s2 = socket.socket()
> -        s2.connect(('127.0.0.1', 10024))
> -        c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
> -                           cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
> -        cert = c2.getpeercert()
> -        if not cert:
> -            raise test_support.TestFailed("Can't get peer certificate.")
> -        if not cert.has_key('subject'):
> -            raise test_support.TestFailed(
> -                "No subject field in certificate: %s." %
> -                pprint.pformat(cert))
> -        if not (cert['subject'].has_key('organizationName')):
> -            raise test_support.TestFailed(
> -                "No 'organizationName' field in certificate subject: %s." %
> -                pprint.pformat(cert))
> -        if (cert['subject']['organizationName'] !=
> -              "Python Software Foundation"):
> -            raise test_support.TestFailed(
> -                "Invalid 'organizationName' field in certificate subject; "
> -                "should be 'Python Software Foundation'.");
> -        c2.close()
> +        try:
> +            s2.connect(('127.0.0.1', 10024))
> +        except:
> +            sys.stdout.write("connection failure:\n" + string.join(
> +                traceback.format_exception(*sys.exc_info())))
> +            raise test_support.TestFailed("Can't connect to test server")
> +        else:
> +            try:
> +                c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
> +                                   cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
> +            except:
> +                sys.stdout.write("SSL handshake failure:\n" + string.join(
> +                    traceback.format_exception(*sys.exc_info())))
> +                raise test_support.TestFailed("Can't SSL-handshake with test server")
> +            else:
> +                if not c2:
> +                    raise test_support.TestFailed("Can't SSL-handshake with test server")
> +                cert = c2.getpeercert()
> +                if not cert:
> +                    raise test_support.TestFailed("Can't get peer certificate.")
> +                if not cert.has_key('subject'):
> +                    raise test_support.TestFailed(
> +                        "No subject field in certificate: %s." %
> +                        pprint.pformat(cert))
> +                if not (cert['subject'].has_key('organizationName')):
> +                    raise test_support.TestFailed(
> +                        "No 'organizationName' field in certificate subject: %s." %
> +                        pprint.pformat(cert))
> +                if (cert['subject']['organizationName'] !=
> +                      "Python Software Foundation"):
> +                    raise test_support.TestFailed(
> +                        "Invalid 'organizationName' field in certificate subject; "
> +                        "should be 'Python Software Foundation'.");
> +                c2.close()
>
>
>  class threadedEchoServer(threading.Thread):
> @@ -138,10 +166,22 @@
>
>          def run (self):
>              self.running = True
> -            sslconn = ssl.sslsocket(self.sock, server_side=True,
> -                                    certfile=self.server.certificate,
> -                                    ssl_version=self.server.protocol,
> -                                    cert_reqs=self.server.certreqs)
> +            try:
> +                sslconn = ssl.sslsocket(self.sock, server_side=True,
> +                                        certfile=self.server.certificate,
> +                                        ssl_version=self.server.protocol,
> +                                        cert_reqs=self.server.certreqs)
> +            except:
> +                # here, we want to stop the server, because this shouldn't
> +                # happen in the context of our test case
> +                sys.stdout.write("Test server failure:\n" + string.join(
> +                    traceback.format_exception(*sys.exc_info())))
> +                self.running = False
> +                # normally, we'd just stop here, but for the test
> +                # harness, we want to stop the server
> +                self.server.stop()
> +                return
> +
>              while self.running:
>                  try:
>                      msg = sslconn.read()
> @@ -154,15 +194,18 @@
>                          self.server.stop()
>                          self.running = False
>                      else:
> -                        # print "server:", msg.strip().lower()
> +                        sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
>                          sslconn.write(msg.lower())
>                  except ssl.sslerror:
> -                    sys.stderr.write(string.join(
> +                    sys.stdout.write("Test server failure:\n" + string.join(
>                          traceback.format_exception(*sys.exc_info())))
>                      sslconn.close()
>                      self.running = False
> +                    # normally, we'd just stop here, but for the test
> +                    # harness, we want to stop the server
> +                    self.server.stop()
>                  except:
> -                    sys.stderr.write(string.join(
> +                    sys.stdout.write(string.join(
>                          traceback.format_exception(*sys.exc_info())))
>
>      def __init__(self, port, certificate, ssl_version=None,
> @@ -192,21 +235,21 @@
>          while self.active:
>              try:
>                  newconn, connaddr = self.sock.accept()
> -                # sys.stderr.write('new connection from ' + str(connaddr))
> +                sys.stdout.write('\nserver:  new connection from ' + str(connaddr) + '\n')
>                  handler = self.connectionHandler(self, newconn)
>                  handler.start()
>              except socket.timeout:
>                  pass
>              except KeyboardInterrupt:
> -                self.active = False
> +                self.stop()
>              except:
> -                sys.stderr.write(string.join(
> +                sys.stdout.write("Test server failure:\n" + string.join(
>                      traceback.format_exception(*sys.exc_info())))
>
>      def stop (self):
>          self.active = False
> +        self.sock.close()
>
> -
>  CERTFILE_CONFIG_TEMPLATE = """
>  # create RSA certs - Server
>
> _______________________________________________
> Python-Dev mailing list
> Python-Dev at python.org
> http://mail.python.org/mailman/listinfo/python-dev
> Unsubscribe: http://mail.python.org/mailman/options/python-dev/guido%40python.org
>


-- 
--Guido van Rossum (home page: http://www.python.org/~guido/)


More information about the Python-Dev mailing list