[Python-checkins] cpython (merge 3.5 -> default): Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.
r.david.murray
python-checkins at python.org
Sun Nov 8 01:06:01 EST 2015
https://hg.python.org/cpython/rev/7368b86432c6
changeset: 99010:7368b86432c6
parent: 99008:13550b3ca1a6
parent: 99009:d13263ecf0c6
user: R David Murray <rdmurray at bitdance.com>
date: Sun Nov 08 01:05:11 2015 -0500
summary:
Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.
files:
Lib/smtplib.py | 19 +-
Lib/test/test_smtplib.py | 179 ++++++++++++++++++--------
Misc/NEWS | 2 +
3 files changed, 132 insertions(+), 68 deletions(-)
diff --git a/Lib/smtplib.py b/Lib/smtplib.py
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -630,12 +630,12 @@
(code, resp) = self.docmd("AUTH", mechanism + " " + response)
else:
(code, resp) = self.docmd("AUTH", mechanism)
- # Server replies with 334 (challenge) or 535 (not supported)
- if code == 334:
- challenge = base64.decodebytes(resp)
- response = encode_base64(
- authobject(challenge).encode('ascii'), eol='')
- (code, resp) = self.docmd(response)
+ # If server responds with a challenge, send the response.
+ if code == 334:
+ challenge = base64.decodebytes(resp)
+ response = encode_base64(
+ authobject(challenge).encode('ascii'), eol='')
+ (code, resp) = self.docmd(response)
if code in (235, 503):
return (code, resp)
raise SMTPAuthenticationError(code, resp)
@@ -657,11 +657,10 @@
def auth_login(self, challenge=None):
""" Authobject to use with LOGIN authentication. Requires self.user and
self.password to be set."""
- (code, resp) = self.docmd(
- encode_base64(self.user.encode('ascii'), eol=''))
- if code == 334:
+ if challenge is None:
+ return self.user
+ else:
return self.password
- raise SMTPAuthenticationError(code, resp)
def login(self, user, password, *, initial_response_ok=True):
"""Log in on an SMTP server that requires authentication.
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,8 +1,10 @@
import asyncore
+import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
+import hmac
import socket
import smtpd
import smtplib
@@ -623,20 +625,12 @@
sim_auth = ('Mr.A at somewhere.com', 'somepassword')
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
-sim_auth_credentials = {
- 'login': 'TXIuQUBzb21ld2hlcmUuY29t',
- 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
- 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
- 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
- }
-sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
-sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
-
sim_lists = {'list-1':['Mr.A at somewhere.com','Mrs.C at somewhereesle.com'],
'list-2':['Ms.B at xn--fo-fka.com',],
}
# Simulated SMTP channel & server
+class ResponseException(Exception): pass
class SimSMTPChannel(smtpd.SMTPChannel):
quit_response = None
@@ -646,12 +640,109 @@
rcpt_count = 0
rset_count = 0
disconnect = 0
+ AUTH = 99 # Add protocol state to enable auth testing.
+ authenticated_user = None
def __init__(self, extra_features, *args, **kw):
self._extrafeatures = ''.join(
[ "250-{0}\r\n".format(x) for x in extra_features ])
super(SimSMTPChannel, self).__init__(*args, **kw)
+ # AUTH related stuff. It would be nice if support for this were in smtpd.
+ def found_terminator(self):
+ if self.smtp_state == self.AUTH:
+ line = self._emptystring.join(self.received_lines)
+ print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
+ self.received_lines = []
+ try:
+ self.auth_object(line)
+ except ResponseException as e:
+ self.smtp_state = self.COMMAND
+ self.push('%s %s' % (e.smtp_code, e.smtp_error))
+ return
+ super().found_terminator()
+
+
+ def smtp_AUTH(self, arg):
+ if not self.seen_greeting:
+ self.push('503 Error: send EHLO first')
+ return
+ if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
+ self.push('500 Error: command "AUTH" not recognized')
+ return
+ if self.authenticated_user is not None:
+ self.push(
+ '503 Bad sequence of commands: already authenticated')
+ return
+ args = arg.split()
+ if len(args) not in [1, 2]:
+ self.push('501 Syntax: AUTH <mechanism> [initial-response]')
+ return
+ auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
+ try:
+ self.auth_object = getattr(self, auth_object_name)
+ except AttributeError:
+ self.push('504 Command parameter not implemented: unsupported '
+ ' authentication mechanism {!r}'.format(auth_object_name))
+ return
+ self.smtp_state = self.AUTH
+ self.auth_object(args[1] if len(args) == 2 else None)
+
+ def _authenticated(self, user, valid):
+ if valid:
+ self.authenticated_user = user
+ self.push('235 Authentication Succeeded')
+ else:
+ self.push('535 Authentication credentials invalid')
+ self.smtp_state = self.COMMAND
+
+ def _decode_base64(self, string):
+ return base64.decodebytes(string.encode('ascii')).decode('utf-8')
+
+ def _auth_plain(self, arg=None):
+ if arg is None:
+ self.push('334 ')
+ else:
+ logpass = self._decode_base64(arg)
+ try:
+ *_, user, password = logpass.split('\0')
+ except ValueError as e:
+ self.push('535 Splitting response {!r} into user and password'
+ ' failed: {}'.format(logpass, e))
+ return
+ self._authenticated(user, password == sim_auth[1])
+
+ def _auth_login(self, arg=None):
+ if arg is None:
+ # base64 encoded 'Username:'
+ self.push('334 VXNlcm5hbWU6')
+ elif not hasattr(self, '_auth_login_user'):
+ self._auth_login_user = self._decode_base64(arg)
+ # base64 encoded 'Password:'
+ self.push('334 UGFzc3dvcmQ6')
+ else:
+ password = self._decode_base64(arg)
+ self._authenticated(self._auth_login_user, password == sim_auth[1])
+ del self._auth_login_user
+
+ def _auth_cram_md5(self, arg=None):
+ if arg is None:
+ self.push('334 {}'.format(sim_cram_md5_challenge))
+ else:
+ logpass = self._decode_base64(arg)
+ try:
+ user, hashed_pass = logpass.split()
+ except ValueError as e:
+ self.push('535 Splitting response {!r} into user and password'
+ 'failed: {}'.format(logpass, e))
+ return False
+ valid_hashed_pass = hmac.HMAC(
+ sim_auth[1].encode('ascii'),
+ self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
+ 'md5').hexdigest()
+ self._authenticated(user, hashed_pass == valid_hashed_pass)
+ # end AUTH related stuff.
+
def smtp_EHLO(self, arg):
resp = ('250-testhost\r\n'
'250-EXPN\r\n'
@@ -683,20 +774,6 @@
else:
self.push('550 No access for you!')
- def smtp_AUTH(self, arg):
- mech = arg.strip().lower()
- if mech=='cram-md5':
- self.push('334 {}'.format(sim_cram_md5_challenge))
- elif mech not in sim_auth_credentials:
- self.push('504 auth type unimplemented')
- return
- elif mech=='plain':
- self.push('334 ')
- elif mech=='login':
- self.push('334 ')
- else:
- self.push('550 No access for you!')
-
def smtp_QUIT(self, arg):
if self.quit_response is None:
super(SimSMTPChannel, self).smtp_QUIT(arg)
@@ -841,63 +918,49 @@
self.assertEqual(smtp.expn(u), expected_unknown)
smtp.quit()
- # SimSMTPChannel doesn't fully support AUTH because it requires a
- # synchronous read to obtain the credentials...so instead smtpd
- # sees the credential sent by smtplib's login method as an unknown command,
- # which results in smtplib raising an auth error. Fortunately the error
- # message contains the encoded credential, so we can partially check that it
- # was generated correctly (partially, because the 'word' is uppercased in
- # the error message).
-
def testAUTH_PLAIN(self):
self.serv.add_feature("AUTH PLAIN")
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_plain, str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
smtp.close()
def testAUTH_LOGIN(self):
self.serv.add_feature("AUTH LOGIN")
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- try: smtp.login(sim_auth[0], sim_auth[1])
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_login_user, str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
smtp.close()
def testAUTH_CRAM_MD5(self):
self.serv.add_feature("AUTH CRAM-MD5")
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-
- try: smtp.login(sim_auth[0], sim_auth[1])
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_credentials['cram-md5'], str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
smtp.close()
def testAUTH_multiple(self):
# Test that multiple authentication methods are tried.
self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
- try: smtp.login(sim_auth[0], sim_auth[1])
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_login_user, str(err))
+ resp = smtp.login(sim_auth[0], sim_auth[1])
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
smtp.close()
def test_auth_function(self):
- smtp = smtplib.SMTP(HOST, self.port,
- local_hostname='localhost', timeout=15)
- self.serv.add_feature("AUTH CRAM-MD5")
- smtp.user, smtp.password = sim_auth[0], sim_auth[1]
- supported = {'CRAM-MD5': smtp.auth_cram_md5,
- 'PLAIN': smtp.auth_plain,
- 'LOGIN': smtp.auth_login,
- }
- for mechanism, method in supported.items():
- try: smtp.auth(mechanism, method, initial_response_ok=False)
- except smtplib.SMTPAuthenticationError as err:
- self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
- str(err))
- smtp.close()
+ supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
+ for mechanism in supported:
+ self.serv.add_feature("AUTH {}".format(mechanism))
+ for mechanism in supported:
+ with self.subTest(mechanism=mechanism):
+ smtp = smtplib.SMTP(HOST, self.port,
+ local_hostname='localhost', timeout=15)
+ smtp.ehlo('foo')
+ smtp.user, smtp.password = sim_auth[0], sim_auth[1]
+ method = 'auth_' + mechanism.lower().replace('-', '_')
+ resp = smtp.auth(mechanism, getattr(smtp, method))
+ self.assertEqual(resp, (235, b'Authentication Succeeded'))
+ smtp.close()
def test_quit_resets_greeting(self):
smtp = smtplib.SMTP(HOST, self.port,
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -79,6 +79,8 @@
Library
-------
+- Issue #25446: Fix regression in smtplib's AUTH LOGIN support.
+
- Issue #18010: Fix the pydoc web server's module search function to handle
exceptions from importing packages.
--
Repository URL: https://hg.python.org/cpython
More information about the Python-checkins mailing list