[Python-checkins] r80599 - in python/branches/release31-maint: Lib/test/test_ssl.py

antoine.pitrou python-checkins at python.org
Wed Apr 28 23:39:56 CEST 2010


Author: antoine.pitrou
Date: Wed Apr 28 23:39:56 2010
New Revision: 80599

Log:
Merged revisions 80598 via svnmerge from 
svn+ssh://pythondev@svn.python.org/python/branches/py3k

................
  r80598 | antoine.pitrou | 2010-04-28 23:37:09 +0200 (mer., 28 avril 2010) | 9 lines
  
  Merged revisions 80596 via svnmerge from 
  svn+ssh://pythondev@svn.python.org/python/trunk
  
  ........
    r80596 | antoine.pitrou | 2010-04-28 23:11:01 +0200 (mer., 28 avril 2010) | 3 lines
    
    Fix style issues in test_ssl
  ........
................


Modified:
   python/branches/release31-maint/   (props changed)
   python/branches/release31-maint/Lib/test/test_ssl.py

Modified: python/branches/release31-maint/Lib/test/test_ssl.py
==============================================================================
--- python/branches/release31-maint/Lib/test/test_ssl.py	(original)
+++ python/branches/release31-maint/Lib/test/test_ssl.py	Wed Apr 28 23:39:56 2010
@@ -39,7 +39,7 @@
 
 class BasicTests(unittest.TestCase):
 
-    def testSSLconnect(self):
+    def test_connect(self):
         if not support.is_resource_enabled('network'):
             return
         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
@@ -60,7 +60,7 @@
         finally:
             s.close()
 
-    def testCrucialConstants(self):
+    def test_constants(self):
         ssl.PROTOCOL_SSLv2
         ssl.PROTOCOL_SSLv23
         ssl.PROTOCOL_SSLv3
@@ -69,7 +69,7 @@
         ssl.CERT_OPTIONAL
         ssl.CERT_REQUIRED
 
-    def testRAND(self):
+    def test_random(self):
         v = ssl.RAND_status()
         if support.verbose:
             sys.stdout.write("\n RAND_status is %d (%s)\n"
@@ -83,7 +83,7 @@
             print("didn't raise TypeError")
         ssl.RAND_add("this is a random string", 75.0)
 
-    def testParseCert(self):
+    def test_parse_cert(self):
         # note that this uses an 'unofficial' function in _ssl.c,
         # provided solely for this test, to exercise the certificate
         # parsing code
@@ -91,9 +91,9 @@
         if support.verbose:
             sys.stdout.write("\n" + pprint.pformat(p) + "\n")
 
-    def testDERtoPEM(self):
-
-        pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read()
+    def test_DER_to_PEM(self):
+        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
+            pem = f.read()
         d1 = ssl.PEM_cert_to_DER_cert(pem)
         p2 = ssl.DER_cert_to_PEM_cert(d1)
         d2 = ssl.PEM_cert_to_DER_cert(p2)
@@ -125,7 +125,7 @@
 
 class NetworkedTests(unittest.TestCase):
 
-    def testConnect(self):
+    def test_connect(self):
         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE)
         s.connect(("svn.python.org", 443))
@@ -175,7 +175,7 @@
         else:
             self.fail("OSError wasn't raised")
 
-    def testNonBlockingHandshake(self):
+    def test_non_blocking_handshake(self):
         s = socket.socket(socket.AF_INET)
         s.connect(("svn.python.org", 443))
         s.setblocking(False)
@@ -199,8 +199,7 @@
         if support.verbose:
             sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
 
-    def testFetchServerCert(self):
-
+    def test_get_server_certificate(self):
         pem = ssl.get_server_certificate(("svn.python.org", 443))
         if not pem:
             self.fail("No server certificate on svn.python.org:443!")
@@ -256,7 +255,6 @@
 except ImportError:
     _have_threads = False
 else:
-
     _have_threads = True
 
     class ThreadedEchoServer(threading.Thread):
@@ -277,7 +275,7 @@
                 threading.Thread.__init__(self)
                 self.daemon = True
 
-            def wrap_conn (self):
+            def wrap_conn(self):
                 try:
                     self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
                                                    certfile=self.server.certificate,
@@ -325,7 +323,7 @@
                 else:
                     self.sock.close()
 
-            def run (self):
+            def run(self):
                 self.running = True
                 if not self.server.starttls_server:
                     if not self.wrap_conn():
@@ -333,28 +331,28 @@
                 while self.running:
                     try:
                         msg = self.read()
-                        amsg = (msg and str(msg, 'ASCII', 'strict')) or ''
-                        if not msg:
+                        stripped = msg.strip()
+                        if not stripped:
                             # eof, so quit this handler
                             self.running = False
                             self.close()
-                        elif amsg.strip() == 'over':
+                        elif stripped == b'over':
                             if support.verbose and self.server.connectionchatty:
                                 sys.stdout.write(" server: client closed connection\n")
                             self.close()
                             return
                         elif (self.server.starttls_server and
-                              amsg.strip() == 'STARTTLS'):
+                              stripped == 'STARTTLS'):
                             if support.verbose and self.server.connectionchatty:
                                 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
-                            self.write("OK\n".encode("ASCII", "strict"))
+                            self.write(b"OK\n")
                             if not self.wrap_conn():
                                 return
                         elif (self.server.starttls_server and self.sslconn
-                              and amsg.strip() == 'ENDTLS'):
+                              and stripped == 'ENDTLS'):
                             if support.verbose and self.server.connectionchatty:
                                 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
-                            self.write("OK\n".encode("ASCII", "strict"))
+                            self.write(b"OK\n")
                             self.sock = self.sslconn.unwrap()
                             self.sslconn = None
                             if support.verbose and self.server.connectionchatty:
@@ -363,9 +361,9 @@
                             if (support.verbose and
                                 self.server.connectionchatty):
                                 ctype = (self.sslconn and "encrypted") or "unencrypted"
-                                sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
-                                                 % (repr(msg), ctype, repr(msg.lower()), ctype))
-                            self.write(amsg.lower().encode('ASCII', 'strict'))
+                                sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
+                                                 % (msg, ctype, msg.lower(), ctype))
+                            self.write(msg.lower())
                     except socket.error:
                         if self.server.chatty:
                             handle_error("Test server failure:\n")
@@ -396,11 +394,11 @@
             threading.Thread.__init__(self)
             self.daemon = True
 
-        def start (self, flag=None):
+        def start(self, flag=None):
             self.flag = flag
             threading.Thread.start(self)
 
-        def run (self):
+        def run(self):
             self.sock.settimeout(0.05)
             self.sock.listen(5)
             self.active = True
@@ -421,7 +419,7 @@
                     self.stop()
             self.sock.close()
 
-        def stop (self):
+        def stop(self):
             self.active = False
 
     class OurHTTPSServer(threading.Thread):
@@ -431,12 +429,9 @@
         class HTTPSServer(HTTPServer):
 
             def __init__(self, server_address, RequestHandlerClass, certfile):
-
                 HTTPServer.__init__(self, server_address, RequestHandlerClass)
                 # we assume the certfile contains both private key and certificate
                 self.certfile = certfile
-                self.active = False
-                self.active_lock = threading.Lock()
                 self.allow_reuse_address = True
 
             def __str__(self):
@@ -445,7 +440,7 @@
                          self.server_name,
                          self.server_port))
 
-            def get_request (self):
+            def get_request(self):
                 # override this to wrap socket with SSL
                 sock, addr = self.socket.accept()
                 sslconn = ssl.wrap_socket(sock, server_side=True,
@@ -453,7 +448,6 @@
                 return sslconn, addr
 
         class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
-
             # need to override translate_path to get a known root,
             # instead of using os.curdir, since the test could be
             # run from anywhere
@@ -484,7 +478,6 @@
                 return path
 
             def log_message(self, format, *args):
-
                 # we override this to suppress logging unless "verbose"
 
                 if support.verbose:
@@ -498,7 +491,6 @@
 
         def __init__(self, certfile):
             self.flag = None
-            self.active = False
             self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
             self.server = self.HTTPSServer(
                 (HOST, 0), self.RootedHTTPRequestHandler, certfile)
@@ -509,19 +501,16 @@
         def __str__(self):
             return "<%s %s>" % (self.__class__.__name__, self.server)
 
-        def start (self, flag=None):
+        def start(self, flag=None):
             self.flag = flag
             threading.Thread.start(self)
 
-        def run (self):
-            self.active = True
+        def run(self):
             if self.flag:
                 self.flag.set()
             self.server.serve_forever(0.05)
-            self.active = False
 
-        def stop (self):
-            self.active = False
+        def stop(self):
             self.server.shutdown()
 
 
@@ -573,7 +562,7 @@
                         if not data:
                             self.close()
                         else:
-                            self.send(str(data, 'ASCII', 'strict').lower().encode('ASCII', 'strict'))
+                            self.send(data.lower())
 
                 def handle_close(self):
                     self.close()
@@ -614,7 +603,7 @@
             self.flag = flag
             threading.Thread.start(self)
 
-        def run (self):
+        def run(self):
             self.active = True
             if self.flag:
                 self.flag.set()
@@ -624,11 +613,15 @@
                 except:
                     pass
 
-        def stop (self):
+        def stop(self):
             self.active = False
             self.server.close()
 
-    def badCertTest (certfile):
+    def bad_cert_test(certfile):
+        """
+        Launch a server with CERT_REQUIRED, and check that trying to
+        connect to it with the given client certificate fails.
+        """
         server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_REQUIRED,
                                     cacerts=CERTFILE, chatty=False,
@@ -648,7 +641,7 @@
                 if support.verbose:
                     sys.stdout.write("\nSSLError is %s\n" % x.args[1])
             except socket.error as x:
-                if test_support.verbose:
+                if support.verbose:
                     sys.stdout.write("\nsocket.error is %s\n" % x[1])
             else:
                 self.fail("Use of invalid cert should have failed!")
@@ -656,11 +649,13 @@
             server.stop()
             server.join()
 
-    def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
-                          client_certfile, client_protocol=None,
-                          indata="FOO\n",
-                          chatty=False, connectionchatty=False):
-
+    def server_params_test(certfile, protocol, certreqs, cacertsfile,
+                           client_certfile, client_protocol=None, indata=b"FOO\n",
+                           chatty=True, connectionchatty=False):
+        """
+        Launch a server, connect a client to it and try various reads
+        and writes.
+        """
         server = ThreadedEchoServer(certfile,
                                     certreqs=certreqs,
                                     ssl_version=protocol,
@@ -681,23 +676,22 @@
                                 cert_reqs=certreqs,
                                 ssl_version=client_protocol)
             s.connect((HOST, server.port))
-            arg = indata.encode('ASCII', 'strict')
+            arg = indata
             if connectionchatty:
                 if support.verbose:
                     sys.stdout.write(
-                        " client:  sending %s...\n" % (repr(indata)))
+                        " client:  sending %r...\n" % indata)
             s.write(arg)
             outdata = s.read()
             if connectionchatty:
                 if support.verbose:
-                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
-            outdata = str(outdata, 'ASCII', 'strict')
+                    sys.stdout.write(" client:  read %r\n" % outdata)
             if outdata != indata.lower():
                 self.fail(
-                    "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
-                    % (repr(outdata[:min(len(outdata),20)]), len(outdata),
-                       repr(indata[:min(len(indata),20)].lower()), len(indata)))
-            s.write("over\n".encode("ASCII", "strict"))
+                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
+                    % (outdata[:20], len(outdata),
+                       indata[:20].lower(), len(indata)))
+            s.write(b"over\n")
             if connectionchatty:
                 if support.verbose:
                     sys.stdout.write(" client:  closing connection.\n")
@@ -706,40 +700,38 @@
             server.stop()
             server.join()
 
-    def tryProtocolCombo (server_protocol,
-                          client_protocol,
-                          expectedToWork,
-                          certsreqs=None):
-
+    def try_protocol_combo(server_protocol,
+                           client_protocol,
+                           expect_success,
+                           certsreqs=None):
         if certsreqs is None:
             certsreqs = ssl.CERT_NONE
-
-        if certsreqs == ssl.CERT_NONE:
-            certtype = "CERT_NONE"
-        elif certsreqs == ssl.CERT_OPTIONAL:
-            certtype = "CERT_OPTIONAL"
-        elif certsreqs == ssl.CERT_REQUIRED:
-            certtype = "CERT_REQUIRED"
+        certtype = {
+            ssl.CERT_NONE: "CERT_NONE",
+            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
+            ssl.CERT_REQUIRED: "CERT_REQUIRED",
+        }[certsreqs]
         if support.verbose:
-            formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
+            formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
             sys.stdout.write(formatstr %
                              (ssl.get_protocol_name(client_protocol),
                               ssl.get_protocol_name(server_protocol),
                               certtype))
         try:
-            serverParamsTest(CERTFILE, server_protocol, certsreqs,
-                             CERTFILE, CERTFILE, client_protocol,
-                             chatty=False, connectionchatty=False)
+            server_params_test(CERTFILE, server_protocol, certsreqs,
+                               CERTFILE, CERTFILE, client_protocol,
+                               chatty=False,
+                               connectionchatty=False)
         # Protocol mismatch can result in either an SSLError, or a
         # "Connection reset by peer" error.
         except ssl.SSLError:
-            if expectedToWork:
+            if expect_success:
                 raise
         except socket.error as e:
-            if expectedToWork or e.errno != errno.ECONNRESET:
+            if expect_success or e.errno != errno.ECONNRESET:
                 raise
         else:
-            if not expectedToWork:
+            if not expect_success:
                 self.fail(
                     "Client protocol %s succeeded with server protocol %s!"
                     % (ssl.get_protocol_name(client_protocol),
@@ -748,16 +740,15 @@
 
     class ThreadedTests(unittest.TestCase):
 
-        def testEcho (self):
-
+        def test_echo(self):
+            """Basic test of an SSL client connecting to a server"""
             if support.verbose:
                 sys.stdout.write("\n")
-            serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
-                             CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
-                             chatty=True, connectionchatty=True)
-
-        def testReadCert(self):
+            server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
+                               CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
+                               chatty=True, connectionchatty=True)
 
+        def test_getpeercert(self):
             if support.verbose:
                 sys.stdout.write("\n")
             s2 = socket.socket()
@@ -797,23 +788,30 @@
                 server.stop()
                 server.join()
 
-        def testNULLcert(self):
-            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
-                                     "nullcert.pem"))
-        def testMalformedCert(self):
-            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
-                                     "badcert.pem"))
-        def testWrongCert(self):
-            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
-                                     "wrongcert.pem"))
-        def testMalformedKey(self):
-            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
-                                     "badkey.pem"))
-
-        def testRudeShutdown(self):
-
+        def test_empty_cert(self):
+            """Connecting with an empty cert file"""
+            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
+                                      "nullcert.pem"))
+        def test_malformed_cert(self):
+            """Connecting with a badly formatted certificate (syntax error)"""
+            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
+                                       "badcert.pem"))
+        def test_nonexisting_cert(self):
+            """Connecting with a non-existing cert file"""
+            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
+                                       "wrongcert.pem"))
+        def test_malformed_key(self):
+            """Connecting with a badly formatted key (syntax error)"""
+            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
+                                       "badkey.pem"))
+
+        def test_rude_shutdown(self):
+            """A brutal shutdown of an SSL server should raise an IOError
+            in the client when attempting handshake.
+            """
             listener_ready = threading.Event()
             listener_gone = threading.Event()
+
             s = socket.socket()
             port = support.bind_port(s, HOST)
 
@@ -847,62 +845,66 @@
             finally:
                 t.join()
 
-        def testProtocolSSL2(self):
+        def test_protocol_sslv2(self):
+            """Connecting to an SSLv2 server with various client options"""
             if support.verbose:
                 sys.stdout.write("\n")
-            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
+            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
+            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
+            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
+            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
+            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
+            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
 
-        def testProtocolSSL23(self):
+        def test_protocol_sslv23(self):
+            """Connecting to an SSLv23 server with various client options"""
             if support.verbose:
                 sys.stdout.write("\n")
             try:
-                tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
+                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
             except (ssl.SSLError, socket.error) as x:
                 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
                 if support.verbose:
                     sys.stdout.write(
                         " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                         % str(x))
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
-
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
-
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
+
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
+
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
+            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
 
-        def testProtocolSSL3(self):
+        def test_protocol_sslv3(self):
+            """Connecting to an SSLv3 server with various client options"""
             if support.verbose:
                 sys.stdout.write("\n")
-            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
-            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
+            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
+            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
+            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
+            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
+            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
+            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
 
-        def testProtocolTLS1(self):
+        def test_protocol_tlsv1(self):
+            """Connecting to a TLSv1 server with various client options"""
             if support.verbose:
                 sys.stdout.write("\n")
-            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
-            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
-            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
-            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
-            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
-            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
-
-        def testSTARTTLS (self):
-
-            msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
+            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
+            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
+            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
+            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
+            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
+            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
+
+        def test_starttls(self):
+            """Switching from clear text to encrypted and back again."""
+            msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
 
             server = ThreadedEchoServer(CERTFILE,
                                         ssl_version=ssl.PROTOCOL_TLSv1,
@@ -922,45 +924,42 @@
                 if support.verbose:
                     sys.stdout.write("\n")
                 for indata in msgs:
-                    msg = indata.encode('ASCII', 'replace')
                     if support.verbose:
                         sys.stdout.write(
-                            " client:  sending %s...\n" % repr(msg))
+                            " client:  sending %r...\n" % indata)
                     if wrapped:
-                        conn.write(msg)
+                        conn.write(indata)
                         outdata = conn.read()
                     else:
-                        s.send(msg)
+                        s.send(indata)
                         outdata = s.recv(1024)
-                    if (indata == "STARTTLS" and
-                        str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
+                    msg = outdata.strip().lower()
+                    if indata == b"STARTTLS" and msg.startswith(b"ok"):
+                        # STARTTLS ok, switch to secure mode
                         if support.verbose:
-                            msg = str(outdata, 'ASCII', 'replace')
                             sys.stdout.write(
-                                " client:  read %s from server, starting TLS...\n"
-                                % repr(msg))
+                                " client:  read %r from server, starting TLS...\n"
+                                % msg)
                         conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                         wrapped = True
-                    elif (indata == "ENDTLS" and
-                          str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
+                    elif indata == b"ENDTLS" and msg.startswith(b"ok"):
+                        # ENDTLS ok, switch back to clear text
                         if support.verbose:
-                            msg = str(outdata, 'ASCII', 'replace')
                             sys.stdout.write(
-                                " client:  read %s from server, ending TLS...\n"
-                                % repr(msg))
+                                " client:  read %r from server, ending TLS...\n"
+                                % msg)
                         s = conn.unwrap()
                         wrapped = False
                     else:
                         if support.verbose:
-                            msg = str(outdata, 'ASCII', 'replace')
                             sys.stdout.write(
-                                " client:  read %s from server\n" % repr(msg))
+                                " client:  read %r from server\n" % msg)
                 if support.verbose:
                     sys.stdout.write(" client:  closing connection.\n")
                 if wrapped:
-                    conn.write("over\n".encode("ASCII", "strict"))
+                    conn.write(b"over\n")
                 else:
-                    s.send("over\n".encode("ASCII", "strict"))
+                    s.send(b"over\n")
                 if wrapped:
                     conn.close()
                 else:
@@ -969,8 +968,8 @@
                 server.stop()
                 server.join()
 
-        def testSocketServer(self):
-
+        def test_socketserver(self):
+            """Using a SocketServer to create and manage SSL connections."""
             server = OurHTTPSServer(CERTFILE)
             flag = threading.Event()
             server.start(flag)
@@ -980,7 +979,8 @@
             try:
                 if support.verbose:
                     sys.stdout.write('\n')
-                d1 = open(CERTFILE, 'rb').read()
+                with open(CERTFILE, 'rb') as f:
+                    d1 = f.read()
                 d2 = ''
                 # now fetch the same data from the HTTPS server
                 url = 'https://%s:%d/%s' % (
@@ -1003,12 +1003,14 @@
                     sys.stdout.write('joining thread\n')
                 server.join()
 
-        def testAsyncoreServer(self):
+        def test_asyncore_server(self):
+            """Check the example asyncore integration."""
+            indata = "TEST MESSAGE of mixed case\n"
 
             if support.verbose:
                 sys.stdout.write("\n")
 
-            indata="FOO\n"
+            indata = b"FOO\n"
             server = AsyncoreEchoServer(CERTFILE)
             flag = threading.Event()
             server.start(flag)
@@ -1020,18 +1022,17 @@
                 s.connect(('127.0.0.1', server.port))
                 if support.verbose:
                     sys.stdout.write(
-                        " client:  sending %s...\n" % (repr(indata)))
-                s.write(indata.encode('ASCII', 'strict'))
+                        " client:  sending %r...\n" % indata)
+                s.write(indata)
                 outdata = s.read()
                 if support.verbose:
-                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
-                outdata = str(outdata, 'ASCII', 'strict')
+                    sys.stdout.write(" client:  read %r\n" % outdata)
                 if outdata != indata.lower():
                     self.fail(
-                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
-                        % (outdata[:min(len(outdata),20)], len(outdata),
-                           indata[:min(len(indata),20)].lower(), len(indata)))
-                s.write("over\n".encode("ASCII", "strict"))
+                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
+                        % (outdata[:20], len(outdata),
+                           indata[:20].lower(), len(indata)))
+                s.write(b"over\n")
                 if support.verbose:
                     sys.stdout.write(" client:  closing connection.\n")
                 s.close()
@@ -1039,8 +1040,8 @@
                 server.stop()
                 server.join()
 
-        def testAllRecvAndSendMethods(self):
-
+        def test_recv_send(self):
+            """Test recv(), send() and friends."""
             if support.verbose:
                 sys.stdout.write("\n")
 
@@ -1089,19 +1090,18 @@
                 data_prefix = "PREFIX_"
 
                 for meth_name, send_meth, expect_success, args in send_methods:
-                    indata = data_prefix + meth_name
+                    indata = (data_prefix + meth_name).encode('ascii')
                     try:
-                        send_meth(indata.encode('ASCII', 'strict'), *args)
+                        send_meth(indata, *args)
                         outdata = s.read()
-                        outdata = str(outdata, 'ASCII', 'strict')
                         if outdata != indata.lower():
                             raise support.TestFailed(
                                 "While sending with <<{name:s}>> bad data "
-                                "<<{outdata:s}>> ({nout:d}) received; "
-                                "expected <<{indata:s}>> ({nin:d})\n".format(
-                                    name=meth_name, outdata=repr(outdata[:20]),
+                                "<<{outdata!r}>> ({nout:d}) received; "
+                                "expected <<{indata!r}>> ({nin:d})\n".format(
+                                    name=meth_name, outdata=outdata[:20],
                                     nout=len(outdata),
-                                    indata=repr(indata[:20]), nin=len(indata)
+                                    indata=indata[:20], nin=len(indata)
                                 )
                             )
                     except ValueError as e:
@@ -1119,19 +1119,18 @@
                             )
 
                 for meth_name, recv_meth, expect_success, args in recv_methods:
-                    indata = data_prefix + meth_name
+                    indata = (data_prefix + meth_name).encode('ascii')
                     try:
-                        s.send(indata.encode('ASCII', 'strict'))
+                        s.send(indata)
                         outdata = recv_meth(*args)
-                        outdata = str(outdata, 'ASCII', 'strict')
                         if outdata != indata.lower():
                             raise support.TestFailed(
                                 "While receiving with <<{name:s}>> bad data "
-                                "<<{outdata:s}>> ({nout:d}) received; "
-                                "expected <<{indata:s}>> ({nin:d})\n".format(
-                                    name=meth_name, outdata=repr(outdata[:20]),
+                                "<<{outdata!r}>> ({nout:d}) received; "
+                                "expected <<{indata!r}>> ({nin:d})\n".format(
+                                    name=meth_name, outdata=outdata[:20],
                                     nout=len(outdata),
-                                    indata=repr(indata[:20]), nin=len(indata)
+                                    indata=indata[:20], nin=len(indata)
                                 )
                             )
                     except ValueError as e:
@@ -1150,7 +1149,7 @@
                         # consume data
                         s.read()
 
-                s.write("over\n".encode("ASCII", "strict"))
+                s.write(b"over\n")
                 s.close()
             finally:
                 server.stop()
@@ -1229,10 +1228,11 @@
         if thread_info and support.is_resource_enabled('network'):
             tests.append(ThreadedTests)
 
-    support.run_unittest(*tests)
-
-    if _have_threads:
-        support.threading_cleanup(*thread_info)
+    try:
+        support.run_unittest(*tests)
+    finally:
+        if _have_threads:
+            support.threading_cleanup(*thread_info)
 
 if __name__ == "__main__":
     test_main()


More information about the Python-checkins mailing list