[Python-checkins] cpython (merge 3.3 -> default): Merge: #13700: Make imap.authenticate with authobject work.

r.david.murray python-checkins at python.org
Tue Feb 19 18:21:12 CET 2013


http://hg.python.org/cpython/rev/d404d33a999c
changeset:   82267:d404d33a999c
parent:      82264:83d70dd58fef
parent:      82266:b21f955b8ba2
user:        R David Murray <rdmurray at bitdance.com>
date:        Tue Feb 19 12:20:32 2013 -0500
summary:
  Merge: #13700: Make imap.authenticate with authobject work.

This fixes a bytes/string confusion in the API which prevented
custom authobjects from working at all.

Original patch by Erno Tukia.

files:
  Doc/library/imaplib.rst  |    7 +-
  Lib/imaplib.py           |   20 ++-
  Lib/test/test_imaplib.py |  127 +++++++++++++++++++++++++-
  Misc/NEWS                |    3 +
  4 files changed, 137 insertions(+), 20 deletions(-)


diff --git a/Doc/library/imaplib.rst b/Doc/library/imaplib.rst
--- a/Doc/library/imaplib.rst
+++ b/Doc/library/imaplib.rst
@@ -185,9 +185,10 @@
 
       data = authobject(response)
 
-   It will be called to process server continuation responses. It should return
-   ``data`` that will be encoded and sent to server. It should return ``None`` if
-   the client abort response ``*`` should be sent instead.
+   It will be called to process server continuation responses; the *response*
+   argument it is passed will be ``bytes``.  It should return ``bytes`` *data*
+   that will be base64 encoded and sent to the server.  It should return
+   ``None`` if the client abort response ``*`` should be sent instead.
 
 
 .. method:: IMAP4.check()
diff --git a/Lib/imaplib.py b/Lib/imaplib.py
--- a/Lib/imaplib.py
+++ b/Lib/imaplib.py
@@ -352,10 +352,10 @@
 
                 data = authobject(response)
 
-        It will be called to process server continuation responses.
-        It should return data that will be encoded and sent to server.
-        It should return None if the client abort response '*' should
-        be sent instead.
+        It will be called to process server continuation responses; the
+        response argument it is passed will be a bytes.  It should return bytes
+        data that will be base64 encoded and sent to the server.  It should
+        return None if the client abort response '*' should be sent instead.
         """
         mech = mechanism.upper()
         # XXX: shouldn't this code be removed, not commented out?
@@ -538,7 +538,9 @@
     def _CRAM_MD5_AUTH(self, challenge):
         """ Authobject to use with CRAM-MD5 authentication. """
         import hmac
-        return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
+        pwd = (self.password.encode('ASCII') if isinstance(self.password, str)
+                                             else self.password)
+        return self.user + " " + hmac.HMAC(pwd, challenge).hexdigest()
 
 
     def logout(self):
@@ -1295,14 +1297,16 @@
         #  so when it gets to the end of the 8-bit input
         #  there's no partial 6-bit output.
         #
-        oup = ''
+        oup = b''
+        if isinstance(inp, str):
+            inp = inp.encode('ASCII')
         while inp:
             if len(inp) > 48:
                 t = inp[:48]
                 inp = inp[48:]
             else:
                 t = inp
-                inp = ''
+                inp = b''
             e = binascii.b2a_base64(t)
             if e:
                 oup = oup + e[:-1]
@@ -1310,7 +1314,7 @@
 
     def decode(self, inp):
         if not inp:
-            return ''
+            return b''
         return binascii.a2b_base64(inp)
 
 Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -94,14 +94,25 @@
 class SimpleIMAPHandler(socketserver.StreamRequestHandler):
 
     timeout = 1
+    continuation = None
+    capabilities = ''
 
     def _send(self, message):
         if verbose: print("SENT: %r" % message.strip())
         self.wfile.write(message)
 
+    def _send_line(self, message):
+        self._send(message + b'\r\n')
+
+    def _send_textline(self, message):
+        self._send_line(message.encode('ASCII'))
+
+    def _send_tagged(self, tag, code, message):
+        self._send_textline(' '.join((tag, code, message)))
+
     def handle(self):
         # Send a welcome message.
-        self._send(b'* OK IMAP4rev1\r\n')
+        self._send_textline('* OK IMAP4rev1')
         while 1:
             # Gather up input until we receive a line terminator or we timeout.
             # Accumulate read(1) because it's simpler to handle the differences
@@ -121,19 +132,33 @@
                     break
 
             if verbose: print('GOT: %r' % line.strip())
-            splitline = line.split()
-            tag = splitline[0].decode('ASCII')
-            cmd = splitline[1].decode('ASCII')
+            if self.continuation:
+                try:
+                    self.continuation.send(line)
+                except StopIteration:
+                    self.continuation = None
+                continue
+            splitline = line.decode('ASCII').split()
+            tag = splitline[0]
+            cmd = splitline[1]
             args = splitline[2:]
 
             if hasattr(self, 'cmd_'+cmd):
-                getattr(self, 'cmd_'+cmd)(tag, args)
+                continuation = getattr(self, 'cmd_'+cmd)(tag, args)
+                if continuation:
+                    self.continuation = continuation
+                    next(continuation)
             else:
-                self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
+                self._send_tagged(tag, 'BAD', cmd + ' unknown')
 
     def cmd_CAPABILITY(self, tag, args):
-        self._send(b'* CAPABILITY IMAP4rev1\r\n')
-        self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+        caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1'
+        self._send_textline('* CAPABILITY ' + caps)
+        self._send_tagged(tag, 'OK', 'CAPABILITY completed')
+
+    def cmd_LOGOUT(self, tag, args):
+        self._send_textline('* BYE IMAP4ref1 Server logging out')
+        self._send_tagged(tag, 'OK', 'LOGOUT completed')
 
 
 class BaseThreadedNetworkedTests(unittest.TestCase):
@@ -183,6 +208,16 @@
         finally:
             self.reap_server(server, thread)
 
+    @contextmanager
+    def reaped_pair(self, hdlr):
+        server, thread = self.make_server((support.HOST, 0), hdlr)
+        client = self.imap_class(*server.server_address)
+        try:
+            yield server, client
+        finally:
+            client.logout()
+            self.reap_server(server, thread)
+
     @reap_threads
     def test_connect(self):
         with self.reaped_server(SimpleIMAPHandler) as server:
@@ -208,12 +243,86 @@
 
             def cmd_CAPABILITY(self, tag, args):
                 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
-                self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+                self._send_tagged(tag, 'OK', 'CAPABILITY completed')
 
         with self.reaped_server(BadNewlineHandler) as server:
             self.assertRaises(imaplib.IMAP4.abort,
                               self.imap_class, *server.server_address)
 
+    @reap_threads
+    def test_bad_auth_name(self):
+
+        class MyServer(SimpleIMAPHandler):
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_tagged(tag, 'NO', 'unrecognized authentication '
+                        'type {}'.format(args[0]))
+
+        with self.reaped_pair(MyServer) as (server, client):
+            with self.assertRaises(imaplib.IMAP4.error):
+                client.authenticate('METHOD', lambda: 1)
+
+    @reap_threads
+    def test_invalid_authentication(self):
+
+        class MyServer(SimpleIMAPHandler):
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_textline('+')
+                self.response = yield
+                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
+
+        with self.reaped_pair(MyServer) as (server, client):
+            with self.assertRaises(imaplib.IMAP4.error):
+                code, data = client.authenticate('MYAUTH', lambda x: b'fake')
+
+    @reap_threads
+    def test_valid_authentication(self):
+
+        class MyServer(SimpleIMAPHandler):
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_textline('+')
+                self.server.response = yield
+                self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
+
+        with self.reaped_pair(MyServer) as (server, client):
+            code, data = client.authenticate('MYAUTH', lambda x: b'fake')
+            self.assertEqual(code, 'OK')
+            self.assertEqual(server.response,
+                             b'ZmFrZQ==\r\n') #b64 encoded 'fake'
+
+        with self.reaped_pair(MyServer) as (server, client):
+            code, data = client.authenticate('MYAUTH', lambda x: 'fake')
+            self.assertEqual(code, 'OK')
+            self.assertEqual(server.response,
+                             b'ZmFrZQ==\r\n') #b64 encoded 'fake'
+
+    @reap_threads
+    def test_login_cram_md5(self):
+
+        class AuthHandler(SimpleIMAPHandler):
+
+            capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
+
+            def cmd_AUTHENTICATE(self, tag, args):
+                self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
+                                       'VzdG9uLm1jaS5uZXQ=')
+                r = yield
+                if r ==  b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n':
+                    self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
+                else:
+                    self._send_tagged(tag, 'NO', 'No access')
+
+        with self.reaped_pair(AuthHandler) as (server, client):
+            self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
+            ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
+            self.assertEqual(ret, "OK")
+
+        with self.reaped_pair(AuthHandler) as (server, client):
+            self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
+            ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
+            self.assertEqual(ret, "OK")
 
 
 class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -260,6 +260,9 @@
 Library
 -------
 
+- Issue #13700: Fix byte/string handling in imaplib authentication when an
+  authobject is specified.
+
 - Issue #13153: Tkinter functions now raise TclError instead of ValueError when
   a string argument contains non-BMP character.
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list