[Python-3000-checkins] r67109 - in python/branches/py3k: Lib/poplib.py Lib/test/test_poplib.py Misc/NEWS

christian.heimes python-3000-checkins at python.org
Wed Nov 5 20:48:28 CET 2008


Author: christian.heimes
Date: Wed Nov  5 20:48:27 2008
New Revision: 67109

Log:
Fixed issue #3727: poplib module broken by str to unicode conversion
Victor strikes again! Assisted by Barry

Modified:
   python/branches/py3k/Lib/poplib.py
   python/branches/py3k/Lib/test/test_poplib.py
   python/branches/py3k/Misc/NEWS

Modified: python/branches/py3k/Lib/poplib.py
==============================================================================
--- python/branches/py3k/Lib/poplib.py	(original)
+++ python/branches/py3k/Lib/poplib.py	Wed Nov  5 20:48:27 2008
@@ -75,26 +75,30 @@
             above.
     """
 
+    encoding = 'UTF-8'
 
     def __init__(self, host, port=POP3_PORT,
                  timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
         self.host = host
         self.port = port
-        self.sock = socket.create_connection((host, port), timeout)
+        self.sock = self._create_socket(timeout)
         self.file = self.sock.makefile('rb')
         self._debugging = 0
         self.welcome = self._getresp()
 
+    def _create_socket(self, timeout):
+        return socket.create_connection((self.host, self.port), timeout)
 
     def _putline(self, line):
         if self._debugging > 1: print('*put*', repr(line))
-        self.sock.sendall('%s%s' % (line, CRLF))
+        self.sock.sendall(line + CRLF)
 
 
     # Internal: send one command to the server (through _putline())
 
     def _putcmd(self, line):
         if self._debugging: print('*cmd*', repr(line))
+        line = bytes(line, self.encoding)
         self._putline(line)
 
 
@@ -123,8 +127,7 @@
     def _getresp(self):
         resp, o = self._getline()
         if self._debugging > 1: print('*resp*', repr(resp))
-        c = resp[:1]
-        if c != b'+':
+        if not resp.startswith(b'+'):
             raise error_proto(resp)
         return resp
 
@@ -136,7 +139,7 @@
         list = []; octets = 0
         line, o = self._getline()
         while line != b'.':
-            if line[:2] == b'..':
+            if line.startswith(b'..'):
                 o = o-1
                 line = line[1:]
             octets = octets + o
@@ -266,25 +269,26 @@
         return self._shortcmd('RPOP %s' % user)
 
 
-    timestamp = re.compile(r'\+OK.*(<[^>]+>)')
+    timestamp = re.compile(br'\+OK.*(<[^>]+>)')
 
-    def apop(self, user, secret):
+    def apop(self, user, password):
         """Authorisation
 
         - only possible if server has supplied a timestamp in initial greeting.
 
         Args:
-                user    - mailbox user;
-                secret  - secret shared between client and server.
+                user     - mailbox user;
+                password - mailbox password.
 
         NB: mailbox is locked by server from here to 'quit()'
         """
+        secret = bytes(password, self.encoding)
         m = self.timestamp.match(self.welcome)
         if not m:
             raise error_proto('-ERR APOP not supported by server')
         import hashlib
-        digest = hashlib.md5(m.group(1)+secret).digest()
-        digest = ''.join(map(lambda x:'%02x'%ord(x), digest))
+        digest = m.group(1)+secret
+        digest = hashlib.md5(digest).hexdigest()
         return self._shortcmd('APOP %s %s' % (user, digest))
 
 
@@ -324,79 +328,19 @@
                keyfile - PEM formatted file that countains your private key
                certfile - PEM formatted certificate chain file
 
-            See the methods of the parent class POP3 for more documentation.
+        See the methods of the parent class POP3 for more documentation.
         """
 
-        def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None):
-            self.host = host
-            self.port = port
+        def __init__(self, host, port=POP3_SSL_PORT,
+        keyfile=None, certfile=None,
+        timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
             self.keyfile = keyfile
             self.certfile = certfile
-            self.buffer = ""
-            msg = "getaddrinfo returns an empty list"
-            self.sock = None
-            for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
-                af, socktype, proto, canonname, sa = res
-                try:
-                    self.sock = socket.socket(af, socktype, proto)
-                    self.sock.connect(sa)
-                except socket.error as msg:
-                    if self.sock:
-                        self.sock.close()
-                    self.sock = None
-                    continue
-                break
-            if not self.sock:
-                raise socket.error(msg)
-            self.file = self.sock.makefile('rb')
-            self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
-            self._debugging = 0
-            self.welcome = self._getresp()
-
-        def _fillBuffer(self):
-            localbuf = self.sslobj.read()
-            if len(localbuf) == 0:
-                raise error_proto('-ERR EOF')
-            self.buffer += localbuf
-
-        def _getline(self):
-            line = ""
-            renewline = re.compile(r'.*?\n')
-            match = renewline.match(self.buffer)
-            while not match:
-                self._fillBuffer()
-                match = renewline.match(self.buffer)
-            line = match.group(0)
-            self.buffer = renewline.sub('' ,self.buffer, 1)
-            if self._debugging > 1: print('*get*', repr(line))
-
-            octets = len(line)
-            if line[-2:] == CRLF:
-                return line[:-2], octets
-            if line[0] == CR:
-                return line[1:-1], octets
-            return line[:-1], octets
-
-        def _putline(self, line):
-            if self._debugging > 1: print('*put*', repr(line))
-            line += CRLF
-            bytes = len(line)
-            while bytes > 0:
-                sent = self.sslobj.write(line)
-                if sent == bytes:
-                    break    # avoid copy
-                line = line[sent:]
-                bytes = bytes - sent
-
-        def quit(self):
-            """Signoff: commit changes on server, unlock mailbox, close connection."""
-            try:
-                resp = self._shortcmd('QUIT')
-            except error_proto as val:
-                resp = val
-            self.sock.close()
-            del self.sslobj, self.sock
-            return resp
+            POP3.__init__(self, host, port, timeout)
+
+        def _create_socket(self, timeout):
+            sock = POP3._create_socket(self, timeout)
+            return ssl.wrap_socket(sock, self.keyfile, self.certfile)
 
     __all__.append("POP3_SSL")
 

Modified: python/branches/py3k/Lib/test/test_poplib.py
==============================================================================
--- python/branches/py3k/Lib/test/test_poplib.py	(original)
+++ python/branches/py3k/Lib/test/test_poplib.py	Wed Nov  5 20:48:27 2008
@@ -1,43 +1,284 @@
-import socket
-import threading
+"""Test script for poplib module."""
+
+# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
+# a real test suite
+
 import poplib
+import threading
+import asyncore
+import asynchat
+import socket
+import os
 import time
 
 from unittest import TestCase
-from test import support
+from test import support as test_support
 
-HOST = support.HOST
+HOST = test_support.HOST
+PORT = 0
 
-def server(evt, serv):
-    serv.listen(5)
-    try:
-        conn, addr = serv.accept()
-    except socket.timeout:
-        pass
-    else:
-        conn.send(b"+ Hola mundo\n")
-        conn.close()
-    finally:
-        serv.close()
-        evt.set()
+# the dummy data returned by server when LIST and RETR commands are issued
+LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
+RETR_RESP = b"""From: postmaster at python.org\
+\r\nContent-Type: text/plain\r\n\
+MIME-Version: 1.0\r\n\
+Subject: Dummy\r\n\
+\r\n\
+line1\r\n\
+line2\r\n\
+line3\r\n\
+.\r\n"""
+
+
+class DummyPOP3Handler(asynchat.async_chat):
+
+    def __init__(self, conn):
+        asynchat.async_chat.__init__(self, conn)
+        self.set_terminator(b"\r\n")
+        self.in_buffer = []
+        self.push('+OK dummy pop3 server ready.')
+
+    def collect_incoming_data(self, data):
+        self.in_buffer.append(data)
+
+    def found_terminator(self):
+        line = b''.join(self.in_buffer)
+        line = str(line, 'ISO-8859-1')
+        self.in_buffer = []
+        cmd = line.split(' ')[0].lower()
+        space = line.find(' ')
+        if space != -1:
+            arg = line[space + 1:]
+        else:
+            arg = ""
+        if hasattr(self, 'cmd_' + cmd):
+            method = getattr(self, 'cmd_' + cmd)
+            method(arg)
+        else:
+            self.push('-ERR unrecognized POP3 command "%s".' %cmd)
+
+    def handle_error(self):
+        raise
+
+    def push(self, data):
+        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
+
+    def cmd_echo(self, arg):
+        # sends back the received string (used by the test suite)
+        self.push(arg)
+
+    def cmd_user(self, arg):
+        if arg != "guido":
+            self.push("-ERR no such user")
+        self.push('+OK password required')
+
+    def cmd_pass(self, arg):
+        if arg != "python":
+            self.push("-ERR wrong password")
+        self.push('+OK 10 messages')
+
+    def cmd_stat(self, arg):
+        self.push('+OK 10 100')
+
+    def cmd_list(self, arg):
+        if arg:
+            self.push('+OK %s %s' %(arg, arg))
+        else:
+            self.push('+OK')
+            asynchat.async_chat.push(self, LIST_RESP)
+
+    cmd_uidl = cmd_list
+
+    def cmd_retr(self, arg):
+        self.push('+OK %s bytes' %len(RETR_RESP))
+        asynchat.async_chat.push(self, RETR_RESP)
+
+    cmd_top = cmd_retr
+
+    def cmd_dele(self, arg):
+        self.push('+OK message marked for deletion.')
+
+    def cmd_noop(self, arg):
+        self.push('+OK done nothing.')
+
+    def cmd_rpop(self, arg):
+        self.push('+OK done nothing.')
+
+
+class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
+
+    handler = DummyPOP3Handler
+
+    def __init__(self, address, af=socket.AF_INET):
+        threading.Thread.__init__(self)
+        asyncore.dispatcher.__init__(self)
+        self.create_socket(af, socket.SOCK_STREAM)
+        self.bind(address)
+        self.listen(5)
+        self.active = False
+        self.active_lock = threading.Lock()
+        self.host, self.port = self.socket.getsockname()[:2]
+
+    def start(self):
+        assert not self.active
+        self.__flag = threading.Event()
+        threading.Thread.start(self)
+        self.__flag.wait()
+
+    def run(self):
+        self.active = True
+        self.__flag.set()
+        while self.active and asyncore.socket_map:
+            self.active_lock.acquire()
+            asyncore.loop(timeout=0.1, count=1)
+            self.active_lock.release()
+        asyncore.close_all(ignore_all=True)
+
+    def stop(self):
+        assert self.active
+        self.active = False
+        self.join()
+
+    def handle_accept(self):
+        conn, addr = self.accept()
+        self.handler = self.handler(conn)
+        self.close()
+
+    def handle_connect(self):
+        self.close()
+    handle_read = handle_connect
+
+    def writable(self):
+        return 0
+
+    def handle_error(self):
+        raise
+
+
+class TestPOP3Class(TestCase):
+    def assertOK(self, resp):
+        self.assertTrue(resp.startswith(b"+OK"))
+
+    def setUp(self):
+        self.server = DummyPOP3Server((HOST, PORT))
+        self.server.start()
+        self.client = poplib.POP3(self.server.host, self.server.port)
+
+    def tearDown(self):
+        self.client.quit()
+        self.server.stop()
 
-class GeneralTests(TestCase):
+    def test_getwelcome(self):
+        self.assertEqual(self.client.getwelcome(), b'+OK dummy pop3 server ready.')
+
+    def test_exceptions(self):
+        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
+
+    def test_user(self):
+        self.assertOK(self.client.user('guido'))
+        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
+
+    def test_pass_(self):
+        self.assertOK(self.client.pass_('python'))
+        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
+
+    def test_stat(self):
+        self.assertEqual(self.client.stat(), (10, 100))
+
+    def test_list(self):
+        self.assertEqual(self.client.list()[1:],
+                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
+                          25))
+        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
+
+    def test_retr(self):
+        expected = (b'+OK 116 bytes',
+                    [b'From: postmaster at python.org', b'Content-Type: text/plain',
+                     b'MIME-Version: 1.0', b'Subject: Dummy',
+                     b'', b'line1', b'line2', b'line3'],
+                    113)
+        foo = self.client.retr('foo')
+        self.assertEqual(foo, expected)
+
+    def test_dele(self):
+        self.assertOK(self.client.dele('foo'))
+
+    def test_noop(self):
+        self.assertOK(self.client.noop())
+
+    def test_rpop(self):
+        self.assertOK(self.client.rpop('foo'))
+
+    def test_top(self):
+        expected =  (b'+OK 116 bytes',
+                     [b'From: postmaster at python.org', b'Content-Type: text/plain',
+                      b'MIME-Version: 1.0', b'Subject: Dummy', b'',
+                      b'line1', b'line2', b'line3'],
+                     113)
+        self.assertEqual(self.client.top(1, 1), expected)
+
+    def test_uidl(self):
+        self.client.uidl()
+        self.client.uidl('foo')
+
+
+SUPPORTS_SSL = False
+if hasattr(poplib, 'POP3_SSL'):
+    import ssl
+
+    SUPPORTS_SSL = True
+    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+
+    class DummyPOP3_SSLHandler(DummyPOP3Handler):
+
+        def __init__(self, conn):
+            asynchat.async_chat.__init__(self, conn)
+            ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
+                                          server_side=True)
+            self.del_channel()
+            self.set_socket(ssl_socket)
+            self.set_terminator(b"\r\n")
+            self.in_buffer = []
+            self.push('+OK dummy pop3 server ready.')
+
+    class TestPOP3_SSLClass(TestPOP3Class):
+        # repeat previous tests by using poplib.POP3_SSL
+
+        def setUp(self):
+            self.server = DummyPOP3Server((HOST, PORT))
+            self.server.handler = DummyPOP3_SSLHandler
+            self.server.start()
+            self.client = poplib.POP3_SSL(self.server.host, self.server.port)
+
+        def test__all__(self):
+            self.assert_('POP3_SSL' in poplib.__all__)
+
+
+class TestTimeouts(TestCase):
 
     def setUp(self):
         self.evt = threading.Event()
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.settimeout(3)
-        self.port = support.bind_port(self.sock)
-        threading.Thread(target=server, args=(self.evt,self.sock)).start()
+        self.port = test_support.bind_port(self.sock)
+        threading.Thread(target=self.server, args=(self.evt,self.sock)).start()
         time.sleep(.1)
 
     def tearDown(self):
         self.evt.wait()
 
-    def testBasic(self):
-        # connects
-        pop = poplib.POP3(HOST, self.port)
-        pop.sock.close()
+    def server(self, evt, serv):
+        serv.listen(5)
+        try:
+            conn, addr = serv.accept()
+        except socket.timeout:
+            pass
+        else:
+            conn.send(b"+ Hola mundo\n")
+            conn.close()
+        finally:
+            serv.close()
+            evt.set()
 
     def testTimeoutDefault(self):
         self.assertTrue(socket.getdefaulttimeout() is None)
@@ -65,8 +306,16 @@
         pop.sock.close()
 
 
-def test_main(verbose=None):
-    support.run_unittest(GeneralTests)
+def test_main():
+    tests = [TestPOP3Class, TestTimeouts]
+    if SUPPORTS_SSL:
+        tests.append(TestPOP3_SSLClass)
+    thread_info = test_support.threading_setup()
+    try:
+        test_support.run_unittest(*tests)
+    finally:
+        test_support.threading_cleanup(*thread_info)
+
 
 if __name__ == '__main__':
     test_main()

Modified: python/branches/py3k/Misc/NEWS
==============================================================================
--- python/branches/py3k/Misc/NEWS	(original)
+++ python/branches/py3k/Misc/NEWS	Wed Nov  5 20:48:27 2008
@@ -15,6 +15,8 @@
 Core and Builtins
 -----------------
 
+- Issue #3727: Fixed poplib, which was broken by the str to unicode conversion.
+
 - Issue #3714: Fixed nntplib by using bytes where appropriate.
 
 - Issue #1210: Fixed imaplib and its documentation.


More information about the Python-3000-checkins mailing list