[Python-checkins] r80529 - python/trunk/Lib/test/test_ssl.py

antoine.pitrou python-checkins at python.org
Tue Apr 27 12:32:58 CEST 2010


Author: antoine.pitrou
Date: Tue Apr 27 12:32:58 2010
New Revision: 80529

Log:
Qualify or remove or bare excepts.  Simplify exception handling in places.
Remove uses of test_support.TestFailed.



Modified:
   python/trunk/Lib/test/test_ssl.py

Modified: python/trunk/Lib/test/test_ssl.py
==============================================================================
--- python/trunk/Lib/test/test_ssl.py	(original)
+++ python/trunk/Lib/test/test_ssl.py	Tue Apr 27 12:32:58 2010
@@ -62,7 +62,7 @@
         s.connect(("svn.python.org", 443))
         c = s.getpeercert()
         if c:
-            raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+            self.fail("Peer cert %s shouldn't be here!")
         s.close()
 
         # this should fail because we have no verification certs
@@ -112,8 +112,7 @@
         d1 = ssl.PEM_cert_to_DER_cert(pem)
         p2 = ssl.DER_cert_to_PEM_cert(d1)
         d2 = ssl.PEM_cert_to_DER_cert(p2)
-        if (d1 != d2):
-            raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
+        self.assertEqual(d1, d2)
 
     def test_openssl_version(self):
         n = ssl.OPENSSL_VERSION_NUMBER
@@ -178,7 +177,7 @@
         s.connect(("svn.python.org", 443))
         c = s.getpeercert()
         if c:
-            raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+            self.fail("Peer cert %s shouldn't be here!")
         s.close()
 
         # this should fail because we have no verification certs
@@ -197,8 +196,6 @@
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
         try:
             s.connect(("svn.python.org", 443))
-        except ssl.SSLError, x:
-            raise test_support.TestFailed("Unexpected exception %s" % x)
         finally:
             s.close()
 
@@ -249,7 +246,7 @@
 
         pem = ssl.get_server_certificate(("svn.python.org", 443))
         if not pem:
-            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+            self.fail("No server certificate on svn.python.org:443!")
 
         try:
             pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
@@ -257,11 +254,11 @@
             #should fail
             pass
         else:
-            raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
+            self.fail("Got server certificate %s for svn.python.org!" % pem)
 
         pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
         if not pem:
-            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+            self.fail("No server certificate on svn.python.org:443!")
         if test_support.verbose:
             sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
 
@@ -334,20 +331,17 @@
                                                    ca_certs=self.server.cacerts,
                                                    cert_reqs=self.server.certreqs,
                                                    ciphers=self.server.ciphers)
-                except:
+                except ssl.SSLError:
+                    # XXX Various errors can have happened here, for example
+                    # a mismatching protocol version, an invalid certificate,
+                    # or a low-level bug. This should be made more discriminating.
                     if self.server.chatty:
                         handle_error("\n server:  bad connection attempt from " +
                                      str(self.sock.getpeername()) + ":\n")
                     self.close()
-                    if not self.server.expect_bad_connects:
-                        # here, we want to stop the server, because this shouldn't
-                        # happen in the context of our test case
-                        self.running = False
-                        # normally, we'd just stop here, but for the test
-                        # harness, we want to stop the server
-                        self.server.stop()
+                    self.running = False
+                    self.server.stop()
                     return False
-
                 else:
                     return True
 
@@ -418,11 +412,9 @@
                         # normally, we'd just stop here, but for the test
                         # harness, we want to stop the server
                         self.server.stop()
-                    except:
-                        handle_error('')
 
         def __init__(self, certificate, ssl_version=None,
-                     certreqs=None, cacerts=None, expect_bad_connects=False,
+                     certreqs=None, cacerts=None,
                      chatty=True, connectionchatty=False, starttls_server=False,
                      wrap_accepting_socket=False, ciphers=None):
 
@@ -435,7 +427,6 @@
             self.certreqs = certreqs
             self.cacerts = cacerts
             self.ciphers = ciphers
-            self.expect_bad_connects = expect_bad_connects
             self.chatty = chatty
             self.connectionchatty = connectionchatty
             self.starttls_server = starttls_server
@@ -478,9 +469,6 @@
                     pass
                 except KeyboardInterrupt:
                     self.stop()
-                except:
-                    if self.chatty:
-                        handle_error("Test server failure:\n")
             self.sock.close()
 
         def stop (self):
@@ -526,7 +514,8 @@
                         self._do_ssl_handshake()
                     else:
                         data = self.recv(1024)
-                        self.send(data.lower())
+                        if data and data.strip() != 'over':
+                            self.send(data.lower())
 
                 def handle_close(self):
                     self.close()
@@ -572,10 +561,7 @@
             if self.flag:
                 self.flag.set()
             while self.active:
-                try:
-                    asyncore.loop(1)
-                except:
-                    pass
+                asyncore.loop(0.05)
 
         def stop (self):
             self.active = False
@@ -698,12 +684,8 @@
             except ssl.SSLError, x:
                 if test_support.verbose:
                     sys.stdout.write("\nSSLError is %s\n" % x[1])
-            except socket.error, x:
-                if test_support.verbose:
-                    sys.stdout.write("\nsocket.error is %s\n" % x[1])
             else:
-                raise test_support.TestFailed(
-                    "Use of invalid cert should have failed!")
+                self.fail("Use of invalid cert should have failed!")
         finally:
             server.stop()
             server.join()
@@ -729,39 +711,33 @@
         if client_protocol is None:
             client_protocol = protocol
         try:
-            try:
-                s = ssl.wrap_socket(socket.socket(),
-                                    certfile=client_certfile,
-                                    ca_certs=cacertsfile,
-                                    ciphers=ciphers,
-                                    cert_reqs=certreqs,
-                                    ssl_version=client_protocol)
-                s.connect((HOST, server.port))
-            except ssl.SSLError, x:
-                raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
-            except Exception, x:
-                raise test_support.TestFailed("Unexpected exception:  " + str(x))
-            else:
-                for arg in [indata, bytearray(indata), memoryview(indata)]:
-                    if connectionchatty:
-                        if test_support.verbose:
-                            sys.stdout.write(
-                                " client:  sending %s...\n" % (repr(arg)))
-                    s.write(arg)
-                    outdata = s.read()
-                    if connectionchatty:
-                        if test_support.verbose:
-                            sys.stdout.write(" client:  read %s\n" % repr(outdata))
-                    if outdata != indata.lower():
-                        raise test_support.TestFailed(
-                            "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
-                            % (outdata[:min(len(outdata),20)], len(outdata),
-                               indata[:min(len(indata),20)].lower(), len(indata)))
-                s.write("over\n")
+            s = ssl.wrap_socket(socket.socket(),
+                                certfile=client_certfile,
+                                ca_certs=cacertsfile,
+                                ciphers=ciphers,
+                                cert_reqs=certreqs,
+                                ssl_version=client_protocol)
+            s.connect((HOST, server.port))
+            for arg in [indata, bytearray(indata), memoryview(indata)]:
                 if connectionchatty:
                     if test_support.verbose:
-                        sys.stdout.write(" client:  closing connection.\n")
-                s.close()
+                        sys.stdout.write(
+                            " client:  sending %s...\n" % (repr(arg)))
+                s.write(arg)
+                outdata = s.read()
+                if connectionchatty:
+                    if test_support.verbose:
+                        sys.stdout.write(" client:  read %s\n" % repr(outdata))
+                if outdata != indata.lower():
+                    self.fail(
+                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+                        % (outdata[:min(len(outdata),20)], len(outdata),
+                           indata[:min(len(indata),20)].lower(), len(indata)))
+            s.write("over\n")
+            if connectionchatty:
+                if test_support.verbose:
+                    sys.stdout.write(" client:  closing connection.\n")
+            s.close()
         finally:
             server.stop()
             server.join()
@@ -793,12 +769,17 @@
             serverParamsTest(CERTFILE, server_protocol, certsreqs,
                              CERTFILE, CERTFILE, client_protocol,
                              ciphers="ALL", chatty=False)
-        except test_support.TestFailed:
+        # Protocol mismatch can result in either an SSLError, or a
+        # "Connection reset by peer" error.
+        except ssl.SSLError:
             if expectedToWork:
                 raise
+        except socket.error as e:
+            if expectedToWork or e.errno != errno.ECONNRESET:
+                raise
         else:
             if not expectedToWork:
-                raise test_support.TestFailed(
+                self.fail(
                     "Client protocol %s succeeded with server protocol %s!"
                     % (ssl.get_protocol_name(client_protocol),
                        ssl.get_protocol_name(server_protocol)))
@@ -835,8 +816,7 @@
                 except IOError:
                     pass
                 else:
-                    raise test_support.TestFailed(
-                          'connecting to closed SSL socket should have failed')
+                    self.fail('connecting to closed SSL socket should have failed')
 
             t = threading.Thread(target=listener)
             t.start()
@@ -869,41 +849,27 @@
             flag.wait()
             # try to connect
             try:
-                try:
-                    s = ssl.wrap_socket(socket.socket(),
-                                        certfile=CERTFILE,
-                                        ca_certs=CERTFILE,
-                                        cert_reqs=ssl.CERT_REQUIRED,
-                                        ssl_version=ssl.PROTOCOL_SSLv23)
-                    s.connect((HOST, server.port))
-                except ssl.SSLError, x:
-                    raise test_support.TestFailed(
-                        "Unexpected SSL error:  " + str(x))
-                except Exception, x:
-                    raise test_support.TestFailed(
-                        "Unexpected exception:  " + str(x))
-                else:
-                    if not s:
-                        raise test_support.TestFailed(
-                            "Can't SSL-handshake with test server")
-                    cert = s.getpeercert()
-                    if not cert:
-                        raise test_support.TestFailed(
-                            "Can't get peer certificate.")
-                    cipher = s.cipher()
-                    if test_support.verbose:
-                        sys.stdout.write(pprint.pformat(cert) + '\n')
-                        sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
-                    if 'subject' not in cert:
-                        raise test_support.TestFailed(
-                            "No subject field in certificate: %s." %
-                            pprint.pformat(cert))
-                    if ((('organizationName', 'Python Software Foundation'),)
-                        not in cert['subject']):
-                        raise test_support.TestFailed(
-                            "Missing or invalid 'organizationName' field in certificate subject; "
-                            "should be 'Python Software Foundation'.")
-                    s.close()
+                s = ssl.wrap_socket(socket.socket(),
+                                    certfile=CERTFILE,
+                                    ca_certs=CERTFILE,
+                                    cert_reqs=ssl.CERT_REQUIRED,
+                                    ssl_version=ssl.PROTOCOL_SSLv23)
+                s.connect((HOST, server.port))
+                cert = s.getpeercert()
+                self.assertTrue(cert, "Can't get peer certificate.")
+                cipher = s.cipher()
+                if test_support.verbose:
+                    sys.stdout.write(pprint.pformat(cert) + '\n')
+                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
+                if 'subject' not in cert:
+                    self.fail("No subject field in certificate: %s." %
+                              pprint.pformat(cert))
+                if ((('organizationName', 'Python Software Foundation'),)
+                    not in cert['subject']):
+                    self.fail(
+                        "Missing or invalid 'organizationName' field in certificate subject; "
+                        "should be 'Python Software Foundation'.")
+                s.close()
             finally:
                 server.stop()
                 server.join()
@@ -936,7 +902,7 @@
                 sys.stdout.write("\n")
             try:
                 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
-            except test_support.TestFailed, x:
+            except (SSLError, socket.error), x:
                 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
                 if test_support.verbose:
                     sys.stdout.write(
@@ -990,52 +956,48 @@
             # try to connect
             wrapped = False
             try:
-                try:
-                    s = socket.socket()
-                    s.setblocking(1)
-                    s.connect((HOST, server.port))
-                except Exception, x:
-                    raise test_support.TestFailed("Unexpected exception:  " + str(x))
-                else:
+                s = socket.socket()
+                s.setblocking(1)
+                s.connect((HOST, server.port))
+                if test_support.verbose:
+                    sys.stdout.write("\n")
+                for indata in msgs:
                     if test_support.verbose:
-                        sys.stdout.write("\n")
-                    for indata in msgs:
+                        sys.stdout.write(
+                            " client:  sending %s...\n" % repr(indata))
+                    if wrapped:
+                        conn.write(indata)
+                        outdata = conn.read()
+                    else:
+                        s.send(indata)
+                        outdata = s.recv(1024)
+                    if (indata == "STARTTLS" and
+                        outdata.strip().lower().startswith("ok")):
                         if test_support.verbose:
                             sys.stdout.write(
-                                " client:  sending %s...\n" % repr(indata))
-                        if wrapped:
-                            conn.write(indata)
-                            outdata = conn.read()
-                        else:
-                            s.send(indata)
-                            outdata = s.recv(1024)
-                        if (indata == "STARTTLS" and
-                            outdata.strip().lower().startswith("ok")):
-                            if test_support.verbose:
-                                sys.stdout.write(
-                                    " client:  read %s from server, starting TLS...\n"
-                                    % repr(outdata))
-                            conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
-                            wrapped = True
-                        elif (indata == "ENDTLS" and
-                            outdata.strip().lower().startswith("ok")):
-                            if test_support.verbose:
-                                sys.stdout.write(
-                                    " client:  read %s from server, ending TLS...\n"
-                                    % repr(outdata))
-                            s = conn.unwrap()
-                            wrapped = False
-                        else:
-                            if test_support.verbose:
-                                sys.stdout.write(
-                                    " client:  read %s from server\n" % repr(outdata))
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  closing connection.\n")
-                    if wrapped:
-                        conn.write("over\n")
+                                " client:  read %s from server, starting TLS...\n"
+                                % repr(outdata))
+                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
+                        wrapped = True
+                    elif (indata == "ENDTLS" and
+                        outdata.strip().lower().startswith("ok")):
+                        if test_support.verbose:
+                            sys.stdout.write(
+                                " client:  read %s from server, ending TLS...\n"
+                                % repr(outdata))
+                        s = conn.unwrap()
+                        wrapped = False
                     else:
-                        s.send("over\n")
-                    s.close()
+                        if test_support.verbose:
+                            sys.stdout.write(
+                                " client:  read %s from server\n" % repr(outdata))
+                if test_support.verbose:
+                    sys.stdout.write(" client:  closing connection.\n")
+                if wrapped:
+                    conn.write("over\n")
+                else:
+                    s.send("over\n")
+                s.close()
             finally:
                 server.stop()
                 server.join()
@@ -1066,15 +1028,7 @@
                             " client: read %d bytes from remote server '%s'\n"
                             % (len(d2), server))
                 f.close()
-            except:
-                msg = ''.join(traceback.format_exception(*sys.exc_info()))
-                if test_support.verbose:
-                    sys.stdout.write('\n' + msg)
-                raise test_support.TestFailed(msg)
-            else:
-                if not (d1 == d2):
-                    raise test_support.TestFailed(
-                        "Couldn't fetch data from HTTPS server")
+                self.assertEqual(d1, d2)
             finally:
                 server.stop()
                 server.join()
@@ -1102,30 +1056,24 @@
             flag.wait()
             # try to connect
             try:
-                try:
-                    s = ssl.wrap_socket(socket.socket())
-                    s.connect(('127.0.0.1', server.port))
-                except ssl.SSLError, x:
-                    raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
-                except Exception, x:
-                    raise test_support.TestFailed("Unexpected exception:  " + str(x))
-                else:
-                    if test_support.verbose:
-                        sys.stdout.write(
-                            " client:  sending %s...\n" % (repr(indata)))
-                    s.write(indata)
-                    outdata = s.read()
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  read %s\n" % repr(outdata))
-                    if outdata != indata.lower():
-                        raise test_support.TestFailed(
-                            "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
-                            % (outdata[:min(len(outdata),20)], len(outdata),
-                               indata[:min(len(indata),20)].lower(), len(indata)))
-                    s.write("over\n")
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  closing connection.\n")
-                    s.close()
+                s = ssl.wrap_socket(socket.socket())
+                s.connect(('127.0.0.1', server.port))
+                if test_support.verbose:
+                    sys.stdout.write(
+                        " client:  sending %s...\n" % (repr(indata)))
+                s.write(indata)
+                outdata = s.read()
+                if test_support.verbose:
+                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
+                if outdata != indata.lower():
+                    self.fail(
+                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+                        % (outdata[:min(len(outdata),20)], len(outdata),
+                           indata[:min(len(indata),20)].lower(), len(indata)))
+                s.write("over\n")
+                if test_support.verbose:
+                    sys.stdout.write(" client:  closing connection.\n")
+                s.close()
             finally:
                 server.stop()
                 # wait for server thread to end
@@ -1148,19 +1096,14 @@
             # wait for it to start
             flag.wait()
             # try to connect
+            s = ssl.wrap_socket(socket.socket(),
+                                server_side=False,
+                                certfile=CERTFILE,
+                                ca_certs=CERTFILE,
+                                cert_reqs=ssl.CERT_NONE,
+                                ssl_version=ssl.PROTOCOL_TLSv1)
+            s.connect((HOST, server.port))
             try:
-                s = ssl.wrap_socket(socket.socket(),
-                                    server_side=False,
-                                    certfile=CERTFILE,
-                                    ca_certs=CERTFILE,
-                                    cert_reqs=ssl.CERT_NONE,
-                                    ssl_version=ssl.PROTOCOL_TLSv1)
-                s.connect((HOST, server.port))
-            except ssl.SSLError as x:
-                self.fail("Unexpected SSL error:  " + str(x))
-            except Exception as x:
-                self.fail("Unexpected exception:  " + str(x))
-            else:
                 # helper methods for standardising recv* method signatures
                 def _recv_into():
                     b = bytearray("\0"*100)


More information about the Python-checkins mailing list