[Python-3000-checkins] r66311 - in python/branches/py3k/Lib: ssl.py test/test_ssl.py

bill.janssen python-3000-checkins at python.org
Mon Sep 8 18:45:20 CEST 2008


Author: bill.janssen
Date: Mon Sep  8 18:45:19 2008
New Revision: 66311

Log:
fixes from issue 3162 for SSL module

Modified:
   python/branches/py3k/Lib/ssl.py
   python/branches/py3k/Lib/test/test_ssl.py

Modified: python/branches/py3k/Lib/ssl.py
==============================================================================
--- python/branches/py3k/Lib/ssl.py	(original)
+++ python/branches/py3k/Lib/ssl.py	Mon Sep  8 18:45:19 2008
@@ -284,6 +284,14 @@
         else:
             return socket.recvfrom(self, addr, buflen, flags)
 
+    def recvfrom_into(self, buffer, nbytes=None, flags=0):
+        self._checkClosed()
+        if self._sslobj:
+            raise ValueError("recvfrom_into not allowed on instances of %s" %
+                             self.__class__)
+        else:
+            return socket.recvfrom_into(self, buffer, nbytes, flags)
+
     def pending(self):
         self._checkClosed()
         if self._sslobj:

Modified: python/branches/py3k/Lib/test/test_ssl.py
==============================================================================
--- python/branches/py3k/Lib/test/test_ssl.py	(original)
+++ python/branches/py3k/Lib/test/test_ssl.py	Mon Sep  8 18:45:19 2008
@@ -781,6 +781,9 @@
         def testMalformedCert(self):
             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
                                      "badcert.pem"))
+        def testWrongCert(self):
+            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
+                                     "wrongcert.pem"))
         def testMalformedKey(self):
             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
                                      "badkey.pem"))
@@ -1033,6 +1036,129 @@
                 server.stop()
                 server.join()
 
+        def testAllRecvAndSendMethods(self):
+
+            if support.verbose:
+                sys.stdout.write("\n")
+
+            server = ThreadedEchoServer(CERTFILE,
+                                        certreqs=ssl.CERT_NONE,
+                                        ssl_version=ssl.PROTOCOL_TLSv1,
+                                        cacerts=CERTFILE,
+                                        chatty=True,
+                                        connectionchatty=False)
+            flag = threading.Event()
+            server.start(flag)
+            # wait for it to start
+            flag.wait()
+            # try to connect
+            try:
+                s = ssl.wrap_socket(socket.socket(),
+                                    server_side=False,
+                                    certfile=CERTFILE,
+                                    ca_certs=CERTFILE,
+                                    cert_reqs=ssl.CERT_NONE,
+                                    ssl_version=ssl.PROTOCOL_TLSv1)
+                s.connect((HOST, server.port))
+            except ssl.SSLError as x:
+                raise support.TestFailed("Unexpected SSL error:  " + str(x))
+            except Exception as x:
+                raise support.TestFailed("Unexpected exception:  " + str(x))
+            else:
+                # helper methods for standardising recv* method signatures
+                def _recv_into():
+                    b = bytearray(b"\0"*100)
+                    count = s.recv_into(b)
+                    return b[:count]
+
+                def _recvfrom_into():
+                    b = bytearray(b"\0"*100)
+                    count, addr = s.recvfrom_into(b)
+                    return b[:count]
+
+                # (name, method, whether to expect success, *args)
+                send_methods = [
+                    ('send', s.send, True, []),
+                    ('sendto', s.sendto, False, ["some.address"]),
+                    ('sendall', s.sendall, True, []),
+                ]
+                recv_methods = [
+                    ('recv', s.recv, True, []),
+                    ('recvfrom', s.recvfrom, False, ["some.address"]),
+                    ('recv_into', _recv_into, True, []),
+                    ('recvfrom_into', _recvfrom_into, False, []),
+                ]
+                data_prefix = "PREFIX_"
+
+                for meth_name, send_meth, expect_success, args in send_methods:
+                    indata = data_prefix + meth_name
+                    try:
+                        send_meth(indata.encode('ASCII', 'strict'), *args)
+                        outdata = s.read()
+                        outdata = str(outdata, 'ASCII', 'strict')
+                        if outdata != indata.lower():
+                            raise support.TestFailed(
+                                "While sending with <<{name:s}>> bad data "
+                                "<<{outdata:s}>> ({nout:d}) received; "
+                                "expected <<{indata:s}>> ({nin:d})\n".format(
+                                    name=meth_name, outdata=repr(outdata[:20]),
+                                    nout=len(outdata),
+                                    indata=repr(indata[:20]), nin=len(indata)
+                                )
+                            )
+                    except ValueError as e:
+                        if expect_success:
+                            raise support.TestFailed(
+                                "Failed to send with method <<{name:s}>>; "
+                                "expected to succeed.\n".format(name=meth_name)
+                            )
+                        if not str(e).startswith(meth_name):
+                            raise support.TestFailed(
+                                "Method <<{name:s}>> failed with unexpected "
+                                "exception message: {exp:s}\n".format(
+                                    name=meth_name, exp=e
+                                )
+                            )
+
+                for meth_name, recv_meth, expect_success, args in recv_methods:
+                    indata = data_prefix + meth_name
+                    try:
+                        s.send(indata.encode('ASCII', 'strict'))
+                        outdata = recv_meth(*args)
+                        outdata = str(outdata, 'ASCII', 'strict')
+                        if outdata != indata.lower():
+                            raise support.TestFailed(
+                                "While receiving with <<{name:s}>> bad data "
+                                "<<{outdata:s}>> ({nout:d}) received; "
+                                "expected <<{indata:s}>> ({nin:d})\n".format(
+                                    name=meth_name, outdata=repr(outdata[:20]),
+                                    nout=len(outdata),
+                                    indata=repr(indata[:20]), nin=len(indata)
+                                )
+                            )
+                    except ValueError as e:
+                        if expect_success:
+                            raise support.TestFailed(
+                                "Failed to receive with method <<{name:s}>>; "
+                                "expected to succeed.\n".format(name=meth_name)
+                            )
+                        if not str(e).startswith(meth_name):
+                            raise support.TestFailed(
+                                "Method <<{name:s}>> failed with unexpected "
+                                "exception message: {exp:s}\n".format(
+                                    name=meth_name, exp=e
+                                )
+                            )
+                        # consume data
+                        s.read()
+
+                s.write("over\n".encode("ASCII", "strict"))
+                s.close()
+            finally:
+                server.stop()
+                server.join()
+
+
 def test_main(verbose=False):
     if skip_expected:
         raise support.TestSkipped("No SSL support")


More information about the Python-3000-checkins mailing list