[Python-checkins] r76726 - in python/trunk: Lib/imaplib.py Lib/test/test_imaplib.py Misc/NEWS

r.david.murray python-checkins at python.org
Wed Dec 9 16:15:32 CET 2009


Author: r.david.murray
Date: Wed Dec  9 16:15:31 2009
New Revision: 76726

Log:
Issue 5949: fixed IMAP4_SSL hang when the IMAP server response is
missing proper end-of-line termination.  Patch and tests by
Scott Dial.  The new tests include a test harness which will
make it easier to add additional tests.



Modified:
   python/trunk/Lib/imaplib.py
   python/trunk/Lib/test/test_imaplib.py
   python/trunk/Misc/NEWS

Modified: python/trunk/Lib/imaplib.py
==============================================================================
--- python/trunk/Lib/imaplib.py	(original)
+++ python/trunk/Lib/imaplib.py	Wed Dec  9 16:15:31 2009
@@ -1001,6 +1001,8 @@
             raise self.abort('socket error: EOF')
 
         # Protocol mandates all lines terminated by CRLF
+        if not line.endswith('\r\n'):
+            raise self.abort('socket error: unterminated line')
 
         line = line[:-2]
         if __debug__:
@@ -1167,7 +1169,7 @@
             while 1:
                 char = self.sslobj.read(1)
                 line.append(char)
-                if char == "\n": return ''.join(line)
+                if char in ("\n", ""): return ''.join(line)
 
 
         def send(self, data):

Modified: python/trunk/Lib/test/test_imaplib.py
==============================================================================
--- python/trunk/Lib/test/test_imaplib.py	(original)
+++ python/trunk/Lib/test/test_imaplib.py	Wed Dec  9 16:15:31 2009
@@ -1,11 +1,27 @@
+from test import test_support as support
+# If we end up with a significant number of tests that don't require
+# threading, this test module should be split.  Right now we skip
+# them all if we don't have threading.
+threading = support.import_module('threading')
+
+from contextlib import contextmanager
 import imaplib
+import os.path
+import select
+import socket
+import SocketServer
+import ssl
+import sys
 import time
 
-from test import test_support
+from test_support import reap_threads, verbose
 import unittest
 
+CERTFILE = None
+
 
 class TestImaplib(unittest.TestCase):
+
     def test_that_Time2Internaldate_returns_a_result(self):
         # We can check only that it successfully produces a result,
         # not the correctness of the result itself, since the result
@@ -17,9 +33,153 @@
             imaplib.Time2Internaldate(t)
 
 
+class SecureTCPServer(SocketServer.TCPServer):
+
+    def get_request(self):
+        newsocket, fromaddr = self.socket.accept()
+        connstream = ssl.wrap_socket(newsocket,
+                                     server_side=True,
+                                     certfile=CERTFILE)
+        return connstream, fromaddr
+
+
+class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
+
+    timeout = 1
+
+    def _send(self, message):
+        if verbose: print "SENT:", message.strip()
+        self.wfile.write(message)
+
+    def handle(self):
+        # Send a welcome message.
+        self._send('* OK IMAP4rev1\r\n')
+        while 1:
+            # Gather up input until we receive a line terminator or we time out.
+            # Accumulate read(1) because it's simpler to handle the differences
+            # between naked sockets and SSL sockets.
+            line = ''
+            while 1:
+                try:
+                    part = self.rfile.read(1)
+                    if part == '':
+                        # Naked sockets return empty strings..
+                        return
+                    line += part
+                except IOError:
+                    # ..but SSLSockets throw exceptions.
+                    return
+                if line.endswith('\r\n'):
+                    break
+
+            if verbose: print 'GOT:', line.strip()
+            splitline = line.split()
+            tag = splitline[0]
+            cmd = splitline[1]
+            args = splitline[2:]
+
+            if hasattr(self, 'cmd_%s' % (cmd,)):
+                getattr(self, 'cmd_%s' % (cmd,))(tag, args)
+            else:
+                self._send('%s BAD %s unknown\r\n' % (tag, cmd))
+
+    def cmd_CAPABILITY(self, tag, args):
+        self._send('* CAPABILITY IMAP4rev1\r\n')
+        self._send('%s OK CAPABILITY completed\r\n' % (tag,))
+
+
+class BaseThreadedNetworkedTests(unittest.TestCase):
+
+    def make_server(self, addr, hdlr):
+
+        class MyServer(self.server_class):
+            def handle_error(self, request, client_address):
+                self.close_request(request)
+                self.server_close()
+                raise
+
+        if verbose: print "creating server"
+        server = MyServer(addr, hdlr)
+        self.assertEquals(server.server_address, server.socket.getsockname())
+
+        if verbose:
+            print "server created"
+            print "ADDR =", addr
+            print "CLASS =", self.server_class
+            print "HDLR =", server.RequestHandlerClass
+
+        t = threading.Thread(
+            name='%s serving' % self.server_class,
+            target=server.serve_forever,
+            # Short poll interval to make the test finish quickly.
+            # Time between requests is short enough that we won't wake
+            # up spuriously too many times.
+            kwargs={'poll_interval':0.01})
+        t.daemon = True  # In case this function raises.
+        t.start()
+        if verbose: print "server running"
+        return server, t
+
+    def reap_server(self, server, thread):
+        if verbose: print "waiting for server"
+        server.shutdown()
+        thread.join()
+        if verbose: print "done"
+
+    @contextmanager
+    def reaped_server(self, hdlr):
+        server, thread = self.make_server((support.HOST, 0), hdlr)
+        try:
+            yield server
+        finally:
+            self.reap_server(server, thread)
+
+    @reap_threads
+    def test_connect(self):
+        with self.reaped_server(SimpleIMAPHandler) as server:
+            client = self.imap_class(*server.server_address)
+            client.shutdown()
+
+    @reap_threads
+    def test_issue5949(self):
+
+        class EOFHandler(SocketServer.StreamRequestHandler):
+            def handle(self):
+                # EOF without sending a complete welcome message.
+                self.wfile.write('* OK')
+
+        with self.reaped_server(EOFHandler) as server:
+            self.assertRaises(imaplib.IMAP4.abort,
+                              self.imap_class, *server.server_address)
+
+
+class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
+
+    server_class = SocketServer.TCPServer
+    imap_class = imaplib.IMAP4
+
+
+class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
+
+    server_class = SecureTCPServer
+    imap_class = imaplib.IMAP4_SSL
+
+
 def test_main():
-    test_support.run_unittest(TestImaplib)
+
+    tests = [TestImaplib]
+
+    if support.is_resource_enabled('network'):
+        global CERTFILE
+        CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+                                "keycert.pem")
+        if not os.path.exists(CERTFILE):
+            raise support.TestFailed("Can't read certificate files!")
+        tests.extend([ThreadedNetworkedTests, ThreadedNetworkedTestsSSL])
+
+    support.run_unittest(*tests)
 
 
 if __name__ == "__main__":
-    unittest.main()
+    support.use_resources = ['network']
+    test_main()

Modified: python/trunk/Misc/NEWS
==============================================================================
--- python/trunk/Misc/NEWS	(original)
+++ python/trunk/Misc/NEWS	Wed Dec  9 16:15:31 2009
@@ -15,9 +15,13 @@
 Library
 -------
 
+- Issue #5949: fixed IMAP4_SSL hang when the IMAP server response is
+  missing proper end-of-line termination.
+
 - Issue #7457: added a read_pkg_file method to 
   distutils.dist.DistributionMetadata.
 
+
 What's New in Python 2.7 alpha 1
 ================================
 


More information about the Python-checkins mailing list