[Python-checkins] cpython (merge 3.5 -> default): Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.

r.david.murray python-checkins at python.org
Sun Nov 8 01:06:01 EST 2015


https://hg.python.org/cpython/rev/7368b86432c6
changeset:   99010:7368b86432c6
parent:      99008:13550b3ca1a6
parent:      99009:d13263ecf0c6
user:        R David Murray <rdmurray at bitdance.com>
date:        Sun Nov 08 01:05:11 2015 -0500
summary:
  Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.

files:
  Lib/smtplib.py           |   19 +-
  Lib/test/test_smtplib.py |  179 ++++++++++++++++++--------
  Misc/NEWS                |    2 +
  3 files changed, 132 insertions(+), 68 deletions(-)


diff --git a/Lib/smtplib.py b/Lib/smtplib.py
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -630,12 +630,12 @@
             (code, resp) = self.docmd("AUTH", mechanism + " " + response)
         else:
             (code, resp) = self.docmd("AUTH", mechanism)
-            # Server replies with 334 (challenge) or 535 (not supported)
-            if code == 334:
-                challenge = base64.decodebytes(resp)
-                response = encode_base64(
-                    authobject(challenge).encode('ascii'), eol='')
-                (code, resp) = self.docmd(response)
+        # If server responds with a challenge, send the response.
+        if code == 334:
+            challenge = base64.decodebytes(resp)
+            response = encode_base64(
+                authobject(challenge).encode('ascii'), eol='')
+            (code, resp) = self.docmd(response)
         if code in (235, 503):
             return (code, resp)
         raise SMTPAuthenticationError(code, resp)
@@ -657,11 +657,10 @@
     def auth_login(self, challenge=None):
         """ Authobject to use with LOGIN authentication. Requires self.user and
         self.password to be set."""
-        (code, resp) = self.docmd(
-            encode_base64(self.user.encode('ascii'), eol=''))
-        if code == 334:
+        if challenge is None:
+            return self.user
+        else:
             return self.password
-        raise SMTPAuthenticationError(code, resp)
 
     def login(self, user, password, *, initial_response_ok=True):
         """Log in on an SMTP server that requires authentication.
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,8 +1,10 @@
 import asyncore
+import base64
 import email.mime.text
 from email.message import EmailMessage
 from email.base64mime import body_encode as encode_base64
 import email.utils
+import hmac
 import socket
 import smtpd
 import smtplib
@@ -623,20 +625,12 @@
 sim_auth = ('Mr.A at somewhere.com', 'somepassword')
 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                           'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
-sim_auth_credentials = {
-    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
-    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
-    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
-                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
-    }
-sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
-sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
-
 sim_lists = {'list-1':['Mr.A at somewhere.com','Mrs.C at somewhereesle.com'],
              'list-2':['Ms.B at xn--fo-fka.com',],
             }
 
 # Simulated SMTP channel & server
+class ResponseException(Exception): pass
 class SimSMTPChannel(smtpd.SMTPChannel):
 
     quit_response = None
@@ -646,12 +640,109 @@
     rcpt_count = 0
     rset_count = 0
     disconnect = 0
+    AUTH = 99    # Add protocol state to enable auth testing.
+    authenticated_user = None
 
     def __init__(self, extra_features, *args, **kw):
         self._extrafeatures = ''.join(
             [ "250-{0}\r\n".format(x) for x in extra_features ])
         super(SimSMTPChannel, self).__init__(*args, **kw)
 
+    # AUTH related stuff.  It would be nice if support for this were in smtpd.
+    def found_terminator(self):
+        if self.smtp_state == self.AUTH:
+            line = self._emptystring.join(self.received_lines)
+            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
+            self.received_lines = []
+            try:
+                self.auth_object(line)
+            except ResponseException as e:
+                self.smtp_state = self.COMMAND
+                self.push('%s %s' % (e.smtp_code, e.smtp_error))
+                return
+        super().found_terminator()
+
+
+    def smtp_AUTH(self, arg):
+        if not self.seen_greeting:
+            self.push('503 Error: send EHLO first')
+            return
+        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
+            self.push('500 Error: command "AUTH" not recognized')
+            return
+        if self.authenticated_user is not None:
+            self.push(
+                '503 Bad sequence of commands: already authenticated')
+            return
+        args = arg.split()
+        if len(args) not in [1, 2]:
+            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
+            return
+        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
+        try:
+            self.auth_object = getattr(self, auth_object_name)
+        except AttributeError:
+            self.push('504 Command parameter not implemented: unsupported '
+                      ' authentication mechanism {!r}'.format(auth_object_name))
+            return
+        self.smtp_state = self.AUTH
+        self.auth_object(args[1] if len(args) == 2 else None)
+
+    def _authenticated(self, user, valid):
+        if valid:
+            self.authenticated_user = user
+            self.push('235 Authentication Succeeded')
+        else:
+            self.push('535 Authentication credentials invalid')
+        self.smtp_state = self.COMMAND
+
+    def _decode_base64(self, string):
+        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
+
+    def _auth_plain(self, arg=None):
+        if arg is None:
+            self.push('334 ')
+        else:
+            logpass = self._decode_base64(arg)
+            try:
+                *_, user, password = logpass.split('\0')
+            except ValueError as e:
+                self.push('535 Splitting response {!r} into user and password'
+                          ' failed: {}'.format(logpass, e))
+                return
+            self._authenticated(user, password == sim_auth[1])
+
+    def _auth_login(self, arg=None):
+        if arg is None:
+            # base64 encoded 'Username:'
+            self.push('334 VXNlcm5hbWU6')
+        elif not hasattr(self, '_auth_login_user'):
+            self._auth_login_user = self._decode_base64(arg)
+            # base64 encoded 'Password:'
+            self.push('334 UGFzc3dvcmQ6')
+        else:
+            password = self._decode_base64(arg)
+            self._authenticated(self._auth_login_user, password == sim_auth[1])
+            del self._auth_login_user
+
+    def _auth_cram_md5(self, arg=None):
+        if arg is None:
+            self.push('334 {}'.format(sim_cram_md5_challenge))
+        else:
+            logpass = self._decode_base64(arg)
+            try:
+                user, hashed_pass = logpass.split()
+            except ValueError as e:
+                self.push('535 Splitting response {!r} into user and password'
+                          'failed: {}'.format(logpass, e))
+                return False
+            valid_hashed_pass = hmac.HMAC(
+                sim_auth[1].encode('ascii'),
+                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
+                'md5').hexdigest()
+            self._authenticated(user, hashed_pass == valid_hashed_pass)
+    # end AUTH related stuff.
+
     def smtp_EHLO(self, arg):
         resp = ('250-testhost\r\n'
                 '250-EXPN\r\n'
@@ -683,20 +774,6 @@
         else:
             self.push('550 No access for you!')
 
-    def smtp_AUTH(self, arg):
-        mech = arg.strip().lower()
-        if mech=='cram-md5':
-            self.push('334 {}'.format(sim_cram_md5_challenge))
-        elif mech not in sim_auth_credentials:
-            self.push('504 auth type unimplemented')
-            return
-        elif mech=='plain':
-            self.push('334 ')
-        elif mech=='login':
-            self.push('334 ')
-        else:
-            self.push('550 No access for you!')
-
     def smtp_QUIT(self, arg):
         if self.quit_response is None:
             super(SimSMTPChannel, self).smtp_QUIT(arg)
@@ -841,63 +918,49 @@
         self.assertEqual(smtp.expn(u), expected_unknown)
         smtp.quit()
 
-    # SimSMTPChannel doesn't fully support AUTH because it requires a
-    # synchronous read to obtain the credentials...so instead smtpd
-    # sees the credential sent by smtplib's login method as an unknown command,
-    # which results in smtplib raising an auth error.  Fortunately the error
-    # message contains the encoded credential, so we can partially check that it
-    # was generated correctly (partially, because the 'word' is uppercased in
-    # the error message).
-
     def testAUTH_PLAIN(self):
         self.serv.add_feature("AUTH PLAIN")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_plain, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_LOGIN(self):
         self.serv.add_feature("AUTH LOGIN")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_login_user, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_CRAM_MD5(self):
         self.serv.add_feature("AUTH CRAM-MD5")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_credentials['cram-md5'], str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_multiple(self):
         # Test that multiple authentication methods are tried.
         self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_login_user, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def test_auth_function(self):
-        smtp = smtplib.SMTP(HOST, self.port,
-                            local_hostname='localhost', timeout=15)
-        self.serv.add_feature("AUTH CRAM-MD5")
-        smtp.user, smtp.password = sim_auth[0], sim_auth[1]
-        supported = {'CRAM-MD5': smtp.auth_cram_md5,
-                     'PLAIN': smtp.auth_plain,
-                     'LOGIN': smtp.auth_login,
-                    }
-        for mechanism, method in supported.items():
-            try: smtp.auth(mechanism, method, initial_response_ok=False)
-            except smtplib.SMTPAuthenticationError as err:
-                self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
-                              str(err))
-        smtp.close()
+        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
+        for mechanism in supported:
+            self.serv.add_feature("AUTH {}".format(mechanism))
+        for mechanism in supported:
+            with self.subTest(mechanism=mechanism):
+                smtp = smtplib.SMTP(HOST, self.port,
+                                    local_hostname='localhost', timeout=15)
+                smtp.ehlo('foo')
+                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
+                method = 'auth_' + mechanism.lower().replace('-', '_')
+                resp = smtp.auth(mechanism, getattr(smtp, method))
+                self.assertEqual(resp, (235, b'Authentication Succeeded'))
+                smtp.close()
 
     def test_quit_resets_greeting(self):
         smtp = smtplib.SMTP(HOST, self.port,
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -79,6 +79,8 @@
 Library
 -------
 
+- Issue #25446: Fix regression in smtplib's AUTH LOGIN support.
+
 - Issue #18010: Fix the pydoc web server's module search function to handle
   exceptions from importing packages.
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list