[Python-checkins] r80531 - in python/branches/release26-maint: Lib/test/test_ssl.py

antoine.pitrou python-checkins at python.org
Tue Apr 27 12:41:42 CEST 2010


Author: antoine.pitrou
Date: Tue Apr 27 12:41:42 2010
New Revision: 80531

Log:
Merged revisions 80529 via svnmerge from 
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r80529 | antoine.pitrou | 2010-04-27 12:32:58 +0200 (mar., 27 avril 2010) | 4 lines
  
  Qualify or remove bare excepts.  Simplify exception handling in places.
  Remove uses of test_support.TestFailed.
........


Modified:
   python/branches/release26-maint/   (props changed)
   python/branches/release26-maint/Lib/test/test_ssl.py

Modified: python/branches/release26-maint/Lib/test/test_ssl.py
==============================================================================
--- python/branches/release26-maint/Lib/test/test_ssl.py	(original)
+++ python/branches/release26-maint/Lib/test/test_ssl.py	Tue Apr 27 12:41:42 2010
@@ -65,7 +65,7 @@
         s.connect(("svn.python.org", 443))
         c = s.getpeercert()
         if c:
-            raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+            self.fail("Peer cert %s shouldn't be here!")
         s.close()
 
         # this should fail because we have no verification certs
@@ -115,8 +115,7 @@
         d1 = ssl.PEM_cert_to_DER_cert(pem)
         p2 = ssl.DER_cert_to_PEM_cert(d1)
         d2 = ssl.PEM_cert_to_DER_cert(p2)
-        if (d1 != d2):
-            raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
+        self.assertEqual(d1, d2)
 
     def test_refcycle(self):
         # Issue #7943: an SSL object doesn't create reference cycles with
@@ -136,7 +135,7 @@
         s.connect(("svn.python.org", 443))
         c = s.getpeercert()
         if c:
-            raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+            self.fail("Peer cert %s shouldn't be here!")
         s.close()
 
         # this should fail because we have no verification certs
@@ -155,8 +154,6 @@
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
         try:
             s.connect(("svn.python.org", 443))
-        except ssl.SSLError, x:
-            raise test_support.TestFailed("Unexpected exception %s" % x)
         finally:
             s.close()
 
@@ -213,7 +210,7 @@
 
         pem = ssl.get_server_certificate(("svn.python.org", 443))
         if not pem:
-            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+            self.fail("No server certificate on svn.python.org:443!")
 
         try:
             pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
@@ -221,11 +218,11 @@
             #should fail
             pass
         else:
-            raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
+            self.fail("Got server certificate %s for svn.python.org!" % pem)
 
         pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
         if not pem:
-            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+            self.fail("No server certificate on svn.python.org:443!")
         if test_support.verbose:
             sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
 
@@ -299,20 +296,17 @@
                                                    ssl_version=self.server.protocol,
                                                    ca_certs=self.server.cacerts,
                                                    cert_reqs=self.server.certreqs)
-                except:
+                except ssl.SSLError:
+                    # XXX Various errors can have happened here, for example
+                    # a mismatching protocol version, an invalid certificate,
+                    # or a low-level bug. This should be made more discriminating.
                     if self.server.chatty:
                         handle_error("\n server:  bad connection attempt from " +
                                      str(self.sock.getpeername()) + ":\n")
                     self.close()
-                    if not self.server.expect_bad_connects:
-                        # here, we want to stop the server, because this shouldn't
-                        # happen in the context of our test case
-                        self.running = False
-                        # normally, we'd just stop here, but for the test
-                        # harness, we want to stop the server
-                        self.server.stop()
+                    self.running = False
+                    self.server.stop()
                     return False
-
                 else:
                     return True
 
@@ -383,11 +377,9 @@
                         # normally, we'd just stop here, but for the test
                         # harness, we want to stop the server
                         self.server.stop()
-                    except:
-                        handle_error('')
 
         def __init__(self, certificate, ssl_version=None,
-                     certreqs=None, cacerts=None, expect_bad_connects=False,
+                     certreqs=None, cacerts=None,
                      chatty=True, connectionchatty=False, starttls_server=False,
                      wrap_accepting_socket=False):
 
@@ -399,7 +391,6 @@
             self.protocol = ssl_version
             self.certreqs = certreqs
             self.cacerts = cacerts
-            self.expect_bad_connects = expect_bad_connects
             self.chatty = chatty
             self.connectionchatty = connectionchatty
             self.starttls_server = starttls_server
@@ -441,9 +432,6 @@
                     pass
                 except KeyboardInterrupt:
                     self.stop()
-                except:
-                    if self.chatty:
-                        handle_error("Test server failure:\n")
             self.sock.close()
 
         def stop (self):
@@ -489,7 +477,8 @@
                         self._do_ssl_handshake()
                     else:
                         data = self.recv(1024)
-                        self.send(data.lower())
+                        if data and data.strip() != 'over':
+                            self.send(data.lower())
 
                 def handle_close(self):
                     self.close()
@@ -535,10 +524,7 @@
             if self.flag:
                 self.flag.set()
             while self.active:
-                try:
-                    asyncore.loop(1)
-                except:
-                    pass
+                asyncore.loop(0.05)
 
         def stop (self):
             self.active = False
@@ -661,12 +647,8 @@
             except ssl.SSLError, x:
                 if test_support.verbose:
                     sys.stdout.write("\nSSLError is %s\n" % x[1])
-            except socket.error, x:
-                if test_support.verbose:
-                    sys.stdout.write("\nsocket.error is %s\n" % x[1])
             else:
-                raise test_support.TestFailed(
-                    "Use of invalid cert should have failed!")
+                self.fail("Use of invalid cert should have failed!")
         finally:
             server.stop()
             server.join()
@@ -691,37 +673,31 @@
         if client_protocol is None:
             client_protocol = protocol
         try:
-            try:
-                s = ssl.wrap_socket(socket.socket(),
-                                    certfile=client_certfile,
-                                    ca_certs=cacertsfile,
-                                    cert_reqs=certreqs,
-                                    ssl_version=client_protocol)
-                s.connect((HOST, server.port))
-            except ssl.SSLError, x:
-                raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
-            except Exception, x:
-                raise test_support.TestFailed("Unexpected exception:  " + str(x))
-            else:
-                if connectionchatty:
-                    if test_support.verbose:
-                        sys.stdout.write(
-                            " client:  sending %s...\n" % (repr(indata)))
-                s.write(indata)
-                outdata = s.read()
-                if connectionchatty:
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  read %s\n" % repr(outdata))
-                if outdata != indata.lower():
-                    raise test_support.TestFailed(
-                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
-                        % (outdata[:min(len(outdata),20)], len(outdata),
-                           indata[:min(len(indata),20)].lower(), len(indata)))
-                s.write("over\n")
-                if connectionchatty:
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  closing connection.\n")
-                s.close()
+            s = ssl.wrap_socket(socket.socket(),
+                                certfile=client_certfile,
+                                ca_certs=cacertsfile,
+                                cert_reqs=certreqs,
+                                ssl_version=client_protocol)
+            s.connect((HOST, server.port))
+            if connectionchatty:
+                if test_support.verbose:
+                    sys.stdout.write(
+                        " client:  sending %s...\n" % (repr(indata)))
+            s.write(indata)
+            outdata = s.read()
+            if connectionchatty:
+                if test_support.verbose:
+                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
+            if outdata != indata.lower():
+                self.fail(
+                    "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+                    % (outdata[:min(len(outdata),20)], len(outdata),
+                       indata[:min(len(indata),20)].lower(), len(indata)))
+            s.write("over\n")
+            if connectionchatty:
+                if test_support.verbose:
+                    sys.stdout.write(" client:  closing connection.\n")
+            s.close()
         finally:
             server.stop()
             server.join()
@@ -749,12 +725,17 @@
         try:
             serverParamsTest(CERTFILE, server_protocol, certsreqs,
                              CERTFILE, CERTFILE, client_protocol, chatty=False)
-        except test_support.TestFailed:
+        # Protocol mismatch can result in either an SSLError, or a
+        # "Connection reset by peer" error.
+        except ssl.SSLError:
             if expectedToWork:
                 raise
+        except socket.error as e:
+            if expectedToWork or e.errno != errno.ECONNRESET:
+                raise
         else:
             if not expectedToWork:
-                raise test_support.TestFailed(
+                self.fail(
                     "Client protocol %s succeeded with server protocol %s!"
                     % (ssl.get_protocol_name(client_protocol),
                        ssl.get_protocol_name(server_protocol)))
@@ -791,8 +772,7 @@
                 except IOError:
                     pass
                 else:
-                    raise test_support.TestFailed(
-                          'connecting to closed SSL socket should have failed')
+                    self.fail('connecting to closed SSL socket should have failed')
 
             t = threading.Thread(target=listener)
             t.start()
@@ -825,41 +805,27 @@
             flag.wait()
             # try to connect
             try:
-                try:
-                    s = ssl.wrap_socket(socket.socket(),
-                                        certfile=CERTFILE,
-                                        ca_certs=CERTFILE,
-                                        cert_reqs=ssl.CERT_REQUIRED,
-                                        ssl_version=ssl.PROTOCOL_SSLv23)
-                    s.connect((HOST, server.port))
-                except ssl.SSLError, x:
-                    raise test_support.TestFailed(
-                        "Unexpected SSL error:  " + str(x))
-                except Exception, x:
-                    raise test_support.TestFailed(
-                        "Unexpected exception:  " + str(x))
-                else:
-                    if not s:
-                        raise test_support.TestFailed(
-                            "Can't SSL-handshake with test server")
-                    cert = s.getpeercert()
-                    if not cert:
-                        raise test_support.TestFailed(
-                            "Can't get peer certificate.")
-                    cipher = s.cipher()
-                    if test_support.verbose:
-                        sys.stdout.write(pprint.pformat(cert) + '\n')
-                        sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
-                    if not cert.has_key('subject'):
-                        raise test_support.TestFailed(
-                            "No subject field in certificate: %s." %
-                            pprint.pformat(cert))
-                    if ((('organizationName', 'Python Software Foundation'),)
-                        not in cert['subject']):
-                        raise test_support.TestFailed(
-                            "Missing or invalid 'organizationName' field in certificate subject; "
-                            "should be 'Python Software Foundation'.")
-                    s.close()
+                s = ssl.wrap_socket(socket.socket(),
+                                    certfile=CERTFILE,
+                                    ca_certs=CERTFILE,
+                                    cert_reqs=ssl.CERT_REQUIRED,
+                                    ssl_version=ssl.PROTOCOL_SSLv23)
+                s.connect((HOST, server.port))
+                cert = s.getpeercert()
+                self.assertTrue(cert, "Can't get peer certificate.")
+                cipher = s.cipher()
+                if test_support.verbose:
+                    sys.stdout.write(pprint.pformat(cert) + '\n')
+                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
+                if 'subject' not in cert:
+                    self.fail("No subject field in certificate: %s." %
+                              pprint.pformat(cert))
+                if ((('organizationName', 'Python Software Foundation'),)
+                    not in cert['subject']):
+                    self.fail(
+                        "Missing or invalid 'organizationName' field in certificate subject; "
+                        "should be 'Python Software Foundation'.")
+                s.close()
             finally:
                 server.stop()
                 server.join()
@@ -892,7 +858,7 @@
                 sys.stdout.write("\n")
             try:
                 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
-            except test_support.TestFailed, x:
+            except (SSLError, socket.error), x:
                 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
                 if test_support.verbose:
                     sys.stdout.write(
@@ -946,52 +912,48 @@
             # try to connect
             wrapped = False
             try:
-                try:
-                    s = socket.socket()
-                    s.setblocking(1)
-                    s.connect((HOST, server.port))
-                except Exception, x:
-                    raise test_support.TestFailed("Unexpected exception:  " + str(x))
-                else:
+                s = socket.socket()
+                s.setblocking(1)
+                s.connect((HOST, server.port))
+                if test_support.verbose:
+                    sys.stdout.write("\n")
+                for indata in msgs:
                     if test_support.verbose:
-                        sys.stdout.write("\n")
-                    for indata in msgs:
+                        sys.stdout.write(
+                            " client:  sending %s...\n" % repr(indata))
+                    if wrapped:
+                        conn.write(indata)
+                        outdata = conn.read()
+                    else:
+                        s.send(indata)
+                        outdata = s.recv(1024)
+                    if (indata == "STARTTLS" and
+                        outdata.strip().lower().startswith("ok")):
                         if test_support.verbose:
                             sys.stdout.write(
-                                " client:  sending %s...\n" % repr(indata))
-                        if wrapped:
-                            conn.write(indata)
-                            outdata = conn.read()
-                        else:
-                            s.send(indata)
-                            outdata = s.recv(1024)
-                        if (indata == "STARTTLS" and
-                            outdata.strip().lower().startswith("ok")):
-                            if test_support.verbose:
-                                sys.stdout.write(
-                                    " client:  read %s from server, starting TLS...\n"
-                                    % repr(outdata))
-                            conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
-                            wrapped = True
-                        elif (indata == "ENDTLS" and
-                            outdata.strip().lower().startswith("ok")):
-                            if test_support.verbose:
-                                sys.stdout.write(
-                                    " client:  read %s from server, ending TLS...\n"
-                                    % repr(outdata))
-                            s = conn.unwrap()
-                            wrapped = False
-                        else:
-                            if test_support.verbose:
-                                sys.stdout.write(
-                                    " client:  read %s from server\n" % repr(outdata))
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  closing connection.\n")
-                    if wrapped:
-                        conn.write("over\n")
+                                " client:  read %s from server, starting TLS...\n"
+                                % repr(outdata))
+                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
+                        wrapped = True
+                    elif (indata == "ENDTLS" and
+                        outdata.strip().lower().startswith("ok")):
+                        if test_support.verbose:
+                            sys.stdout.write(
+                                " client:  read %s from server, ending TLS...\n"
+                                % repr(outdata))
+                        s = conn.unwrap()
+                        wrapped = False
                     else:
-                        s.send("over\n")
-                    s.close()
+                        if test_support.verbose:
+                            sys.stdout.write(
+                                " client:  read %s from server\n" % repr(outdata))
+                if test_support.verbose:
+                    sys.stdout.write(" client:  closing connection.\n")
+                if wrapped:
+                    conn.write("over\n")
+                else:
+                    s.send("over\n")
+                s.close()
             finally:
                 server.stop()
                 server.join()
@@ -1021,15 +983,7 @@
                             " client: read %d bytes from remote server '%s'\n"
                             % (len(d2), server))
                 f.close()
-            except:
-                msg = ''.join(traceback.format_exception(*sys.exc_info()))
-                if test_support.verbose:
-                    sys.stdout.write('\n' + msg)
-                raise test_support.TestFailed(msg)
-            else:
-                if not (d1 == d2):
-                    raise test_support.TestFailed(
-                        "Couldn't fetch data from HTTPS server")
+                self.assertEqual(d1, d2)
             finally:
                 server.stop()
                 server.join()
@@ -1057,30 +1011,24 @@
             flag.wait()
             # try to connect
             try:
-                try:
-                    s = ssl.wrap_socket(socket.socket())
-                    s.connect(('127.0.0.1', server.port))
-                except ssl.SSLError, x:
-                    raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
-                except Exception, x:
-                    raise test_support.TestFailed("Unexpected exception:  " + str(x))
-                else:
-                    if test_support.verbose:
-                        sys.stdout.write(
-                            " client:  sending %s...\n" % (repr(indata)))
-                    s.write(indata)
-                    outdata = s.read()
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  read %s\n" % repr(outdata))
-                    if outdata != indata.lower():
-                        raise test_support.TestFailed(
-                            "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
-                            % (outdata[:min(len(outdata),20)], len(outdata),
-                               indata[:min(len(indata),20)].lower(), len(indata)))
-                    s.write("over\n")
-                    if test_support.verbose:
-                        sys.stdout.write(" client:  closing connection.\n")
-                    s.close()
+                s = ssl.wrap_socket(socket.socket())
+                s.connect(('127.0.0.1', server.port))
+                if test_support.verbose:
+                    sys.stdout.write(
+                        " client:  sending %s...\n" % (repr(indata)))
+                s.write(indata)
+                outdata = s.read()
+                if test_support.verbose:
+                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
+                if outdata != indata.lower():
+                    self.fail(
+                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+                        % (outdata[:min(len(outdata),20)], len(outdata),
+                           indata[:min(len(indata),20)].lower(), len(indata)))
+                s.write("over\n")
+                if test_support.verbose:
+                    sys.stdout.write(" client:  closing connection.\n")
+                s.close()
             finally:
                 server.stop()
                 # wait for server thread to end
@@ -1103,19 +1051,14 @@
             # wait for it to start
             flag.wait()
             # try to connect
+            s = ssl.wrap_socket(socket.socket(),
+                                server_side=False,
+                                certfile=CERTFILE,
+                                ca_certs=CERTFILE,
+                                cert_reqs=ssl.CERT_NONE,
+                                ssl_version=ssl.PROTOCOL_TLSv1)
+            s.connect((HOST, server.port))
             try:
-                s = ssl.wrap_socket(socket.socket(),
-                                    server_side=False,
-                                    certfile=CERTFILE,
-                                    ca_certs=CERTFILE,
-                                    cert_reqs=ssl.CERT_NONE,
-                                    ssl_version=ssl.PROTOCOL_TLSv1)
-                s.connect((HOST, server.port))
-            except ssl.SSLError as x:
-                raise support.TestFailed("Unexpected SSL error:  " + str(x))
-            except Exception as x:
-                raise support.TestFailed("Unexpected exception:  " + str(x))
-            else:
                 # helper methods for standardising recv* method signatures
                 def _recv_into():
                     b = bytearray("\0"*100)


More information about the Python-checkins mailing list