[Python-checkins] cpython (3.5): #25446: Fix regression in smtplib's AUTH LOGIN support.

r.david.murray python-checkins at python.org
Sun Nov 8 01:06:01 EST 2015


https://hg.python.org/cpython/rev/d13263ecf0c6
changeset:   99009:d13263ecf0c6
branch:      3.5
parent:      99007:9cd9615263a2
user:        R David Murray <rdmurray at bitdance.com>
date:        Sun Nov 08 01:03:52 2015 -0500
summary:
  #25446: Fix regression in smtplib's AUTH LOGIN support.

The auth method tests simply weren't adequate because of the fact that
smtpd doesn't support authentication.  I borrowed some of Milan's
code for that from issue #21935 and added it to the smtplib tests.
Also discovered that the direct test for the 'auth' method wasn't actually
testing anything and fixed it.

The fix makes the new authobject mechanism work the way it is
documented...the problem was that wasn't checking for a 334 return code
if an initial-response was provided, which works fine for auth plain
and cram-md5, but not for auth login.

files:
  Lib/smtplib.py           |   19 +-
  Lib/test/test_smtplib.py |  179 ++++++++++++++++++--------
  Misc/NEWS                |    2 +
  3 files changed, 132 insertions(+), 68 deletions(-)


diff --git a/Lib/smtplib.py b/Lib/smtplib.py
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -630,12 +630,12 @@
             (code, resp) = self.docmd("AUTH", mechanism + " " + response)
         else:
             (code, resp) = self.docmd("AUTH", mechanism)
-            # Server replies with 334 (challenge) or 535 (not supported)
-            if code == 334:
-                challenge = base64.decodebytes(resp)
-                response = encode_base64(
-                    authobject(challenge).encode('ascii'), eol='')
-                (code, resp) = self.docmd(response)
+        # If server responds with a challenge, send the response.
+        if code == 334:
+            challenge = base64.decodebytes(resp)
+            response = encode_base64(
+                authobject(challenge).encode('ascii'), eol='')
+            (code, resp) = self.docmd(response)
         if code in (235, 503):
             return (code, resp)
         raise SMTPAuthenticationError(code, resp)
@@ -657,11 +657,10 @@
     def auth_login(self, challenge=None):
         """ Authobject to use with LOGIN authentication. Requires self.user and
         self.password to be set."""
-        (code, resp) = self.docmd(
-            encode_base64(self.user.encode('ascii'), eol=''))
-        if code == 334:
+        if challenge is None:
+            return self.user
+        else:
             return self.password
-        raise SMTPAuthenticationError(code, resp)
 
     def login(self, user, password, *, initial_response_ok=True):
         """Log in on an SMTP server that requires authentication.
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,8 +1,10 @@
 import asyncore
+import base64
 import email.mime.text
 from email.message import EmailMessage
 from email.base64mime import body_encode as encode_base64
 import email.utils
+import hmac
 import socket
 import smtpd
 import smtplib
@@ -623,20 +625,12 @@
 sim_auth = ('Mr.A at somewhere.com', 'somepassword')
 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                           'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
-sim_auth_credentials = {
-    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
-    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
-    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
-                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
-    }
-sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
-sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
-
 sim_lists = {'list-1':['Mr.A at somewhere.com','Mrs.C at somewhereesle.com'],
              'list-2':['Ms.B at xn--fo-fka.com',],
             }
 
 # Simulated SMTP channel & server
+class ResponseException(Exception): pass
 class SimSMTPChannel(smtpd.SMTPChannel):
 
     quit_response = None
@@ -646,12 +640,109 @@
     rcpt_count = 0
     rset_count = 0
     disconnect = 0
+    AUTH = 99    # Add protocol state to enable auth testing.
+    authenticated_user = None
 
     def __init__(self, extra_features, *args, **kw):
         self._extrafeatures = ''.join(
             [ "250-{0}\r\n".format(x) for x in extra_features ])
         super(SimSMTPChannel, self).__init__(*args, **kw)
 
+    # AUTH related stuff.  It would be nice if support for this were in smtpd.
+    def found_terminator(self):
+        if self.smtp_state == self.AUTH:
+            line = self._emptystring.join(self.received_lines)
+            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
+            self.received_lines = []
+            try:
+                self.auth_object(line)
+            except ResponseException as e:
+                self.smtp_state = self.COMMAND
+                self.push('%s %s' % (e.smtp_code, e.smtp_error))
+                return
+        super().found_terminator()
+
+
+    def smtp_AUTH(self, arg):
+        if not self.seen_greeting:
+            self.push('503 Error: send EHLO first')
+            return
+        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
+            self.push('500 Error: command "AUTH" not recognized')
+            return
+        if self.authenticated_user is not None:
+            self.push(
+                '503 Bad sequence of commands: already authenticated')
+            return
+        args = arg.split()
+        if len(args) not in [1, 2]:
+            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
+            return
+        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
+        try:
+            self.auth_object = getattr(self, auth_object_name)
+        except AttributeError:
+            self.push('504 Command parameter not implemented: unsupported '
+                      ' authentication mechanism {!r}'.format(auth_object_name))
+            return
+        self.smtp_state = self.AUTH
+        self.auth_object(args[1] if len(args) == 2 else None)
+
+    def _authenticated(self, user, valid):
+        if valid:
+            self.authenticated_user = user
+            self.push('235 Authentication Succeeded')
+        else:
+            self.push('535 Authentication credentials invalid')
+        self.smtp_state = self.COMMAND
+
+    def _decode_base64(self, string):
+        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
+
+    def _auth_plain(self, arg=None):
+        if arg is None:
+            self.push('334 ')
+        else:
+            logpass = self._decode_base64(arg)
+            try:
+                *_, user, password = logpass.split('\0')
+            except ValueError as e:
+                self.push('535 Splitting response {!r} into user and password'
+                          ' failed: {}'.format(logpass, e))
+                return
+            self._authenticated(user, password == sim_auth[1])
+
+    def _auth_login(self, arg=None):
+        if arg is None:
+            # base64 encoded 'Username:'
+            self.push('334 VXNlcm5hbWU6')
+        elif not hasattr(self, '_auth_login_user'):
+            self._auth_login_user = self._decode_base64(arg)
+            # base64 encoded 'Password:'
+            self.push('334 UGFzc3dvcmQ6')
+        else:
+            password = self._decode_base64(arg)
+            self._authenticated(self._auth_login_user, password == sim_auth[1])
+            del self._auth_login_user
+
+    def _auth_cram_md5(self, arg=None):
+        if arg is None:
+            self.push('334 {}'.format(sim_cram_md5_challenge))
+        else:
+            logpass = self._decode_base64(arg)
+            try:
+                user, hashed_pass = logpass.split()
+            except ValueError as e:
+                self.push('535 Splitting response {!r} into user and password'
+                          'failed: {}'.format(logpass, e))
+                return False
+            valid_hashed_pass = hmac.HMAC(
+                sim_auth[1].encode('ascii'),
+                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
+                'md5').hexdigest()
+            self._authenticated(user, hashed_pass == valid_hashed_pass)
+    # end AUTH related stuff.
+
     def smtp_EHLO(self, arg):
         resp = ('250-testhost\r\n'
                 '250-EXPN\r\n'
@@ -683,20 +774,6 @@
         else:
             self.push('550 No access for you!')
 
-    def smtp_AUTH(self, arg):
-        mech = arg.strip().lower()
-        if mech=='cram-md5':
-            self.push('334 {}'.format(sim_cram_md5_challenge))
-        elif mech not in sim_auth_credentials:
-            self.push('504 auth type unimplemented')
-            return
-        elif mech=='plain':
-            self.push('334 ')
-        elif mech=='login':
-            self.push('334 ')
-        else:
-            self.push('550 No access for you!')
-
     def smtp_QUIT(self, arg):
         if self.quit_response is None:
             super(SimSMTPChannel, self).smtp_QUIT(arg)
@@ -841,63 +918,49 @@
         self.assertEqual(smtp.expn(u), expected_unknown)
         smtp.quit()
 
-    # SimSMTPChannel doesn't fully support AUTH because it requires a
-    # synchronous read to obtain the credentials...so instead smtpd
-    # sees the credential sent by smtplib's login method as an unknown command,
-    # which results in smtplib raising an auth error.  Fortunately the error
-    # message contains the encoded credential, so we can partially check that it
-    # was generated correctly (partially, because the 'word' is uppercased in
-    # the error message).
-
     def testAUTH_PLAIN(self):
         self.serv.add_feature("AUTH PLAIN")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_plain, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_LOGIN(self):
         self.serv.add_feature("AUTH LOGIN")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_login_user, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_CRAM_MD5(self):
         self.serv.add_feature("AUTH CRAM-MD5")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_credentials['cram-md5'], str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_multiple(self):
         # Test that multiple authentication methods are tried.
         self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_login_user, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def test_auth_function(self):
-        smtp = smtplib.SMTP(HOST, self.port,
-                            local_hostname='localhost', timeout=15)
-        self.serv.add_feature("AUTH CRAM-MD5")
-        smtp.user, smtp.password = sim_auth[0], sim_auth[1]
-        supported = {'CRAM-MD5': smtp.auth_cram_md5,
-                     'PLAIN': smtp.auth_plain,
-                     'LOGIN': smtp.auth_login,
-                    }
-        for mechanism, method in supported.items():
-            try: smtp.auth(mechanism, method, initial_response_ok=False)
-            except smtplib.SMTPAuthenticationError as err:
-                self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
-                              str(err))
-        smtp.close()
+        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
+        for mechanism in supported:
+            self.serv.add_feature("AUTH {}".format(mechanism))
+        for mechanism in supported:
+            with self.subTest(mechanism=mechanism):
+                smtp = smtplib.SMTP(HOST, self.port,
+                                    local_hostname='localhost', timeout=15)
+                smtp.ehlo('foo')
+                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
+                method = 'auth_' + mechanism.lower().replace('-', '_')
+                resp = smtp.auth(mechanism, getattr(smtp, method))
+                self.assertEqual(resp, (235, b'Authentication Succeeded'))
+                smtp.close()
 
     def test_quit_resets_greeting(self):
         smtp = smtplib.SMTP(HOST, self.port,
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -58,6 +58,8 @@
 Library
 -------
 
+- Issue #25446: Fix regression in smtplib's AUTH LOGIN support.
+
 - Issue #18010: Fix the pydoc web server's module search function to handle
   exceptions from importing packages.
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list